3장. 스파크 기능 둘러보기¶
이 장에서는 다음과 같은 내용을 설명합니다.
- spark-submit 명령으로 운영용 어플리케이션 실행
- Dataset: 타입 안정성(typr-safe, 타입 세이프)를 제공하는 구조적 API
- 구조적 스트리밍
- 머신 러닝과 고급 분석
- RDD : 스파크의 저수준 API
- SparkR
- 서드파티 패키지 에코 시스템
3.1. 운영용 애플리케이션 실행하기¶
스파크를 사용하면 빅데이터 프로그램을 쉽게 개발할 수 있습니다.spark-submit
명령을 사용해 대화형 셸에서 개발한 프로그램을 운영용 애플리케이션으로 쉽게 전환할 수 있습니다.
spark-submit
명령은 애플리케이션 코드를 클러스터에 전송해 실행시키는 역할을 합니다.- 실행에 필요한 자원과 실행방식, 다양한 옵션을 지정할 수 있습니다.
- Spark 어플리케이션은 스탠드얼론, Mesos, YARN 클러스터 매니저를 이용해 실행됩니다.
사용자는 Spark 가 지원하는 프로그래밍 언어로 어플리케이션을 개발한 다음 실행할 수 있다. 가장 간단한 방법은 로컬 머신에서 어플리케이션을 실행하는 겁니다. Spark 어플리케이션 예제를 실행해보겠다.
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.12-3.1.2.jar 10
위 스칼라 어플리케이션 예제는 π값을 계산합니다.
다음은 파이썬으로 작성한 애플리케이션을 실행하는 예제이다.
./bin/spark-submit --master local examples/src/main/python/pi.py 10
spark-submit
명령 중 master 옵션의 인수를 변경하면 스파크가 지원하는 스파크 스탠드얼론, 메소스, YARN 에서 동일한 애플리케이션을 실행할 수 있다.
3.2. Dataset: 타입 안정성을 제공하는 구조적 API¶
Dataset은 자바와 스칼라의 정적 데이터 타입에 맞는 코드, 즉, 정적 타입 코드(Statically Typed Code)를 지원하기 위해 고안된 Spark 의 구조적 API 이다. Dataset은 타입 안정성을 지원하고, 동적타입 언어인 Python 과 R에서는 사용할 수 없다.
DataFrame은 다양한 데이터타입의 테이블형 데이터를 보관할 수 있는 Row 타입의 객체로 구성된 분산 컬렉션입니다. Dataset API 는 DataFrame의 레코드를 사용자가 Java나 스칼라로 정의한 클래스에 할당하고, 자바의 ArrayList 또는 스칼라의 Seq 객체 등의 고정 타입형 컬렉션으로 다룰 수 있는 기능을 제공한다.
Dataset API 는 타입 안정성을 지원하므로 초기화에 사용한 클래스 대신 다른 클래스를 사용해 접근할 수 없다.
Dataset 클래스는 내부 객체의 데이터타입을 매개변수로 사용합니다. 예를 들어
Dataset[Person]
은Person
클래스의 객체만 가질 수 있다. Spark 2.0 에서의 자바의 JavaBean 패턴과 스칼라의 케이스 클래스 유형으로 정의된 클래스를 지원한다.Dataset은 필요한 경우 선택적으로 사용할 수 있다. 예를 들어 아래 예제와 같이 데이터 타입을 정의하고 map, filter 함수를 사용할 수 있다. Spark 는 처리를 마치고 결과를 DataFrame으로 자동 변환해 반환합니다. 또한, Spark가 제공하는 여러 함수를 이용해 추가 작업을 할 수 있다.
아래 예제는 타입 안정성 함수와 DataFrame을 사용해 비즈니스 로직을 신속하게 작성하는 방법을 보여주는 예이다.
// in Scala
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
마지막으로 알아볼 Dataset의 장점은 collect
메소드나 take
메소드를 호출하면 DataFrame을 구성하는 Row 타입의 객체가 아닌 Dataset에 매개변수로 지정한 타입의 객체를 반환하는 겁니다.
따라서 코드의 변경 없이 타입 안정성을 보장할 수 있고, 로컬이나 분산 클러스터 환경에서 데이터를 안전하게 다룰 수 있다.
// in Scala
flights
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !="Canada")
.map(flight_row => flight_row)
.take(5)
flights.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !="Canada")
.map(fr -> Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
3.3. 구조적 스트리밍¶
- 구조적 스트리밍은 Spark 2.2 버전에서 안정화된 스트림 처리용 고수준 API이다. 구조적 스트리밍을 사용하면, 구조적 API 로 개발된 배치 모드의 연산을 스트리밍 방식으로 실행되며 지연시간을 줄이고 증분처리 할 수 있다.
- 구조적 스트리밍은 배치 처리용 코들 일부 수정해 스트리밍 처리를 수행하고 값을 빠르게 얻을 수 있다.
- 또한 프로토타입을 배치 잡으로 개발한 다음 스트리밍 잡으로 변환할 수 있으므로 개념 잡기 수월하다.
앞서 설명한 모든 작업은 데이터를 증분 처리해 수행된다.
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17...
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kin...
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850...
특정 고객이 대량으로 구매하는 영업 시간을 살펴보겠습니다. 총 구매비용 컬럼을 추가하고 고객이 가장 많이 소비한 날을 찾아보겠습니다.
윈도우 함수는 집계 시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성합니다. 윈도우는 간격을 통해 처리 요건을 명시할 수 있기 때문에 날짜와 타임스탬프 처리에 유용합니다. Spark 는 관련 날짜의 데이터를 그룹화합니다.
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("./data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema
staticSchema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
지금은 시계열 데이터를 다루기 때문에 데이터를 그룹화하고 집계하는 방법을 알아보자. 이를 위해 특정고객(CustomerId로 구분한다.)이 대량으로 구매하는 영업시간을 살펴본다. 예를 들어 총 구매 비룔 컬럼을 추가하고 고객이 가장 많이 소비한 날을 찾아본다.
윈도우 함수는 집계시에 시계열 컬럼을 기준으로 각 날짜에 대한 전체 데이터를 가지는 윈도우를 구성한다. 윈도우는 간격을 통해 처리하는 처리 요건을 명시할 수 있기 때문에 날짜와 타임스탬프 처리에 유용하다. 스파크는 관련 날짜와 데이터를 그룹화한다.
from pyspark.sql.functions import window,col
staticDataFrame.selectExpr("CustomerID", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
.groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day")).sum("total_cost").show(5)
+----------+--------------------+-----------------+ |CustomerID| window| sum(total_cost)| +----------+--------------------+-----------------+ | 16057.0|{2011-12-05 09:00...| -37.6| | 14126.0|{2011-11-29 09:00...|643.6300000000001| | 13500.0|{2011-11-16 09:00...|497.9700000000001| | 17160.0|{2011-11-08 09:00...|516.8499999999999| | 15608.0|{2011-11-11 09:00...| 122.4| +----------+--------------------+-----------------+ only showing top 5 rows
스트리밍 코드는 그렇게 크게 바뀌지 않는다. 단지, read 메서드 대신 readStream 메소그를 사용하는 것이 가장 큰 차이이다.
- maxFilesPerTrigger 옵견으로 추가로 지정합니다. ➡️ 이 옵션을 사용해 한 번에 읽을 파일 수를 설정할 수 있다. ➡️ 운영 환경에서는 적용안하는 것을 권장
spark.conf.set("spark.sql.shuffle.partitions", "5") # 셔플 출력 파티션 수 줄이기
streamingDataFrame = spark.readStream.schema(staticSchema).option("maxFilesPerTrigger", 1).format("csv").option("header","true")\
.load("/data/retail-data/by-day/*.csv")# maxFilesPerTrigeer 옵션을 지정하면, 한 번에 읽을 파일 수를 설정할 수 있습니다.
# DataFrame 이 스트리밍 유형인지 확인
streamingDataFrame.isStreaming
# isStreaming 은 Dataset이 데이터를 연속적으로 전달하는 데이터 소스라면 true 를 반환한다.
True
purchaseByCustomerPerHour = \
streamingDataFrame.selectExpr("CustomerID", \
"(UnitPrice * Quantity) as total_cost", "InvoiceDate")\
.groupBy(col("CustomerID"), window(col("InvoiceDate"), "1 day")).sum("total_cost")
이 작업 역시 지연 연산(Lazy Operation) 이므로 데이터 플로를 실행하기 위해 스트리밍 액션을 호출해야한다.
스트리밍 액션은 어딘가에 데이터를 채워넣어야 하므로 count
메소드와 같은 일반적인 액션과 다른 특성을 가진다. 스트리밍 액션은 트리거(Trigger)가 실행된 다음 데이터가 갱신하게될 인메모리 테이블에 데이터를 저장한다.
아래 예제에서 파일마다 트리거를 실행한다. Spark는 집계값보다 더 큰 값이 발생한 경우에만 인메모리 테이블을 갱신해 언제나 큰 값을 얻을 수 있다.
purchaseByCustomerPerHour.writeStream.format("memory").queryName("customer_purchases").outputMode("complete").start()
<pyspark.sql.streaming.StreamingQuery at 0x115612b20>
# 스트림이 시작되면 쿼리 실행 결과가 어떠한 형태로 인메모리 테이블에 기록되는지 확인할 수 있다.
spark.sql("""
SELECT * FROM customer_purchases ORDER BY `sum(total_cost)` DESC
""").show(5)
+----------+------+---------------+ |CustomerID|window|sum(total_cost)| +----------+------+---------------+ +----------+------+---------------+
더 많은 테이블을 읽을수록 테이블 구성이 바뀌는것을 알 수 있습니다. 또한, 상황에 따라 처리 결과를 콘솔에 출력할 수 있습니다.
purchaseByCustomerPerHour.writeStream.format("console").queryName("customer_purchases_2").outputMode("complete").start()
<pyspark.sql.streaming.StreamingQuery at 0x115634250>
3.4. 머신러닝과 고급 분석¶
- MLlib : 내장된 머신러닝 알고리즘 라이브러리 ➡️ 대규모 머신러닝을 수행할 수 잏다.
- MLlib 을 사용하면 대용량 데이터를 대상으로 전처리, 멍잉, 모델 학습, 예측을 할 수 있습니다. 구조적 스트리밍에서 예측할 때도 MLlib에서 학습시킨 예측 모델을 사용할 수 있습니다.
멍잉(munging)은 전처리, 파싱, 필터링과 같이 데이터를 이리저리 핸들링한다는 뜻이다..사실 이 단어는 컴퓨터로 데이터를 처리하는 사람들 사이에서 많이 쓰이는 따끈따끈한 신조어이다.
Spark는 분류(Classification), 회귀(Regression), 군집화(Clustering) 그리고 딥러닝(Deep Learning) 까지 머신러닝과 관련된 API 를 제공합니다.
k-평균이라는 표준 알고리즘을 이용해 군집화를 수행해보자.
k-평균¶
데이터에서 k 개의 중심이 임의로 할당되는 군집화되는 군집화 알고리즘. 중심점에 가까운 점들을 군집에 할당하고, 할당된 점들의 중심을 계산합니다. 이 중심점을 센트로이드(Centroid)라 한다. 해당 센트로이드에 가까운 점들의 군집에 레이블이 지정하고, 새로 계산한 중심으로 센트로이드를 이동시킨다. 이 과정을 정해진 횟수나 수렴할 때까지 반복한다.
다음 예제는 원본 데이터를 올바른 포맷으로 만드는 트랜스포메이션을 정의하고, 모델을 학습한 뒤, 예측을 수행한다.
staticDataFrame.printSchema()
root |-- InvoiceNo: string (nullable = true) |-- StockCode: string (nullable = true) |-- Description: string (nullable = true) |-- Quantity: integer (nullable = true) |-- InvoiceDate: string (nullable = true) |-- UnitPrice: double (nullable = true) |-- CustomerID: double (nullable = true) |-- Country: string (nullable = true)
다음 예제는 원본 데이터를 올바른 포맷으로 만드는 트랜스포메이션을 정의하고 모델을 학습한 뒤, 예측을 수행한다.
MLlib의 머신러닝을 사용하기 위해 수치형 데이터가 필요하다. 예제의 데이터는 타임스탬프, 정수, 문자열 등 다양한 데이터 타입으로 이뤄져있으므로 수치형으로 변환해야한다. 다음은 몇 가지 DataFrame 트랜스포메이션을 사용해 날짜 데이터를 다루는 예제이다.
from pyspark.sql.functions import date_format, col
preparedDataFrame = staticDataFrame.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)
데이터는 학습 데이터셋과 테스트 데이터셋으로 분리해야한다. 예제에서는 특정 구매가 일어난 날짜를 기준으로 직접 분리한다. 또한, MLlib 의 트랜스포메이션 API(TransValidationSplit 이나 CrossValidator)를 사용해 학습 데이터셋과 테스트 데이터셋을 생성할 수 있습니다.
trainDataFrame =preparedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame =preparedDataFrame\
.where("InvoiceDate >= '2011-07-01'")
# 예제 데이터는 시계열 데이터셋으로 임의 날짜를 기준으로 데이터를 분리합니다.
trainDataFrame.count()
245903
testDataFrame.count()
296006
스파크 MLlib은 일반적인 트랜스포메이션을 자동화하는 다양한 트랜스포메이션을 제공합니다. 그 중에 하나는 바로 StringIndexer 이다.
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")
indexer
StringIndexer_daefbe08eb96
위 예제는 요일을 수치형으로 반환합니다. 예를 들어 토요일을 6, 월요일을 1로 반환한다. 하지만 이런 번호 지정 체계는 수치로 표현되어 암묵적으로 토요일이 월월요일보다 더 크다는 것을 의미하게 된다. 이것은 잘못된 방식이다. 이 문제점을 보완하기 위해 OneHotEncoder
를 사용해 각 값을 자체 컬럼으로 인코딩해야한다. 이렇게 하면 특정 요일이 해당요일인지 아닌지 bool 타입으로 나타내게 된다.
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")
# 위 예제 결과는 벡터타입을 구성한 컬럼 중 하나로 사용된다. Spark의 모든 머신러닝 알고리즘은 수치형 벡터 타입을 입력으로 사용한다.
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")
위 예제는 세 가지의 핵심 특징인 가격(UnitPrice), 수량(Quantity), 특정 날짜의 요일(day_of_week_encoded) 를 가지고 있습니다. 다음은 낮ㅇ에 입력값으로 들어올 데이터가 같은 프로세스를 거쳐 변환되도록 파이프라인을 설정하는 예제입니다.
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder,vectorAssembler ])
학습 준비 과정은 2 단계로 이뤄집니다. 우선 변환자(Transformer)를 데이터셋에 적합(Fit)시켜야합니다. 기본적으로 StringIndexer
는 인덱싱할 고윳값의 수를 알아야합니다. 고윳값의 수를 알 수 있다면 인코딩을 매우 쉽게 할 수 있지만, 만약 알 수 없는 컬럼에 있는 모든 고윳값을 조사하고 인덱싱해야합니다.
fittedPipeline = transformationPipeline.fit(trainDataFrame)
학습 데이터셋에 변환자를 적합시키고 나면 학습을 위한 맞춤 파이프라인이 준비됩니다. 그리고 이것을 사용해 일관되고 반복적인 방식ㅇ로 모든 데이터를 변환할 수 있습니다.
transformedTraining = fittedPipeline.transform(trainDataFrame)
이제 모델 학습에 사용할 파이프라인이 마련되었습니다. 하지만, 데이터 캐싱을 설명하기 위해 파이프라인 구성 과정에서 데이터 캐싱과정을 제외시켰습니다. 캐싱은 4부에서 자세히 알아보겠습니다. 동일한 트랜스포메이션을 계속 반복할 수 없으므로 그 대신 모델에 일부 하이퍼파라미터 튜닝 값을 적용합니다. 캐싱을 사용하면 중간 변환된 데이터셋의 복사본을 메모리에 저장하므로 전체 파이프라인을 재실행하는 것보다 훨씬 빠르게 반복적으로 데이터셋에 접근할 수 있습니다.
transformedTraining.cache()
DataFrame[InvoiceNo: string, StockCode: string, Description: string, Quantity: int, InvoiceDate: string, UnitPrice: double, CustomerID: double, Country: string, day_of_week: string, day_of_week_index: double, day_of_week_encoded: vector, features: vector]
학습 데이터셋이 완성되었으므로 모델을 학습할 차례입니다. 머신러닝 모델을 사용하려면 관련 클래스를 임포트하고 인스턴스를 생성해야합니다.
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
.setK(20)\
.setSeed(1)
스파크에서 머신러닝 모델을 학습시키는 과정은 크게 두 단계로 진행됩니다. 첫번째 단계는 아직 학습하지 않은 모델을 초기화하고, 두 번째단계는 해당 모델을 학습시킵니다. MLlib의 DataFrame API에서 제공하는 모든 알고리즘은 항상 두 가지 유형으로 구성되며 다음과 같은 명명규칙을 따른다.
- 학습 전 알고리즘 명칭 : Algorithm (예. KMeans)
- 학습 후 알고리즘 명칭 : AlgorithmModel (예. KMeansModel)
MLlib의 DataFrame API 에서 제공하는 추정자(estimator)는 앞서 사용한 전처리 변환자(StringIndexer)와 거의 동일한 인터페이스를 가지고 있습니다. 이 인터페이스르르 사용해 전체 파이프라인의 학습 과정을 단순화할 수 있습니다. 이 장의 예제에서는 단계별로 설명하기 윟해 파이프라인에서 kmeans 모델 설정 과정을 생략하겠습니다.
kmModel = kmeans.fit(transformedTraining)
kmModel
KMeansModel: uid=KMeans_461470af6da4, k=20, distanceMeasure=euclidean, numFeatures=7
모델 학습이 완료되면, 몇 가지 성과 평가지표에 따라 학습 데이터셋에 대한 비용을 계산할 수 있습니다. 예제에서 사용한 데이터셋의 군집 비용은 상당히 높은 편입니다. 입력 데이터에 대한 전처리와 표준화작업이 적절히 이뤄지지 않았기 때문입니다.
kmModel.computeCost(transformedTraining)
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)
--------------------------------------------------------------------------- AttributeError Traceback (most recent call last) <ipython-input-33-4f802593a0e1> in <module> ----> 1 kmModel.computeCost(transformedTraining) 2 transformedTest = fittedPipeline.transform(testDataFrame) 3 kmModel.computeCost(transformedTest) AttributeError: 'KMeansModel' object has no attribute 'computeCost'
# Make predictions
predictions = model.transform(dataset)
from pyspark.ml.evaluation import ClusteringEvaluator
# Evaluate clustering by computing Silhouette score
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Make predictions
from pyspark.ml.evaluation import ClusteringEvaluator
predictionsTrain = kmModel.transform(transformedTraining)
evaluator = ClusteringEvaluator()
transformedTrain = evaluator.evaluate(predictionsTrain)
print("transformedTrain accuracy = " + str(transformedTrain))
transformedTrain accuracy = 0.938430826362008
3.5. 저수준 API¶
Spark는 거의 모든 기능은 RDD기반으로 만들어졌습니다. DataFrame 연산도 RDD 기반으로 만들어져 편리하고 매우 효율적인 분산처리를 위해 저수준 명령으로 컴파일됩니다.
원시데이터를 읽거나 다루는 용도로 RDD를 사용할 수 있지만, 구조적 API를 사용하는 것이 좋습니다. 하지만 RDD를 이용해 파티션과 같은 물리적 실행 특성을 결정할 수 있으므로 DataFrame보다 더 세밀한 제어를 할 수 있습니다.
또한 드라이버 시스템의 메모리에 저장된 원시 데이터를 병렬처리하는데 RDD를 사용할 수 있습니다.
# 숫자를 이용해 병렬화해 RDD를 생성하는 예제. 그런 다음 다른 DataFrame과 함께 사용할 수 있도록
#DataFrame으로 변환한다.
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
DataFrame[_1: bigint]
3.6. sparkR¶
관심없어서 패스
3.7. 스파크의 에코시스템과 패키지¶
스파크가 자랑하는 최고 장점은 커뮤니티가 만들어낸 패키지 에코시스템과 다양한 기능이다. 이 기능중 일부는 스파크 코어 프로젝트에 포함되어 널리 사용된다. 2017년 12월 기준 300개 이상 패키지가 존재하며 계속 늘어나고 있다. 스파크 패키지 목록은 누구나 자신이 개발한 패키지를 공개할 수 있는 저장소인 https://spark-packages.org/ 에서 확인할 수 있다.
3.8. 정리¶
스파크를 비즈니스와 기술적인 문제 해결에 적용할 수 있는 다양한 방법에 대해 알아보았다.
참고 문헌¶
스파크 완벽 가이드
'독후감' 카테고리의 다른 글
스파크 완벽 가이드 5장 (0) | 2021.06.27 |
---|---|
스파크 완벽가이드 4강 (0) | 2021.06.27 |
스파크 완벽 가이드 1-2장 (0) | 2021.06.27 |
쿠버플로우 책 추천 (0) | 2021.06.15 |
조직을 성공으로 이끄는 프로덕트 오너 (0) | 2021.04.23 |