티스토리 뷰

스파크

Spark Dataset 개념과 이해

killog 2022. 7. 24. 23:51
반응형

spark

서론

DataSet을 dataFrame 처럼, 바로 구글링 해서 이용하려고 했더니, 했더니, datafram 이 완벽 호환 되는 개념이 아니라, 써야하는 조인연산, 필터 연산이 너무 달라지는 것 같아 당황해서 롤백한 적이 있습니다. (아마, group 연산을 진행하면서 그렇게 느꼈던 것 같아요.) 한 번 dataframe 과 무슨 차이인지, 배워보려합니다!

고정관념

  1. DataSet vs dataframe vs rdd 속도차이는, 빌드 타임에 DataSet 이 정의되니까 어찌되었던 빠를 것이다. (⇒ 연산에 따라 달라진다.)
  2. DataFrame 이랑 동일하게 api 를 이용할 것이다 (⇒ 맞다.)

궁금증

  1. 느린데, 왜 만든 것 일까? ⇒ 이용하는 시기 부분에 등장하지만, 타입 안정성을 요구하는 경우를 위해 만든 것같다
  2. DataFrame이랑 별로 겹치지도 않는데, 왜 상속관계인지 ⇒ 일전에 시도했을때, group 연산자를 잘못써서 그렇게 느꼈던 것같다,,

DataSet 개념

  1. 우선, DataSet은 구조적 api 의 기본 데이터타입입니다. DataFrame 또한 row 타입의 DataSet 입니다. 그리고, 스파크가 지원하는 다양한 언어에서 사용할 수 있습니다.
  2. DataSet 의 경우는 jvm 을 이용하는 언어인 스칼라와 자바에서만 사용할 수 있습니다. DataSet을 이용해 DataSet의 각 로우를 구성하는 객체를 정의합니다.
    1. 스칼라에서는 스키마가 정의된 케이스 클래스 객체를 이용해 DataSet을 정의합니다.
    2. 자바에서는 자바빈 객체를 이용해 DataSet을 정의합니다.
  3. 도메인별 특정 객체를 효과적으로 지원하기 위해 인코더(encoder) 라 불리는 특수 개념이 필요합니다.
  4. 인코더는 도메인별 특정 객체 T 를 스파크 내부 데이터타입으로 매핑하는 시스템을 의미합니다.
  5. 예를 들어, name(string)과 age(int)두 개의 필드를 가진 Person 클래스가 있다고 가정해봅시다. 인코더는 런타임 환경에서 Person 객체를 바이너리구조로 생성하도록 스파크에게 지시합니다. DataFrame 이나 “표준" 구조적 API 를 이용한다면, Row 타입을 직렬화된 바이너리구조로 변환합니다. 도메인에 특화된 객체를 만들어 이용하려면, 스칼라의 케이스 클래스 (case class) 또는, 자바 빈 형태로 사용자 정의 데이터타입을 정의해야합니다.
  6. DataSet API를 이용한다면, 스파크는 데이터셋에 접근할 때마다 Row 포맷이 아닌 사용자 정의 데이터타입으로 반환합니다.
    1. 이 변환 작업은 느리긴하지만, 사용자에게 더 많은 유연성을 제공합니다.
    2. 사용자 정의데이터 타입을 이용하면, 성능은 나빠지게 됩니다. 이게 파이썬 udf 보다 자릿수가 달라질 정도로 훨씬 느립니다. 그 이유는 프로그래밍 언어를 전환하는 것이 사용자 정의 데이터 타입을 사용하는 것 보다 훨씬 느리기 때문입니다.

DataSet 을 이용하는 시기

DataSet을 이용하면, 성능이 떨어지는데, 사용할 이유가 있을까? 라는 질문에 대해 사용하는 경우에 대해 정리해봅시다.

  • 사용하는 경우
    • DataFrame기능만으로 수행할 연산을 표현할 수 없는 경우
      • 복잡한 비즈니스 sql 이나, DataFrame대신, 단일 함수로 인코딩해야하는 경우
    • 성능 저하를 감수하더라도 타입 안정성(type safe)을 가진 데이터 타입을 사용하고 싶은 경우
    • 또한,DataSet api 는 타입 안정성이 있습니다. 예를 들어두 문자열을 사용해 뺄셈 연산을 하는 것 처럼 데이터 타입이유효하지 않은 작업은 런타임이 아닌 컴파일 타임에 오류가 발생합니다.
      • 만일,정확도와 방어적 코드를 가장 중요시한다면, 성능을 희생하더라도, DataSet을 이용하는 것이 좋은 선택일 수 있습니다.
      • DataSet api 를 이용하면, 잘못된 데이터로부터 애플리케이션을 보호할수는 없지만, 보다 우아하게데이터를 제어하고 구조화할 수 있습니다.
    • 단일 노드의 워크로드와 스파크 워크로드에서 전체 로우에 대한 다양한 트랜스포메이션을 재사용하려면, DataSet을 사용하는 것이 적합합니다.
      • 스칼라를 사용해본 경험이 있다면, 스파크 api는 스칼라 sequence 타입의 api 가 일부 반영돼있지만, 분산 방식으로 동작하는 것을 알 수 있을 겁니다.
        • 2015년 스칼라 창시자인 마틴 오더스키가 2015년 유럽 스파크 서밋에서 그렇게 언급했습니다.
        • 결국 DataSet을 사용하는 장점 중 하나는 로컬과 분산 환경에 재사용할 수 있는 것입니다.
          • 케이스 클래스로 구현된 데이터 타입을 이용해 모든 데이터와 트랜스포메이션을 정의하면, 재사용할 수 있습니다.
          • 또한, 올바른 클래스와 데이터 타입이 지정된 dataframe 을 로컬 디스크에 저장하면,다음 처리 과정에 이용할수 있어, 더 쉽게 데이터를 다룰 수 있습니다.
    • 더 적합한 워크로드를 만들기 위해 DataFrame과 DataSet을 동시에이용해야할 수 있습니다. 하지만, 성능과 타입 안정성을 위해 반드시 희생할 수 밖에 없습니다. 이러한 방식은 대량의 DataFrame 기반의 etl 트랜스포메이션의 마지막 단계에서 이용할 수 있습니다.
      • 예를 들어 드라이버로 데이터를 수집해, 단일 노드의 라이브러리로 수집된 데이터를 처리해야하는 경우 입니다.
      • 반대로 트랜스포메이션의 첫번째 단계에서 이용할 수 있습니다. 예를들어 스파크 sql 에서 필터링 전에 로우 단위로 데이터를 파싱하는 경우 입니다.

DataSet 생성

  • DataSet 을 생성하는 것은 수동작업이므로, 정의할 스키마를 미리 알고 있어야합니다.

자바 : Encoders

자바 인코더는 매우 다양합니다 . 데이터타입 클래스를 정의한 다음 DataFrame 에 지정해 인코딩할 수 있습니다.

import org.apache.spark.sql.Encoders

public class Flight implements Serializable {
  String DEST_COUNTRY_NAME;
  String ORIGIN_COUNTRY_NAME;
  Long DEST_COUNTRY_NAME;
}

Dataset<Flight> flights = spark.read.parquet("/data/flight-data/parquet/20210.f\\")
.as(Encoders.bean(Flight.class));

Java Encoder

https://docs.oracle.com/javaee/7/tutorial/websocket007.htm

WebSocket용 Java API는 인코더 및 디코더를 사용하여 WebSocket 메시지와 사용자 정의 Java 유형 간의 변환을 지원합니다. 인코더는 Java 개체를 가져와 WebSocket 메시지로 전송할 수 있는 표현을 생성합니다. 예를 들어 인코더는 일반적으로 JSON, XML 또는 이진 표현을 생성합니다. 디코더는 역기능을 수행합니다. WebSocket 메시지를 읽고 Java 객체를 생성합니다.

Java Bean

  • 자바빈(JavaBean)은 인수 없는 생성자, 게터/세터 쌍, 기타 메서드로 구성된 클래스를 의미합니다.
  • 게터와 세터는 반드시 정해진 패턴을 따라야 합니다.
  • 자바빈즈의 사양은 썬 마이크로시스템즈 에서 다음과 같이 정의되었습니다. "빌더 형식의 개발도구에서 가시적으로 조작이 가능하고 또한 재사용이 가능한 소프트웨어 컴포넌트입니다."
public void setProperty(Type newValue)
public Type getProperty()

스칼라 : 케이스 클래스

스칼라에서 DataSet을 생성하려면, 스칼라 Case class 구문을 이용해 데이터 타입을 정의해야합니다. 케이스 클래스는 다음과 같은 특징을 가진 정규 클래스(regular class) 입니다.

스칼라 case class

https://docs.scala-lang.org/ko/tour/case-classes.html

  • 불변성
  • 패턴 매칭으로 분해 가능
  • 참조값 대신 클래스 구조를 기반으로 비교
  • 사용하기 쉽고 다루기 편함

이러한 특징으로 케이스 클래스를 판별할 수 있으므로 데이터 분석시 매우 유용합니다. 케이스 클래스는 불변성을 가지며, 값 대신구조로 비교할 수 있습니다.

스칼라 문서는 케이스 클래스를 다음과 같이 설명합니다.

  1. 불변성(val)이므로 객체들이 언제 어디서 변경되었는지 추적할 필요 없습니다.
  2. 값으로 비교하면, 인스턴스를 마치 원시(primitive) 데이터 타입의 값처럼 비교합니다. 그러므로 클래스 인스턴스가 값으로 비교되는지, 참조로 비교되는지 더는 불확실해하지 않아도됩니다.
  3. 패턴 매칭은 로직 분기를 단순화해 버그를 줄이고 가독성을 좋게 만듭니다.

스파크에서 케이스 클래스를 사용할 때도 이러한 장점은 유지됩니다. DataSet을 생성하기 위해 예제 데이터셋 중 하나는 case class 로 정의합니다.

case class Flight(
     DEST_COUNTRY_NAME : String, 
     ORIGIN_COUNTRY_NAME: String, 
     count: BigInt
)

앞서 데이터셋의 레코드를 표현할 case class 를 정의했습니다. 즉, Flight 데이터 타입의 DataSet을 생성했습니다. Flight 데이터 타입은 스카마만 정의되어있을 뿐 아무런 메서드도 정의되어 있지 않습니다.

데이터를 읽으면, DataFrame이 반환됩니다. 그리고 as 메서드를 사용해 Flight 데이터 타입으로 변환합니다.

val flightsDF = spark.read.parquet("/data/flight-data/parquet/20/")
val flights = fligthsDF.as[Flight]

액션

지금까지 DataSet 의 강력한 힘을 보았습니다. 하지만, DataSet과 DataFrame에 collect, take, count 와 같은 액션을 적용할 수 있다는 사실이 더 중요하니, 반드시 기억하기 바랍니다.

fligths.show(2)

또한, 케이스 클래스에 실제로 접근할 때, 어떠한 데이터 타입도 필요하지않다는 사실을 알고있어야합니다. case class 의 속성명을 지정하면, 속성에 맞는 값과 데이터 타입 모두를 반환합니다.

flights.first.DEST_COUNTRY_NAME //United States

트랜스포메이션

DataSet의 트랜스포메이션은 DataFrame과 동일합니다. DataFrame의 모든 트랜스포메이션은 DataSet을 이용할 수 있습니다.

DataSet을 이용하면, 원형 JVM 데이터를 다루기 때문에 dataframe 을 이용해 트랜스포메이션을 수행하는 것 보다 좀더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 수행하는 것보다 좀 더 복잡하고 강력한 데이터 타입으로 트랜스포메이션을 사용할 수 있습니다.

필터링

Flight 클래스를 파라미터로 사용해 불리언값을 반환하는 함수를 만들어보겠습니다.

불리언 값은 출발지와 도착지가 동일한지 나타냅니다. 이 함수는 사용자 정의 함수가 아닌, 일반 함수입니다.

이전까지 했던 방식과 매우 다르므로 매우 눈 크~게 떠주세요!!!!!!

def originIsDestination(flight_row: Flight): Boolean = {
     return flight_row.ORIGIN_COUNTRY_NAME == flight_row.DEST_COUNTRY_NAME
}

위에서 정의한 함수를 filter 메서드를 적용해 각행이 true를 반환하는지 평가하고 데이터셋을 필터링할 수 있습니다.

flights.filter(flight_row => originIsDestination(flight_row)).first()

이 함수는 스파크 코드에서 호출하지 않아도된다. 스파크 코드에서 사용하기 전에 사용자 정의 함수처럼, 로컬머신의 데이터를 대상으로 테스트 할 수 있습니다.

예를 들어 다음 예제의 데이터셋은 드라이버에 모을 수 있을 만큼 아주 작기 때문에 동일한 필터 연산을 수행할 수 있습니다.

flights.collect().filter(flight_row => originIsDestination(flight_row)).first()

함수를 사용했을 때와 동일한 결과를 확인할 수 있습니다.

매핑

필터링은 단순한 트랜스포메이션입니다. 때로는, 특정 값을 다른 값으로 매핑해야합니다.

DataSet을 다루는 간단한 예제는 로우의 특정 컬럼값을 추출하는 것입니다. DataFrame에 매핑작업을 수행하는 것은 DataSet의 select 메서드를 사용하는 것과 같습니다.

val destinations = flights.map(f=> f.DEST_COUNTRY_NAME)

최종적으로 String 데이터 타입의 DataSet을 반환한다. 스파크는 결과로 반환할 jvm 데이터 타입을 알고 있기 때문에, 컴파일 타임에 데이터 타입의 유효성을 검사할 수 있습니다.

드라이버는 결과값을 모아 문자열 타입의 배열로 반환합니다.

val localDestinations = destinations.take(4)

조인

  • 조인은 Dataframe 에서와 마찬가지로 DataSet에도 동일하게 적용됩니다. 하지만,DataSet은 joinWith 처럼 정교한 메소드를 제공합니다. joinWith 메소드는 co-group (RDD 용어) 와 매우 유사하며, DataSet안쪽에 다른 두 개의 중첩된 DataSet으로 구성됩니다. 각 컬럼은 단일 DataSet이므로, DataSet객체를 컬럼처럼 다룰 수 있습니다. 그러므로, 조인 수행시 , 더 많은 정보를 유지할 수 있으며, 고급맵이나 필터처럼 정교하게 데이터를 다룰 수 있습니다.
case class FlightMetadata(count: BigInt, randomData: BigInt)

val flightsMeta = spark.range(1000).map(x => scala.utils.Random.nextLong))
.withColumnRenamed("_1","count").withColumnRenamed("_2","randomData")
.as[FlightMetadata]

val flight2= flights
.joinWith(flightsMeta, flights.col("count") === flightsMeta.col("count"))

최종적으로 로우는 Flight와 FlightMetadata 로 이뤄진 일종의 키-값 형태의 DataSet을 반환합니다. DataSet 이나, 복합 데이터 타입의 DataFrame으로 데이터를 조회할 수 있습니다.

flights.selectExpr("_1.DEST_COUNTRY_NAME")

이전 예제처럼 드라이버로 데이터를 모은 다음 결과를 반환합니다.

일반 조인 역시 아주 잘 동작합니다. 하지만, DataFrame을 반환하므로, JVM 데이터 타입 정보를 모두 잃게 됩니다.

val flights2 = flights.join(flightMeta, Seq("count"))

이 정보를 다시 얻으려면, 다른 DataSet 을 정의해야합니다. DataFrame과 DataSet을 조인하는 것은 아무런 문제가 되지 않고, 최종적으로 동일한 결과를 반환합니다.

val flights2 =  flights.join(flightsMeta.toDF(), Seq("count"))

그룹화와 집계

  1. 그룹화와 집계는 이전 장에서 보았던 것과 동일한 기본 표준을 따르므로, groupBy , rollup 그리고 cube 메서드를 여전히 이용할 수 있습니다. 하지만, DataSet 대신, DataFrame 을 반환하기 때문에, 데이터 타입 정보를 잃게 됩니다.
flights.groupBy("DEST_COUNTRY_NAME").count()
  1. groupByKey
    1. 데이터 타입 정보를 잃은 것은 그다지, 큰 문제는 아니지만, 이를 유지할 수 있는, 그룹화와 집계 방법이 있습니다. 한가지 예로 groupByKey 메서드는 DataSet 의 특정 키를 기준으로 그룹화하고 형식화된 DataSet을 반환합니다.
    2. 하지만, 이 함수는 컬럼명대신, 함수를 파라미터로 사용해야합니다. 따라서, 다음 예제와 같이 훨씬 정교화한 그룹화 함수를 이용할 수 있습니다.
flights.groupByKey(x=> x.DEST_COUNTRY_NAME).count()
  • groupByKey 메서드의 파라미터로 함수를 사용함으로써 유연성을 얻을 수 있습니다. 하지만, 스파크는 함수와 JVM데이터 타입을 최적화할 수 없으므로, 트레이드 오프가 발생합니다. 이로인해, 성능차이가 발생하고, 실행계획으로 그 이유를 확인할 수 있습니다.

다음 예제는 groupByKey 메서드를 이용해, DataFrame에 새로운 컬럼을 추가한 다음 그룹화를 수행합니다.

flights.groupByKey(x=>x.DEST_COUNTRY_NAME).count.explain

DataSet의 키를 이용해 그룹화를 수행한 다음, 결과를 키-값의 형태로 함수에 전달해 원시 객체 형태로 그룹화된 데이터를 다룰 수 있습니다.

def grpSum(countryName: String, values: Integer[Flight]) = {

     values.dropWhile(_.count < 5).map(x=> (countryName, x))
}

flights.groupByKey(x=> x.DEST_COUNTRY_NAME).flatMapGroups(grpSum).show(3
def grpSum2(f:Flight):Integer={
1
}

flights.groupByKey(x=> x.DEST_COUNTRY_NAME).mapValues(grpSum2).count().take(5)
  • 다음 예제처럼 새로운 처리 방법을 생성해 그룹을 축소하는 방법을 정의할 수 있습니다.
def sum2(left: Fligt, right: Flight) = {

Flight(left.DEST_COUNTRY_NAME, null, left.count + right.count)
}

flights.groupByKey(x=> x.DEST_COUNTRY_NAME).reduceGroups((1,r) => sum2(1,r)).take(5)

flights.groupBy("DEST_COUNTRY_NAME").count().explain
  • groupByKey 메서드는 동일한 결과를 반환하지만, 데이터 스캔 직후에, 집계를 수행하는 groupBy 메서드에 비해 , 더 비싼 처리인 것을 실행계획을 통해 알 수 있습니다.
    • 그러므로, 사용자가 정의한 인코딩으로 세밀한 처리가 필요하거나, 필요한 경우에만,DataSet의 groupByKey 메서드를 이용해야합니다.
    • DataSet은 빅데이터 처리 파이프라인의 처음과 끝 작업에 주로 이용하게 됩니다.

정리

  • 우리는 DataSet의 기초와 DataSet의 사용이 적합한 경우를 예제와 함께 알아보았습니다. 다시 말해, DataSet을 이용하기 위해 기본적으로 알아야할 내용을 알아보았습니다. 속도 비교글까지 정리하려고 했으나, 현재까지는 데이터가 작고, 로컬모드로 돌려서 인지, dataset 과 dataframe 의 시간 차이가 크게 안나네요…,인생…… 해당 부분이 정리가 되면, 추후의 글에서 뵙겠습니다!

참고 문헌

  1. https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html
  2. 스파크 완벽가이드
  3. https://docs.oracle.com/javaee/7/tutorial/websocket007.htm
  4. https://docs.scala-lang.org/ko/tour/case-classes.html
반응형

'스파크' 카테고리의 다른 글

스파크 튜닝 관련 20230409  (0) 2023.04.09
Spark Join전략과 hint  (5) 2022.09.25
Spark: Cluster Computing with Working Sets 정리(해석)  (0) 2022.05.22
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함