서론
DataSet을 dataFrame 처럼, 바로 구글링 해서 이용하려고 했더니, 했더니, datafram 이 완벽 호환 되는 개념이 아니라, 써야하는 조인연산, 필터 연산이 너무 달라지는 것 같아 당황해서 롤백한 적이 있습니다. (아마, group 연산을 진행하면서 그렇게 느꼈던 것 같아요.) 한 번 dataframe 과 무슨 차이인지, 배워보려합니다!
고정관념
- DataSet vs dataframe vs rdd 속도차이는, 빌드 타임에 DataSet 이 정의되니까 어찌되었던 빠를 것이다. (⇒ 연산에 따라 달라진다.)
- DataFrame 이랑 동일하게 api 를 이용할 것이다 (⇒ 맞다.)
궁금증
- 느린데, 왜 만든 것 일까? ⇒ 이용하는 시기 부분에 등장하지만, 타입 안정성을 요구하는 경우를 위해 만든 것같다
- DataFrame이랑 별로 겹치지도 않는데, 왜 상속관계인지 ⇒ 일전에 시도했을때, group 연산자를 잘못써서 그렇게 느꼈던 것같다,,
DataSet 개념
- 우선, DataSet은 구조적 api 의 기본 데이터타입입니다. DataFrame 또한 row 타입의 DataSet 입니다. 그리고, 스파크가 지원하는 다양한 언어에서 사용할 수 있습니다.
- DataSet 의 경우는 jvm 을 이용하는 언어인 스칼라와 자바에서만 사용할 수 있습니다. DataSet을 이용해 DataSet의 각 로우를 구성하는 객체를 정의합니다.
- 스칼라에서는 스키마가 정의된 케이스 클래스 객체를 이용해 DataSet을 정의합니다.
- 자바에서는 자바빈 객체를 이용해 DataSet을 정의합니다.
- 도메인별 특정 객체를 효과적으로 지원하기 위해 인코더(encoder) 라 불리는 특수 개념이 필요합니다.
- 인코더는 도메인별 특정 객체 T 를 스파크 내부 데이터타입으로 매핑하는 시스템을 의미합니다.
- 예를 들어, name(string)과 age(int)두 개의 필드를 가진 Person 클래스가 있다고 가정해봅시다. 인코더는 런타임 환경에서 Person 객체를 바이너리구조로 생성하도록 스파크에게 지시합니다. DataFrame 이나 “표준" 구조적 API 를 이용한다면, Row 타입을 직렬화된 바이너리구조로 변환합니다. 도메인에 특화된 객체를 만들어 이용하려면, 스칼라의 케이스 클래스 (case class) 또는, 자바 빈 형태로 사용자 정의 데이터타입을 정의해야합니다.
- DataSet API를 이용한다면, 스파크는 데이터셋에 접근할 때마다 Row 포맷이 아닌 사용자 정의 데이터타입으로 반환합니다.
- 이 변환 작업은 느리긴하지만, 사용자에게 더 많은 유연성을 제공합니다.
- 사용자 정의데이터 타입을 이용하면, 성능은 나빠지게 됩니다. 이게 파이썬 udf 보다 자릿수가 달라질 정도로 훨씬 느립니다. 그 이유는 프로그래밍 언어를 전환하는 것이 사용자 정의 데이터 타입을 사용하는 것 보다 훨씬 느리기 때문입니다.
DataSet 을 이용하는 시기
DataSet을 이용하면, 성능이 떨어지는데, 사용할 이유가 있을까? 라는 질문에 대해 사용하는 경우에 대해 정리해봅시다.
- 사용하는 경우
- DataFrame기능만으로 수행할 연산을 표현할 수 없는 경우
- 복잡한 비즈니스 sql 이나, DataFrame대신, 단일 함수로 인코딩해야하는 경우
- 성능 저하를 감수하더라도 타입 안정성(type safe)을 가진 데이터 타입을 사용하고 싶은 경우
- 또한,DataSet api 는 타입 안정성이 있습니다. 예를 들어두 문자열을 사용해 뺄셈 연산을 하는 것 처럼 데이터 타입이유효하지 않은 작업은 런타임이 아닌 컴파일 타임에 오류가 발생합니다.
- 만일,정확도와 방어적 코드를 가장 중요시한다면, 성능을 희생하더라도, DataSet을 이용하는 것이 좋은 선택일 수 있습니다.
- DataSet api 를 이용하면, 잘못된 데이터로부터 애플리케이션을 보호할수는 없지만, 보다 우아하게데이터를 제어하고 구조화할 수 있습니다.
- 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려면, DataSet을 사용하는 것이 적합합니다.
- 스칼라를 사용해본 경험이 있다면, 스파크 api는 스칼라 sequence 타입의 api 가 일부 반영돼있지만, 분산 방식으로 동작하는 것을 알 수 있을 겁니다.
- 2015년 스칼라 창시자인 마틴 오더스키가 2015년 유럽 스파크 서밋에서 그렇게 언급했습니다.
- 결국 DataSet을 사용하는 장점 중 하나는 로컬과 분산 환경에 재사용할 수 있는 것입니다.
- 케이스 클래스로 구현된 데이터 타입을 이용해 모든 데이터와 트랜스포메이션을 정의하면, 재사용할 수 있습니다.
- 또한, 올바른 클래스와 데이터 타입이 지정된 dataframe 을 로컬 디스크에 저장하면,다음 처리 과정에 이용할수 있어, 더 쉽게 데이터를 다룰 수 있습니다.
- 스칼라를 사용해본 경험이 있다면, 스파크 api는 스칼라 sequence 타입의 api 가 일부 반영돼있지만, 분산 방식으로 동작하는 것을 알 수 있을 겁니다.
- 더 적합한 워크로드를 만들기 위해 DataFrame과 DataSet을 동시에이용해야할 수 있습니다. 하지만, 성능과 타입 안정성을 위해 반드시 희생할 수 밖에 없습니다. 이러한 방식은 대량의 DataFrame 기반의 etl 트랜스포메이션의 마지막 단계에서 이용할 수 있습니다.
- 예를 들어 드라이버로 데이터를 수집해, 단일 노드의 라이브러리로 수집된 데이터를 처리해야하는 경우 입니다.
- 반대로 트랜스포메이션의 첫번째 단계에서 이용할 수 있습니다. 예를들어 스파크 sql 에서 필터링 전에 로우 단위로 데이터를 파싱하는 경우 입니다.
- DataFrame기능만으로 수행할 연산을 표현할 수 없는 경우
DataSet 생성
- DataSet 을 생성하는 것은 수동작업이므로, 정의할 스키마를 미리 알고 있어야합니다.
자바 : Encoders
자바 인코더는 매우 다양합니다 . 데이터타입 클래스를 정의한 다음 DataFrame 에 지정해 인코딩할 수 있습니다.
import org.apache.spark.sql.Encoders
public class Flight implements Serializable {
String DEST_COUNTRY_NAME;
String ORIGIN_COUNTRY_NAME;
Long DEST_COUNTRY_NAME;
}
Dataset<Flight> flights = spark.read.parquet("/data/flight-data/parquet/20210.f\\")
.as(Encoders.bean(Flight.class));
Java Encoder
https://docs.oracle.com/javaee/7/tutorial/websocket007.htm
WebSocket용 Java API는 인코더 및 디코더를 사용하여 WebSocket 메시지와 사용자 정의 Java 유형 간의 변환을 지원합니다. 인코더는 Java 개체를 가져와 WebSocket 메시지로 전송할 수 있는 표현을 생성합니다. 예를 들어 인코더는 일반적으로 JSON, XML 또는 이진 표현을 생성합니다. 디코더는 역기능을 수행합니다. WebSocket 메시지를 읽고 Java 객체를 생성합니다.
Java Bean
- 자바빈(JavaBean)은 인수 없는 생성자, 게터/세터 쌍, 기타 메서드로 구성된 클래스를 의미합니다.
- 게터와 세터는 반드시 정해진 패턴을 따라야 합니다.
- 자바빈즈의 사양은 썬 마이크로시스템즈 에서 다음과 같이 정의되었습니다. "빌더 형식의 개발도구에서 가시적으로 조작이 가능하고 또한 재사용이 가능한 소프트웨어 컴포넌트입니다."
public void setProperty(Type newValue)
public Type getProperty()
스칼라 : 케이스 클래스
스칼라에서 DataSet을 생성하려면, 스칼라 Case class 구문을 이용해 데이터 타입을 정의해야합니다. 케이스 클래스는 다음과 같은 특징을 가진 정규 클래스(regular class) 입니다.
스칼라 case class
https://docs.scala-lang.org/ko/tour/case-classes.html
- 불변성
- 패턴 매칭으로 분해 가능
- 참조값 대신 클래스 구조를 기반으로 비교
- 사용하기 쉽고 다루기 편함
이러한 특징으로 케이스 클래스를 판별할 수 있으므로 데이터 분석시 매우 유용합니다. 케이스 클래스는 불변성을 가지며, 값 대신구조로 비교할 수 있습니다.
스칼라 문서는 케이스 클래스를 다음과 같이 설명합니다.
- 불변성(val)이므로 객체들이 언제 어디서 변경되었는지 추적할 필요 없습니다.
- 값으로 비교하면, 인스턴스를 마치 원시(primitive) 데이터 타입의 값처럼 비교합니다. 그러므로 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 더는 불확실해하지 않아도됩니다.
- 패턴 매칭은 로직 분기를 단순화해 버그를 줄이고 가독성을 좋게 만듭니다.
스파크에서 케이스 클래스를 사용할 때도 이러한 장점은 유지됩니다. DataSet을 생성하기 위해 예제 데이터셋 중 하나는 case class 로 정의합니다.
case class Flight(
DEST_COUNTRY_NAME : String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt
)
앞서 데이터셋의 레코드를 표현할 case class 를 정의했습니다. 즉, Flight 데이터 타입의 DataSet을 생성했습니다. Flight 데이터 타입은 스카마만 정의되어있을 뿐 아무런 메서드도 정의되어 있지 않습니다.
데이터를 읽으면, DataFrame이 반환됩니다. 그리고 as 메서드를 사용해 Flight 데이터 타입으로 변환합니다.
val flightsDF = spark.read.parquet("/data/flight-data/parquet/20/")
val flights = fligthsDF.as[Flight]
액션
지금까지 DataSet 의 강력한 힘을 보았습니다. 하지만, DataSet과 DataFrame에 collect, take, count 와 같은 액션을 적용할 수 있다는 사실이 더 중요하니, 반드시 기억하기 바랍니다.
fligths.show(2)
또한, 케이스 클래스에 실제로 접근할 때, 어떠한 데이터 타입도 필요하지않다는 사실을 알고있어야합니다. case class 의 속성명을 지정하면, 속성에 맞는 값과 데이터 타입 모두를 반환합니다.
flights.first.DEST_COUNTRY_NAME //United States
트랜스포메이션
DataSet의 트랜스포메이션은 DataFrame과 동일합니다. DataFrame의 모든 트랜스포메이션은 DataSet을 이용할 수 있습니다.
DataSet을 이용하면, 원형 JVM 데이터를 다루기 때문에 dataframe 을 이용해 트랜스포메이션을 수행하는 것 보다 좀더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 사용할 수 있습니다.
필터링
Flight 클래스를 파라미터로 사용해 불리언값을 반환하는 함수를 만들어보겠습니다.
불리언 값은 출발지와 도착지가 동일한지 나타냅니다. 이 함수는 사용자 정의 함수가 아닌, 일반 함수입니다.
이전까지 했던 방식과 매우 다르므로 매우 눈 크~게 떠주세요!!!!!!
def originIsDestination(flight_row: Flight): Boolean = {
return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}
위에서 정의한 함수를 filter 메서드를 적용해 각행이 true를 반환하는지 평가하고 데이터셋을 필터링할 수 있습니다.
flights.filter(flight_row => originIsDestination(flight_row)).first()
이 함수는 스파크 코드에서 호출하지 않아도된다. 스파크 코드에서 사용하기 전에 사용자 정의 함수처럼, 로컬머신의 데이터를 대상으로 테스트 할 수 있습니다.
예를 들어 다음 예제의 데이터셋은 드라이버에 모을 수 있을 만큼 아주 작기 때문에 동일한 필터 연산을 수행할 수 있습니다.
flights.collect().filter(flight_row => originIsDestination(flight_row)).first()
함수를 사용했을 때와 동일한 결과를 확인할 수 있습니다.
매핑
필터링은 단순한 트랜스포메이션입니다. 때로는, 특정 값을 다른 값으로 매핑해야합니다.
DataSet을 다루는 간단한 예제는 로우의 특정 컬럼값을 추출하는 것입니다. DataFrame에 매핑작업을 수행하는 것은 DataSet의 select 메서드를 사용하는 것과 같습니다.
val destinations = flights.map(f=> f.DEST_COUNTRY_NAME)
최종적으로 String 데이터 타입의 DataSet을 반환한다. 스파크는 결과로 반환할 jvm 데이터 타입을 알고 있기 때문에, 컴파일 타임에 데이터 타입의 유효성을 검사할 수 있습니다.
드라이버는 결과값을 모아 문자열 타입의 배열로 반환합니다.
val localDestinations = destinations.take(4)
조인
- 조인은 Dataframe 에서와 마찬가지로 DataSet에도 동일하게 적용됩니다. 하지만,DataSet은 joinWith 처럼 정교한 메소드를 제공합니다. joinWith 메소드는 co-group (RDD 용어) 와 매우 유사하며, DataSet안쪽에 다른 두 개의 중첩된 DataSet으로 구성됩니다. 각 컬럼은 단일 DataSet이므로, DataSet객체를 컬럼처럼 다룰 수 있습니다. 그러므로, 조인 수행시 , 더 많은 정보를 유지할 수 있으며, 고급맵이나 필터처럼 정교하게 데이터를 다룰 수 있습니다.
case class FlightMetadata(count: BigInt, randomData: BigInt)
val flightsMeta = spark.range(1000).map(x => scala.utils.Random.nextLong))
.withColumnRenamed("_1","count").withColumnRenamed("_2","randomData")
.as[FlightMetadata]
val flight2= flights
.joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))
최종적으로 로우는 Flight와 FlightMetadata 로 이뤄진 일종의 키-값 형태의 DataSet을 반환합니다. DataSet 이나, 복합 데이터 타입의 DataFrame으로 데이터를 조회할 수 있습니다.
flights.selectExpr("_1.DEST_COUNTRY_NAME")
이전 예제처럼 드라이버로 데이터를 모은 다음 결과를 반환합니다.
일반 조인 역시 아주 잘 동작합니다. 하지만, DataFrame을 반환하므로, JVM 데이터 타입 정보를 모두 잃게 됩니다.
val flights2 = flights.join(flightMeta, Seq("count"))
이 정보를 다시 얻으려면, 다른 DataSet 을 정의해야합니다. DataFrame과 DataSet을 조인하는 것은 아무런 문제가 되지 않고, 최종적으로 동일한 결과를 반환합니다.
val flights2 = flights.join(flightsMeta.toDF(), Seq("count"))
그룹화와 집계
- 그룹화와 집계는 이전 장에서 보았던 것과 동일한 기본 표준을 따르므로, groupBy , rollup 그리고 cube 메서드를 여전히 이용할 수 있습니다. 하지만, DataSet 대신, DataFrame 을 반환하기 때문에, 데이터 타입 정보를 잃게 됩니다.
flights.groupBy("DEST_COUNTRY_NAME").count()
- groupByKey
- 데이터 타입 정보를 잃은 것은 그다지, 큰 문제는 아니지만, 이를 유지할 수 있는, 그룹화와 집계 방법이 있습니다. 한가지 예로 groupByKey 메서드는 DataSet 의 특정 키를 기준으로 그룹화하고 형식화된 DataSet을 반환합니다.
- 하지만, 이 함수는 컬럼명대신, 함수를 파라미터로 사용해야합니다. 따라서, 다음 예제와 같이 훨씬 정교화한 그룹화 함수를 이용할 수 있습니다.
flights.groupByKey(x=> x.DEST_COUNTRY_NAME).count()
- groupByKey 메서드의 파라미터로 함수를 사용함으로써 유연성을 얻을 수 있습니다. 하지만, 스파크는 함수와 JVM데이터 타입을 최적화할 수 없으므로, 트레이드 오프가 발생합니다. 이로인해, 성능차이가 발생하고, 실행계획으로 그 이유를 확인할 수 있습니다.
다음 예제는 groupByKey 메서드를 이용해, DataFrame에 새로운 컬럼을 추가한 다음 그룹화를 수행합니다.
flights.groupByKey(x=>x.DEST_COUNTRY_NAME).count.explain
DataSet의 키를 이용해 그룹화를 수행한 다음, 결과를 키-값의 형태로 함수에 전달해 원시 객체 형태로 그룹화된 데이터를 다룰 수 있습니다.
def grpSum(countryName: String, values: Integer[Flight]) = {
values.dropWhile(_.count < 5).map(x=> (countryName, x))
}
flights.groupByKey(x=> x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(3
def grpSum2(f:Flight):Integer={
1
}
flights.groupByKey(x=> x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)
- 다음 예제처럼 새로운 처리 방법을 생성해 그룹을 축소하는 방법을 정의할 수 있습니다.
def sum2(left: Fligt, right: Flight) = {
Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}
flights.groupByKey(x=> x.DEST_COUNTRY_NAME).reduceGroups((1,r) => sum2(1,r)).take(5)
flights.groupBy("DEST_COUNTRY_NAME").count().explain
- groupByKey 메서드는 동일한 결과를 반환하지만, 데이터 스캔 직후에, 집계를 수행하는 groupBy 메서드에 비해 , 더 비싼 처리인 것을 실행계획을 통해 알 수 있습니다.
- 그러므로, 사용자가 정의한 인코딩으로 세밀한 처리가 필요하거나, 필요한 경우에만,DataSet의 groupByKey 메서드를 이용해야합니다.
- DataSet은 빅데이터 처리 파이프라인의 처음과 끝 작업에 주로 이용하게 됩니다.
정리
- 우리는 DataSet의 기초와 DataSet의 사용이 적합한 경우를 예제와 함께 알아보았습니다. 다시 말해, DataSet을 이용하기 위해 기본적으로 알아야할 내용을 알아보았습니다. 속도 비교글까지 정리하려고 했으나, 현재까지는 데이터가 작고, 로컬모드로 돌려서 인지, dataset 과 dataframe 의 시간 차이가 크게 안나네요…,인생…… 해당 부분이 정리가 되면, 추후의 글에서 뵙겠습니다!
참고 문헌
'스파크' 카테고리의 다른 글
스파크 튜닝 관련 20230409 (0) | 2023.04.09 |
---|---|
Spark Join전략과 hint (5) | 2022.09.25 |
Spark: Cluster Computing with Working Sets 정리(해석) (0) | 2022.05.22 |