1장. 아파치 스파크란¶
- 아파치 스파크는 통합 컴퓨팅 엔진이며 클러스터 환경에서 데이터를 병렬로 처리하는 라이브러리 집합입니다.
- 스파크는 널리 쓰이는 파이썬, 자바, 스칼라, R 을 지원하고 SQL 뿐만 아니라 스트리밍, 머신러닝에 이르기 까지 넓은 범위의 라이브러리를 제공합니다.
- 스파크는 저장소 시스템의 데이터를 연산하는 역할
- 스파크는 데이터 저장 위치에 관계 없이 처리에 집중하게 만들어졌습니다.
- (자바 8이어야 돌아감. 파이썬 3.9이하여야한다는 썰이 많음)
2장. 스파크 간단히 살펴보기¶
2.1. 스파크 기본 아키텍처¶
- 컴퓨터 클러스터는 여러 컴퓨터의 자원을 모아 하나의 컴퓨터처럼 사용할 수 있게 만듭니다.
- 컴퓨터 클러스터에서 작업을 조율하는프레임워크가 바로 "스파크"입니다.
- 스파크는 클러스터의 데이터 처리 작업을 관리하고 조율합니다.
- 스파크가 연산에 사용할 클러스터는 스파크 스탠드얼론 클러스터 매니저, 하둡 YARN, 메소스 Mesos 같은 클러스터 매니저에서 관리합니다.
- 사용자는 클러스터 매니저에 스파크 애플리케이션을 제출 (submit)합니다.
- 이를 제출 받은 클러스터 매니저는 애플리케이션 실행에 필요한 자원을 할당하며 우리는 할당받은 자원으로 작업을 처리합니다.
2.1.1. 스파크 애플리케이션¶
- 스파크 애플리케이션은 드라이버 프로세스와 다수의 익스큐터 프로세스로 구성됩니다.
- 드라이버 프로세스는 클러스터 노드 중 하나에서 실행되며
main()
함수를 실행합니다.- 이는 스파크 애플리케이션 정보의 유지 관리, 사용자 프로그램이나 입력에 대한 응답, 전반적인 익스큐터 프로세스의 작업과 관련된 분석, 배포, 스케줄링 역할을 수행하기 때문에 필수적입니다.
- 드라이버 프로세스는 스파크 애플리케이션의 심장과 같은 존재로 애플리케이션의 수명 주기 동안 관련 정보를 모두 유지합니다.
- 드라이버 프로세스는 클러스터 노드 중 하나에서 실행되며
- 익스큐터는 드라이버 프로세스가 할당한 작업을 수행합니다. 즉, 드라이버가 할당한 코드를 실행하고, 진행 상황을 다시 드라이버 노드에 제공하는 두 가지 역할을 합니다.
-
- 클러스터 매니저가 물리적 머신을 관리하고, 스파크 애플리케이션에 자원을 할당하는 방법을 나타낸다. 클러스터 매니저는 스파크 스탠드얼론 클러스터 매니저, 하둡 YARN, a메소스 중 하나를 선택할 수 있고, 하나의 클러스터에서 여러 개의 스파크 애플리케이션을 실행할 수있다.
- 스파크는 사용 가능한 자원을 파악하기 위해 클러스터 매니저를 사용한다.
- 드라이버 프로세스는 주어진 작업을 완료하기 위해 드라이버 프로그램의 명령을 익스큐터에서 실행할 책임이 있다. ( 스파크 코드 실행 역할 )
드라이버는 스파크 언어 api 로 다양한 언어로 수행할 수 ㅣㅇㅆ다.
2.2 스파크의 다양한 언어 API¶
- 스칼라
- 스파크는 스칼라로 개발되어있으므로 스칼라가 스파크의 기본이다
- 자바
- 스파크가 스칼라로 개발되었지만, 스파크 창시자들은 자바를 이용해 스파크 코드를 작성할 수 있게 심혈을 기울였다.
- 파이썬
- 파이썬은 스칼라가 지원하는 거의 모든 구조를 지원합니다
- SQL
- R
각 언어 api는 앞서 설면한 핵심 개념을 유지하고 있다. 사용자는 스파크 코드를 실행하기 위해 SparkSession 객체를 진입점으로 이용할 수 있다. 파이썬이나 R로 스파크를 이용할 때는 JVM 코드를 명시적으로 작성하지 않습니다. 스파크는 사용자를 대신해 파이썬이나 R로 작성한 코드를 익스큐터의 JVM에서 실행할 수 있는 코드로 변환합니다
2.3 스파크 API¶
다양한 언어로 스파크를 이용할 수 있는 이유는 스파크가 기본적으로 두 가지 api 를 제공하기 때문이다.
- 저수준의 비구조적 API
- 고수준의 구조적 API
2.4. 스파크 시작하기¶
- 실제 스파크 어플리케이션을 개발하려면 사용자 명령과 데이터를 스파크 애플리케이션에 전송하는 방법을 알아야한다.
- SparkSession을 생성하면서 자세히 알아보자.
- 대화형 세션 시작하기
./bin/spark-shell
: 스칼라 콘솔에 접속할 수 있다../bin/pyspark
: 파이썬 콘솔에 접속할 수 있다.
- 대화형 세션 시작하기
spark-submit
: 스탠드얼론 애플리케이션을 스파크에 제출- 대화형 모드로 스파크를 시작하면 스파크 애플리케이션을 관리하는 SparkSession 이 자동으로 생성됩니다. 하지만, 스탠드얼론 애플리케이션으로 스파크를 시작하면 사용자 애플리케이션코드에서 SparkSession 객체를 직접 생성해야한다.
2.5. SparkSession¶
- 스파크 애플리케이션은 SparkSession이라 불리는 드라이버프로세스로 제어합니다.
- SparkSession 인스턴스는 사용자가 정의한 처리 명령을 클러스터에서 실행합니다.
- 하나의 SparkSession은 하나의 스파크 애플리케이션에 대응합니다.
- 스칼라와 파이썬 콘솔을 시작하면 spark 변수로 sparkSession 을 사용할 수 있습니다.
- 일정 범위의 숫자를 만들기 . ( = 스프레드시트에서 컬럼명을 지정한 것과 같다. )
myRange = spark.range(1000).toDf("number")
이렇게 사면 한 개의 컬럼과, 1000개의 로우로 구성되며, 각 로우에은 0부터 999까지 값이 할당된다.
이 숫자들은 분산컬렉션 을 나타낸ㄴ다.
클러스터모드에서 코드 예제를 실행하면 숫자 범위의 각 부분이 서로 다른 익스큐터에 할당된다. 이것이 스파크의 DataFrame 이다.
2.6. DataFrame¶
Dataframe 은 가장 대표적인 구조적 API이다.
DataFrame 은 테이블의 데이터를 로우와 컬럼으로 단순하게 표현한다. 컬럼과 컬럼의 타입을 정의한 목록을 스키마라고 한다.
- 분산컴퓨터와 단일 컴퓨터 분석의 차이점
- 데이터가 수천 컴퓨터에 분산되어있다. 이유는 단일 컴퓨터에 저장하기에는 데이터가 너무 크거나 계산에 너무 오랜 시간이 걸릴 수 있기 때문이다.
2.6.1. 파티션¶
스파크는 모든 익스큐터가 병령로 작업을 수행할 수 잇도록 파티션이라 불리는 청크 단위로 떼이터를 분할 합니다.
파티션은 클러스터의 물리적 머신이 존재하는 로우의 집합을 의미한다.
DataFrame 의 파티션은 실행중인 데이터가 컴퓨터 클러스터에 물리적으로 분산된 방식을 나타낸다.
예) 파티션 1, 수천개 익스큐터 => 병렬성 1
파티션 수백개 , 익스큐터 1개 -> 병렬성 1
DataFrame을 사용하면 파티션을 수동, 혹은 개별적으로 처리할 필요가 없다. 물리적 파티션에 데이터 변환용 함수를 지정하면, 스파크가 실제 처리 방법을 결정한다. RDD 인터페이스를 이용하는 저수준 API 역시 제공된다.
2.7. 트랜스포메이션¶
- 스파크의 핵심 데이터구조는 불변성(한 번 생성하면 변경할 수 없다.)을 가진다.
- 트랜스포메이션: dataframe 을 변경하려면 원하는 변경 방법을 스파크에 알려주는 것
- 스파크에서 비즈니스 로직을 표현하는 핵심 표현
# Dataframe에서 짝수찾기
divisBy2 = myRange.where("number % 2 = 0")
놀랍게도 이 실행결과는 출력되지 않는다. 추상적인 트랜스포메이션만 지정하 상태이기 떄문에 액션(action)을 호출하지 않으면, 스파크는 실제 트랜스포메이션을 수행하지 않는다.
트랜스포메이션 두 가지 유형¶
- 좁은 의존성
- 트랜스포메이션은 각 입력 파티션이 하나의 출력 파티션에만 영향을 미친다.
- 넓은 의존성
- 하나의 입력 파티션이 여러 출력 파티션에 영향을 미친다.
셔플 : 스파크가 클러스터에서 파티션을 교환한다.
좁은 트랜스포메이션을 사용하면 스파크에서 파이프라이닝을 자동으로 수행한다.
즉, DataFrame 에 여러 필터를 지정하는 경우, 모든 작업이 메모리에서 일어납니다. 하지만, 셔플은 다른 방식으로 동작합니다. 스파크는 셔플의 결과를 디스크에 저장합니다.
사실 이건 셔플 최적화에 대한 토론을 많이 볼 수 있다. 지금은 두 종류의 트랜스포메이션이 있단 사실만 알고 지나가자.
2.7.1. 지연 연산¶
지연연산은 스파크가 연산 그래프를 처리하기 까지 기다리는 동작 방식을 의미한다.
스파크는 명령 즉시 실행 안하고, 원시 데이터에 적용할 트랜스포메이션의 실행계획을 생성한다.
스파크는 코드를 실행하는 마지막 순간까지 대기하다가 원형 DataFrame 트래스포메이션을 간결한 물리적 실행 계획으로 컴파일한다.
스파크는 이 과정을 거치면, 전체 데이터 흐름을 최적화하는 엄청난 강점을 가지고 있다.
DataFrame 의 조건절 푸시다운이 예가 될 수 있다. 아주 복잡한 스파크잡이 원시 데이터에서 하나의 로우만 가져오는 필터를 가지고 있다면 필요한 레코드 하나만 읽는 것이 효율덕이다.
스파크는 이 필터를 데이터소스로 위임하는 최적화 작업을 자동으로 수행한다.
그러면 개발자가 막짜도 되는건가???? 흠흠
2.8. 액션¶
사용자는 트랜스포메이션을 사용해 논리적 실행 계획을 세울 수 있다 하지만 실제 연산을 수행하려면 액션 명령을 내려야한다. 액션은 일련의 트랜스포메이션으로부터 결과를 계산하라고 지시하는 명령이다. 가장 단순한 액션이 count ㅇ메서드는 DataFrame의 전체 레코드수를 반환한다.
divisBy2.count()
이거 외에도 세 가지 유형의 액션이 있습니다.
- 콘솔에서 데이터를 보는 액션
- 각 언어로 된 네이티브 객체에 데이터를 모으는 액션
- 출력 데이터소스에 저장하는 액션
액션을 지정하면 스파크잡이 시작된다. 스파크잡은 필터(좁은 트랜스포메이션)을 수행한 후, 파티션별로 레코드 수를 카운트(넓은 트랜스포메이션) 한다.
그리고 각 언어에 적합한 네이티브 객체를 결과를 모은다. 이 댸, 스파크가 제공하는 스파크 ui로 클러스터에서 실행중인 스파크 잡을 모니터링할 수 있다.
2.9. 스파크 UI¶
- 스파크 ui 는 스파크 잡의 진행상황을 모니터링할 때 사용합니다. 스파크 ui 는 드라이버 노드의 4040 포트로 접속할 수 있습니다. 로컬모드에서 스파크를 실행했다면, 스파크의 ui 주소는
http://localhost:4040
입니다. - 스파크 ui에서 스파크잡의 상태, 환경 설정, 클러스터의 상태 등의 정보를 확인할 수 있습니다.
- 스파크 ui는 스파크 잡을 튜닝하고 디버깅할 때 유용합니다.
2.10. 종합예제¶
스파크는 다양한 데이터소스를 지원합니다. 데이터는 SparkSession의 DataFrameReader 클래스를 사용해 읽습니다. 이 떄, 특정 파일 포맷과 몇 가지 옵션을 함께 설정합니다. 예제에선 스파크 DataFrame 의 스키마 정보를 알아내는 스키마 추론 기능을 사용합니다.
그리고 파일의 첫 로우를 헤더로 지정하는 옵션도 함께 설정합니다.
스파크는 스키마 정보를 얻기 위해 데이터를 조금 읽습니다. 그리고 해당 로우의 데이터 타입을 스파크 데이터타입에 맞게 분석합니다. 하지만, 운영 환경에서는 데이터를 읽는 시점에 스키마를 지정하는 옵션을 사용합니다.
flightData2015 = spark.read.option('inferSchema', "true").option("header", "true").csv("/data/flight-data/csv/2015-summary.csv")
스칼라와 파이썬에서 사용하는 DataFrame 은 불특정 다수의 로우와 컬럼을 가진다. 로우 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메이션이기 때문이다. 스파크는 각 컬럼의 데이터타입을 추론하기 위해 적은 양의 데이터를 읽습니다.
import sys, os, gzip, json
import datetime, argparse
flightData2015 = spark\
.read\
.option("inferSchema", "true")\
.option("header", "true")\
.csv("./data/flight-data/csv/2015-summary.csv")
스칼라와 파이썬에서 사용하는 DataFrame은 불특정 다수의 로우와 컬럼을 가집니다. 로우의 수를 알 수 없는 이유는 데이터를 읽는 과정이 지연 연산 형태의 트랜스포메션이기 때문이다.스파크는 각 컬럼의 데이터 타입을 추론하기 위해 적은 양의 데이터를 읽습니다.
DataFrame 의 take 액션을 호출하면, 이전의 head 명령과 같은 결과를 확인 가능하다.
flightData2015.take(3)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1),
Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344)]
이제 트랜스포메이션을 추가로 지정합니다. 정수 데이터 타입인 count 컬럼을 기준으로 데이터를 정렬합니다.
sort 메소드는 DataFrame 을 변경하지 않습니다. 트랜스포메이션으로 sort 메소드를 사용하면, 이전의 DataFrame을 변환해 새로운 DataFrame 을 생성해 반환합니다. 결과 DataFrame에 take 메소드를 호출하면 어떤 일이 일어나는 지 확인할 수 있습니다.
sort 메소드는 단지 트랜스포메이션이기 떄문에 호출 시 데이터에 아무런 변화도 일어나지 않는다. 하지만, 스파크는 실행 계획을 만들고 검토해 클러스터에 처리할 방법을 알아냅니다. 특정 DataFrame 객체에 explain
메서드를 호출하면 DataFrame의 계보나 스파크의 쿼리 실행 계획을 확인할 수 있습니다.
flightData2015.sort("count").explain()
== Physical Plan ==
*(1) Sort [count#18 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#18 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#32]
+- FileScan csv [DEST_COUNTRY_NAME#16,ORIGIN_COUNTRY_NAME#17,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Documents/working_directory/local/Spark-The-Definitive-Guide/d..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,ORIGIN_COUNTRY_NAME:string,count:int>
이제 트랜스포메니션 실행계획을 시작하기 위해 액션을 호출합니다. 액션을 실행하려면 몇 가지 설정이 필요합니다. 스파크는 셔플 수행시, 기본적으로 200개의 셔플 파티션이 생성됩니다. 이 값을 5로 설정해 셔플의 출력 파티션 수를 줄입니다.
spark.conf.set("spark.sql.shuffle.partitions", "5") # 셔플 출력 파티션 수 줄이기
flightData2015.sort("count").take(2)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Singapore', count=1),
Row(DEST_COUNTRY_NAME='Moldova', ORIGIN_COUNTRY_NAME='United States', count=1)]
트랜스포메이션의 논리적 실행 계획은 DataFrame 의 계보를 정의합니다. 스파크는 계보를 통해 입력 데이터에 수행한 연산을 전체 파티션에서 어떻게 재연산하는지 알 수 있습니다. 이 기능은 스파크의 프로그래밍 모델인 함수형 프로그래밍의 핵심입니다. 함수형 프로그래밍은 데이터의 변환 규칙이 일정한 경우, 같은 입력에 대해 항상 같은 출력을 생성합니다.
사용자는 물리적 데이터를 직접 다루지 않습니다. 대신 앞서 설정한 셔플 파티션 파라미터와 같은 속성으로 물리적 실행 특성을 제어합니다. 앞서 셔플 파티션 수를를 5로 해서 5개의 출력 파티션이 생성됩니다. 이 값을 변경하면, 스파크 잡의 실제 실행 특성을 제어할 수 있다.
2.10.1 DataFrame과 SQL¶
- 사용자가 SQL이나 DataFrame(R, SQL, 파이썬)으로 비즈니스 로직을 표현하면 스파크에서 실제 코드를 실행하기 전에 그 로직을 기본실행계획(explain 메서드를 호출해 실행 셰획을 확인할 수 있습니다. ) 으로 컴파일합니다.
- 스파크 SQL을 활용하면 모든 DataFrame 을 테이블이나 뷰로 등록한 후, SQL 쿼리로 이용할 수 있습니다. 스파크는 SQL 쿼리를 DataFrame 코드와 같은 실행계획으로 컴파일 하므로 둘 사이에 성능 차이는 없습니다.
flightData2015.createOrReplaceTempView("flight_data_2015")
이제 SQL로 데이터를 조회할 수 있습니다. 새로운 DataFrame 을 반환하는 spark.sql 메서드로 SQL 쿼리를 실행합니다. spark는 SparkSession 의 변수 입니다. DataFrame 에 쿼리를 수행하면 새로운 DataFrame를 반환합니다. 로직을 작성할 때, 반복적인 느낌이 들지만 매우 효율적이다. 또한, 사용자는 어던 상황에서든 가장 편리한 방식으로 트랜스포메이션을 지정할 수 있습니다.
sqlWay = spark.sql("""SELECT DEST_COUNTRY_NAME , count(1) FROM flight_data_2015 GROUP BY DEST_COUNTRY_NAME""")
sqlWay
DataFrame[DEST_COUNTRY_NAME: string, count(1): bigint]
dataFrameWay =flightData2015.groupBy("DEST_COUNTRY_NAME").count()
dataFrameWay
DataFrame[DEST_COUNTRY_NAME: string, count: bigint]
dataFrameWay.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#64]
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
+- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Documents/working_directory/local/Spark-The-Definitive-Guide/d..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
sqlWay.explain()
== Physical Plan ==
*(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#83]
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_count(1)])
+- FileScan csv [DEST_COUNTRY_NAME#16] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Documents/working_directory/local/Spark-The-Definitive-Guide/d..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string>
max 함수는 필터링을 수행해 단일 로우 결과로 반환하는 트랜스포메이션입니다.
spark.sql("SELECT max(COUNT) from flight_data_2015").take(1)
[Row(max(COUNT)=370002)]
# 파이썬
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)
[Row(max(count)=370002)]
# 상위 5개의 도착 국가를 찾아내자!
maxSql = spark.sql("""SELECT DEST_COUNTRY_NAME ,sum(count) as destination_total FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME ORDER BY sum(count) DESC LIMIT 5
""")
maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
| United States| 411352|
| Canada| 8399|
| Mexico| 7140|
| United Kingdom| 2025|
| Japan| 1548|
+-----------------+-----------------+
# 상위 5개의 도착 국가를 찾아내자!
from pyspark.sql.functions import desc
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")\
.withColumnRenamed("sum(count)","destination_total" ).sort(desc("destination_total")).limit(5).show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
| United States| 411352|
| Canada| 8399|
| Mexico| 7140|
| United Kingdom| 2025|
| Japan| 1548|
+-----------------+-----------------+
flightData2015.groupBy("DEST_COUNTRY_NAME").sum("count")\
.withColumnRenamed("sum(count)","destination_total" ).sort(desc("destination_total")).limit(5).explain()
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#117L DESC NULLS LAST], output=[DEST_COUNTRY_NAME#16,destination_total#117L])
+- *(2) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[sum(cast(count#18 as bigint))])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#16, 5), ENSURE_REQUIREMENTS, [id=#229]
+- *(1) HashAggregate(keys=[DEST_COUNTRY_NAME#16], functions=[partial_sum(cast(count#18 as bigint))])
+- FileScan csv [DEST_COUNTRY_NAME#16,count#18] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/user/Documents/working_directory/local/Spark-The-Definitive-Guide/d..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<DEST_COUNTRY_NAME:string,count:int>
실행계획은 트랜스포메이션의 지향성 비순환 그래프(DAG: directed acyclic graph) 이고 액션이 호출되면 결과를 만든다. 그리고 지향성 비순환 그래프의 각 단계는 불변성을 가진 신규 DataFrame 을 생성한다
출력된 실행 계획은 Conceptual Plan 과 정확하게 일치하지 않지만 모든 부분을 포함하고 있다. 실힝 계획을 확인하면 첫 줄에서 limit 구문과 orderBy 구문을 확인할 수 있다.
또한, partial_sum 함수를 호출하는 부분에서 집계가 2 단계로 나누어지는 것을 알 수 있습니다. 단계가 나누어지는 이유는 숫자 목록의 합을 구하는 연산이 가환성(Commutative) 을 가지고 있어 합계 연산시 파티션별 처리가 가능하기 때문이다.
'독후감' 카테고리의 다른 글
스파크 완벽가이드 4강 (0) | 2021.06.27 |
---|---|
스파크 완벽 가이드 3장 (0) | 2021.06.27 |
쿠버플로우 책 추천 (0) | 2021.06.15 |
조직을 성공으로 이끄는 프로덕트 오너 (0) | 2021.04.23 |
UML 분석 설계 실무 :wip (0) | 2021.02.05 |