안녕하세요~
이번에는 스파크 공부하면서, 역시 공부 시작은 시초논문이지🧐 하면서
Spark: Cluster Computing with Working Sets 논문 내용 정리와 개인적인 결론(사견) 을 넣어보았습니다.
개인적으로 스파크의 사용법과 메모리 관리에만 신경을 썼었는데, 스파크를 왜 만들었는지에 대한 정수를 이해하게 되는 기분이었습니다.
스파크 이용자라면, 한번쯤 읽어봤거나, 읽어보거나, 저의 블로그를 한번 읽어보시면 좋을거같아요 ☺️
이 글이 즐거웠거나.. 이 친구 열심히 했자낭...하면 광고 한번 눌러주세요!
내가 생각하는 결론
- 맵리듀스 사용자 모아놓고 불편해서 내가 이렇게 만들었다! 라는 생각이 시초였던 것같음.
- 맵리듀스 캐싱을 위한 rdd라는 개념의 도입 +(shared variable )⇒ 스파크
- 구현 방식 :
- 정적 getInstance 메소드를 통하기 보다는 각 라인에 대한 싱글턴 객체가 직접 이전 라인의 싱글턴 객체를 참조하도록 생성 코드를 수정.
- 언어 통합을 위해 Scala를 사용하는것은 맵과 리듀스 태스크를 정의하는데 클로저를 사용하는 하둡을 위한 Scala 인터페이스인 SMR에서 영감을 받았다. SMR를 위해 공유 변수와 클로저 직렬화의 더 강건한 구현을 기여했다.
- 노드간 스케줄링을 위한 관리자로는 Mesos 이용
- 정적 getInstance 메소드를 통하기 보다는 각 라인에 대한 싱글턴 객체가 직접 이전 라인의 싱글턴 객체를 참조하도록 생성 코드를 수정.
- 결론 : 메모리에 데이터셋을 저장해서, 반복계산에 이점이 생김
- 논문 양상 : 자신이 생각한 문제들 해결을 위한"직접 구현한 스파크", 스파크에서의 핵심과 직접 구현한 추상체(rdd, shared variable: broadcast var, accumulator var)에 대한 설명과 예시를 가지고, 실험하고, 그에 따른 퍼포먼스 비교
- 스파크 논문이 거친 raw 한 개발한 사람의 말투라면, 같은 내용이 좀더 세련되게 스파크 도큐먼트에 나온 것을 확인할 수 있어서 비교하면서 정리하였습니다. 중간중간 이해를 돕기 위한 블로그나 위키 등이 포함되어있습니다.
RDD in spark document
https://spark.apache.org/docs/3.2.1/rdd-programming-guide.html
- Spark가 제공하는 주요 추상화는 병렬로 작동할 수 있는 클러스터의 노드에 분할된 요소 모음인 탄력적인 분산 데이터 세트 (RDD)입니다.
- RDD는 모든 데이터 소스와 같은 외부 스토리지 시스템의 데이터 세트 또는 드라이버 프로그램의 기존 Scala 컬렉션으로 시작하여 변환하여 생성됩니다.
- 사용자는 Spark에 RDD를 메모리에 유지 하도록 요청할 수도 있으므로 병렬 작업에서 효율적으로 재사용할 수 있습니다. 마지막으로 RDD는 노드 장애로부터 자동으로 복구됩니다.
- Spark의 두 번째 추상화는 병렬 작업에서 사용할 수 있는 공유 변수 입니다.
- 기본적으로 Spark는 다른 노드에서 일련의 작업으로 병렬로 함수를 실행할 때 함수에 사용된 각 변수의 복사본을 각 작업에 전달합니다. 경우에 따라 작업 간에 또는 작업과 드라이버 프로그램 간에 변수를 공유해야 합니다.
- Spark는 두 가지 유형의 공유 변수를 지원합니다 .
- 모든 노드의 메모리에 값을 캐시하는 데 사용할 수 있는 브로드캐스트 변수 와
- 카운터 및 합계와 같이 "추가"만 되는 변수인 accumulator입니다.
Abstract
- MapReduce 이야기 (빅데이터 처리에 이득)
- 문제점 : 그러나 이러한 시스템의 대부분은 다른 일반적인 애플리케이션에 적합하지 않은 비순환(acyclic=단방향) 데이터 흐름 모델을 기반으로 구축됩니다.
- 해결 : 이 논문에서 제안하는 Spark는 맵리듀스의 확장성과 내고장성을 그대로 유지하면서 여러 병렬 작업에서 작업 중인 데이터 세트를 재사용하는 애플리케이션(분석도구, ml 학습 알고리즘)을 지원한다.
- 이러한 목적을 달성하기 위해 스파크는 RDD(resilient distributed datasets)라는 추상화를 소개한다.
- RDD는 여러 머신에 걸쳐 파티션이 분산되어있는 읽기 전용의 오브젝트 컬렉션이며, 파티션이 유실되면, 재구성할 수 있다.
- 스파크는 반복적인 기계학습 잡에서 하둡의 10배 이상의 성능을 보여주며, 39GB 데이터셋에 대한 인터랙디브 쿼리에서도 초단위의 응답시간을 갖는다.
1. Introduction
- 맵리듀스에 대한 상세 소개 : 확장성과 내고장성 , 다만 비순환적
- 클러스터 컴퓨팅의 유명한 새 모델
- 신뢰성이 부족한 장비로 구성된 클러스터 상에서 data-parallel한 계산은 locality-aware scheduling(지역적 스케줄링), fault tolerance(내고장성), 그리고 load balancing을 자동으로 제공하는 시스템에 의해 수행된다
- Dryad 와 Map-Reduce-Merge 같은 시스템들은 제공된 데이터 흐름의 타입을 일반화했다.
- 이러한 시스템은 어떤 프로그래밍 모델을 제공하여 scalability(확장성)과 fault tolerance(내고장성)을 달성하는데 이 모델에서는 사용자가 일련의 오퍼레이션을 통해 비순환적인 데이터 흐름 그래프를 생성하여 입력 데이터에 전달하게 된다.
- 이는 기반 시스템이 스케줄링을 관리하고 사용자 인터랙션 없이도 고장에 반응할 수 있게 돕는다.
- 맵리두스의 “비순환적인" 데이터흐름이 힘든 애플리케이션이 있다.
- 예 : 다수의 병렬 오퍼레이션상에서 작업 데이터 셋을 재사용하는
- 예제1. 반복적인 작업(Interactive jobs)
- 많은 기계학습 알고리즘은 파라미터 튜닝을 위해 한 함수를 같은 데이터셋에 반복적으로 적용한다(예. gradient descent)
- 각 반복 작업을 MapReduce/Dryad job으로 표현할 수는 있지만 각 잡은 데이터를 디스크로부터 재적재해야하며 이는 엄청난 성능 상의 불이익을 유발시킨다.
- 예제2. 애드혹 분석( Interactive analytics )
- 하둡은 분석 쿼리를 개별 맵리듀스 잡으로 수행하고 디스크에서 데이터를 읽기 때문에 수십 초 수준의 많은 지연을 발생시킨다.
- 이상: 사용자가 관심있는 데이터셋을 다수의 머신상의 메모리로 로드하여 반복적으로 쿼리를 날리기
- 예제1. 반복적인 작업(Interactive jobs)
- 예 : 다수의 병렬 오퍼레이션상에서 작업 데이터 셋을 재사용하는
- 맵리듀스의 장점인 “확장성과 내고장성”은 가져가면서 , “비순환” 단점을 해소할 새로운 클러스터 컴퓨팅 프레임워크 스파크 를 제안한다.
- 스파크의 메인 추상화 RDD(resilient distributed dataset=복원력 있는 분산 데이터 세트)
- RDD는 머신의 집합에서 분할되어있는 읽기 전용의 객체 컬렉션이며, 파티션이 유실되었을 경우 재구성할 수도 있다.
- 사용자는 across machines에서(머신 전체 범위에서) 메모리에 RDD를 명시적으로 캐시할 수 있고, MapReduce-like 병렬 작업에서 그것을 재사용할 수 있다.
- RDD는 lineage개념을 통해 내고장성을 지원한다.
- 만약 RDD의 일부분이 유실되더라도 RDD는 유실된 부분을 다른 RDD로부터 어떻게 도출하는지에 대한 충분한 정보를 갖고있다.
- RDD가 일반적인 공유 메모리 추상화는 아니지만, 표현력이라는 측면과 확장성과 신뢰성이라는 다른 측면 사이에서의 절충점을 제공한다.
- 스파크의 구현: 스칼라(Scala)
- https://ko.wikipedia.org/wiki/스칼라_(프로그래밍_언어)
- 스칼라 위키
- 스칼라(영어: Scala)는 객체 지향 프로그래밍 언어와 함수형 프로그래밍의 요소가 결합된 다중패러다임 프로그래밍 언어이다. 스칼라라는 이름은 "Scalable Language (확장 가능한 언어)"에서 유래된 것이다.
- 기존의 Java 언어가 너무 복잡하다는 단점을 극복하기 위해 2004년 Martin Odersky가 처음 개발하여 배포했다. 간결한 소스 코드를 사용하여 Java에서 구현할 수 있는 대부분의 기능을 구현할 수 있다.[1] Scala는 자바 바이트코드를 사용하기 때문에 자바 가상 머신(JVM)에서 실행할 수 있고, Java 언어와 호환되어 대부분의 자바 API를 그대로 사용할 수 있다.
- 양상을 보면, class → object 로 바뀐거 외에 자바랑 똑같이 생김.
- 타입추론또한, 자바 10부터 제공
- Scala는 자바 바이트코드를 사용하기 때문에 자바 가상 머신(JVM)에서 실행할 수 있고, Java 언어와 호환되어 대부분의 자바 API를 그대로 사용할 수 있다.
- 정적타입언어(=컴파일이 필요하단뜻)
- 자바 기반 언어 (=자바 바이트코드를 이용)
- DryadLINQ와 유사한 함수형 언어 인터페이스를 제공(=함수형 프로그래밍이라는 뜻)
- 게다가 스파크는 수정된 버전의 스칼라 인터프리터에서 인터랙티브하게 사용할 수 있으며, 이 인터프리터는 사용자가 클러스터 상의 병렬적인 작업에서 RDD, 함수, 변수, 클래스를 정의하고 사용할 수 있게 해준다.
- 우리는 스파크가 클러스터상의 거대한 데이터를 인터랙티브하게 처리하는데 사용되는 효율적이며 일반 목적으로 사용할 수 있게 해주는 최초의 프로그래밍 언어라고 생각한다.
- 초기 시스템을 사용해본 결과, 스파크는 반복적인 기계학습 작업에서는 하둡보다 10배 이상 빨랐으며 39 GB 데이터셋을 인터랙티브하게 스캔하는데 수 초 단위의 지연만으로 사용할 수 있었다.
- 이 논문은 다음과 같이 구성되어 있다. 2절은 스파크 프로그래밍 모델과 RDD를 설명한다. 3절은 몇 가지 예제 잡을 보여준다. 4절은 스칼라와 스칼라 인터프리터에 대한 통합을 포함해서 구현을 설명하고 5절은 초기 결과를 보여준다. 6절에서는 관련 연구를 조사하고 7절에서 논의하며 끝낸다.
2. Programming Model
- 스파크는 병렬 프로그래밍을 위한 두 가지 주요 추상화를 제공한다. 그것은 RDD와 RDD상에서의 병렬 작업이다 (데이터셋에 적용할 함수를 전달하여 수행됨). 추가로 스파크는 클러스터 상에서 수행중인 함수에서 사용될 수 있는 두 가지 제한적인 타입의 공유 변수를 제공한다. 이건 나중에 설명한다.
2.1 Resilient Distributed Datasets (RDDs)
- RDD는 머신 집합 상에 분할되어있는 읽기 전용의 객체 컬렉션이며 부분적인 유실이 있을 경우 재구성할 수있다.
- RDD의 원소가 물리적인 스토리지에 존재할 필요는 없다. 대신 RDD에 대한 핸들(=RDD를 계산하는데 충분한 정보를 담고 있는)은 신뢰할 수 있는 저장소의 데이터에서 시작됩니다.
- 스파크에서 각 RDD는 스칼라 객체로 표현된다. 스파크는 RDD를 4가지 방법으로 구성한다.
- HDFS와 같은 공유 파일시스템의 파일로부터 생성
- 드라이버 프로그램에서 스칼라 컬렉션을 "병렬화(=배열화)"함으로써, 여러 노드로 전송될 여러 조각으로 분할
- 기존 RDD를 변환함으로써, 유형 A의 요소가 있는 데이터 세트는 유형 A의 사용자 제공 함수를 통해 각 요소를 전달하는 flatMap이라는 연산을 사용하여 유형 B의 요소가 있는 데이터 세트로 변환할 수 있다.맵 및 필터를 포함한 다른 변환은 flatMap을 사용하여 표현할 수 있다.
- 이미 존재하는 RDD의 persistence를 캐시하여 생성. 디폴트로 RDD는 lazy하고 ephemeral하다. 즉, 데이터셋의 파티션은 그들이 병렬 오퍼레이션에서 사용될 때 마다 그때그때 실체화된다(예. by passing a block of a file through a map function) 그리고 사용 후에 메모리에서 삭제된다. 그러나 사용자는 두 가지 액션을 통해 RDD의 persistence를 바꿀 수 있다. : cache, save
- (참고사항)Persistence와 Cache가 무엇인가?(https://deviscreen.tistory.com/94)
- RDD 를 영구적으로 저장하는 기술로 결과를 즉시 저장해두었다가 이후에 필요하면 다시 불러와서 사용이 가능하다. 이것으로 컴퓨팅오버헤드를 줄일 수 있다. RDD를 저장하는 방법은 cache() 와 persist() 메서드가 있다.
- cache()를 사용하면 기본 저장 장소가 MEMORY_ONRY 이고 persist를 하면 다양한 저장소에 수준을 저장할 수 있다.
- cache() 메서드는 모든 메모리의 모든 RDD를 저장할 수 있다. 그리고 RDD를 메모리에 유지하고 병렬작업에서 효율적으로 사용 할 수 있다.
- Spark 에서 Persist가 필요한 이유
- persist 함으로서 반복적인 무거운 알고리즘과 메모리의 소비를 줄 일 수 있다.
- 여러번 반복 연산하는 것을 피하려면 스파크에 데이터 영속화(persist/persistence)를 요청할 수 있다.
- RDD 를 영구적으로 저장하는 기술로 결과를 즉시 저장해두었다가 이후에 필요하면 다시 불러와서 사용이 가능하다. 이것으로 컴퓨팅오버헤드를 줄일 수 있다. RDD를 저장하는 방법은 cache() 와 persist() 메서드가 있다.
- cache 액션은 데이터셋을 lazy하게 내버려둔다. 하지만, 최초로 계산 이후, 메모리에 남겨두라는 힌트를 제공한다.
- 만약 클러스터에 데이터셋의 모든 파티션을 캐쉬할 충분한 메모리가 없다면, 방출될 여지가 있다.
- 우리는 노드에 장애가 나거나 데이터셋이 너무 클 때에도 스파크 프로그램이 계속 동작할 수 있도록 (성능은 떨어지지만) 이 디자인을 선택했다. 이 아이디어는 가상 메모리와 비슷하다.
- save 액션은 데이터셋을 평가하고 HDFS 같은 분산 파일시스템에 rdd를 저장한다. 저장된 버전은 차후에 rdd 오퍼레이션에서 사용된다.
- 또한 우리는 다양한 레벨의 persistence를 제공하도록 스파크를 확장할 계획이다(예. 다수 노드상에서 in-memory replication). 우리의 사용자가 RDD에 대한 저장 비용, 접근 속도, 일부를 잃어버릴 확률, 재계산하는 비용 사이에서 trade-off할 수 있도록 하는 것을 목표로 하고 있다.
- (참고사항)Persistence와 Cache가 무엇인가?(https://deviscreen.tistory.com/94)
2.2 Parallel Operations
RDD에 몇 개의 병렬 오퍼레이션을 수행할 수 있다.
- reduce
- 드라이버 프로그램에서 결과를 생성하기 위해 결합적(associative) 함수를 이용하여 데이터셋의 원소를 병합한다.
- collect
- 데이터셋의 모든 원소를 드라이버 프로그램에 보낸다. 예를 들어, 배열을 병렬로 업데이트하는 간단한 방법은 배열을 parallelize하고 map하고 collect하는 것이다.
- foreach
- 사용자가 제공한 함수를 통해 각 원소를 전달한다. 이것은 오로지 데이터를 다른 시스템에 복사하거나 아래에서 설명할 공유 변수를 업데이트하는것 같은 함수의 부수효과를 위해서만 수행된다. (map vs foreach : http://stackoverflow.com/questions/354909/is-there-a-difference-between-foreach-and-map)
- 스파크는 현재 MapReduce같은 grouped reduce 작업은 지원하지 않는다. reduce 결과는 오직 한 프로세스(드라이버)에서 취합된다. 이후 분산 데이터셋에 대한 shuffle 변환을 사용하여 grouped reduce 작업을 지원할 계획이다. 이건 7절에 소개한다. 그러나 single reducer만 사용해도 많은 유용한 알고리즘들을 표현하기에 충분하다. 예를 들어 ‘MapReduce for machine learning on multicore systems’ 논문에서도 병렬 리듀서의 지원 없이 10개의 학습 알고리즘을 구현했다.
2.3 Shared Variables
https://spark.apache.org/docs/3.2.1/rdd-programming-guide.html#shared-variables
프로그래머는 스파크에게 클로져(함수)를 전달하면서 map, filter, reduce 같은 오퍼레이션을 호출한다. 함수형 언어에서 일반적인 것처럼, 이 클러져는 그것이 생성된 스코프에서 변수를 참조할 수 있다. 보통 스파크가 클로저를 워커 노드에서 수행할 때 이 변수들은 워커로 카피된다. 또한 스파크는 간단하지만 일반적으로 사용되는 두 가지 패턴을 지원하기 위해서 프로그래머가 두 가지 제한된 타입의 공유 변수를 생성할 수 있게 해준다.
- Broadcast variables
- 읽기 전용의 큰 데이터 조각(예: 룩업 테이블)이 다중 병렬 작업에 사용되는 경우, 모든 함수에 그 데이터를 묶어주는것 대신, 워커들에게 한 번만 배포하는 것이 바람직하다. Spark는 프로그래머가 값을 랩하는 "Broadcast Variable" 개체를 만들고 각 워커들에게만 복사되도록 합니다
scala>val broadcastVar= sc.broadcast(Array(1, 2, 3)) broadcastVar: org.apache.spark.broadcast.Broadcast\[Array\[Int\]\]=Broadcast(0) scala> broadcastVar.value res0: Array\[Int\]=Array(1, 2, 3)
- 읽기 전용의 큰 데이터 조각(예: 룩업 테이블)이 다중 병렬 작업에 사용되는 경우, 모든 함수에 그 데이터를 묶어주는것 대신, 워커들에게 한 번만 배포하는 것이 바람직하다. Spark는 프로그래머가 값을 랩하는 "Broadcast Variable" 개체를 만들고 각 워커들에게만 복사되도록 합니다
- Accumulators
- associative , commutative 연산을 통해서만 "추가"되는 변수로서, 병렬로 효율적으로 지원되어야 한다. counters (as in MapReduce) 혹은 sums을 구현하는데 이용할 수 있다..스파크는 기본적으로 numeric types타입을 지원하고, 새로운 유형을 개발자가 추가할 수 있다.
3. Examples
여기에서는 몇 가지 스파크 프로그램 샘플을 소개한다. 스칼라는 타입 추론을 지원하기 때문에 변수 타입을 생략하는 것에 주의하라.
3.1. Text Search
HDFS에 저장된 로그파일에서 “ERROR”가 있는 라인 수를 세고 싶다고 해보자. 다음과 같이 데이터셋 객체에서 시작하여 구현할 수 있다.
- 먼저 라인들의 집합으로 HDFS 파일을 나타내는 file이라는 분산 데이터셋을 생성한다.
- 그리고 문자열 “ERROR”를 담고있는 라인의 집합(errs)을 생성하기 위해 이 데이터셋을 필터링
- 필터링된라인을 1로 매핑하고
- reduce를 이용하여 그것들을 합산한다.
errs와 ones는 실체화된적이 없는 lazy RDD라는 점에 주의하라. 대신, reduce가 호출될 때, 각 워커 노드는 ones를 평가하기 위해 스트리밍 방식으로 입력 블럭을 스캔하고, local reduce를 수행하기 위해 이것들을 합산하고, 그것에 대한 local count를 드라이버로 보낸다. 이런 방식으로 lazy 데이터셋을 사용해서 스파크는 MapReduce를 비슷하게 모방한다.
스파크가 다른 프레임워크와 다른 지점은 중간 데이터셋의 일부를 오퍼레이션에 걸쳐 persist로 만들수 있다는 부분이다. 예를 들어 errs 데이터셋을 재사용하고 싶다면 다음과 같이 errs로부터 캐쉬된 RDD를 생성할 수 있다.
val cachedErrs = errs.cache()
3.2. Logistic Regression
- 아래 프로그램을 로지스틱 회귀를 구현한다. 이것은 점들을 두 집합으로 분리하는 가장 최적의 w를 찾으려고 시도하는 반복적인 분류 알고리즘이다.
- 알고리즘은 gradient descent를 수행한다. gradient descent는 랜덤값으로 w를 시작하고, 각 반복에서 개선하는 방향으로 w를 움직이기 위해 데이터셋 상에서 w의 함수를 합한다.
- 고로 각 반복에 걸쳐 데이터를 메모리에 캐쉬하면 커다란 이득이 된다.
- 로지스틱 회귀를 자세하게 설명하지 않지만 스파크의 새로운 몇 가지 기능을 보여주는데 이 예시를 사용한다.
- points라는 RDD를 생성하고 for 루프를 수행하며 그것을 반복 처리한다.
- Scala에서 for 키워드는 클로져를 loop 몸체로 가지는 컬렉션의 foreach 메소드를 호출하기 위한 syntactic sugar이다. 즉, for(p <- points){body}라는 코드는 points.foreach(p => {body})와 같다. 그러므로 스파크의 병렬 foreach 오퍼레이션을 수행한 것이다.
- gradient를 합산하기 위해 Vector 타입의 값을 가지는 grad라는 accumulator 변수를 사용한다. accumulator와 for 문법의 조합은 스파크 프로그램을 좀 더 명령형 순차 프로그램처럼 보이게 해준다.
3.3. Alternating Least Squares
- 다수의 노드에 공유 데이터셋을 복사하는 반복적인 잡을 위한 브로드캐스트 변수의 이점 측정용
마지막 예제는 ALS라 불리는 알고리즘이다. ALS는 사용자의 별점 이력에 기반하여 그들이 보지 않은 영화에 대한 별점을 예측하는 협업 필터링 문제에 사용된다. 이전까지의 예제와는 다르게 ALS는 데이터 집약적이기 보다는 계산 집약적이다.
여기서는 ALS를 간단히 설명한다. 자세한 내용은 [27]을 참고하라. 우리는 U 사용자가 M 영화에 별점 몇점을 줄것인지 예측하기를 원하고, 사용자가 선호도를 표시한 몇 개의 영화에 대한 별점 정보가 부분적으로 담겨있는 메트릭스 R을 갖고 있다고 해보자. ALS는 R을 각각 mk, ku 차원을 가진 M과 U 두 메트릭스의 곱으로 모델링한다. 즉, 각 사용자와 영화는 특징을 설명하는 k 차원 특성 벡터를 가진다. 그리고 영화에 대한 사용자의 선호도는 해당 사용자와 영화의 특성벡터의 내적이다. ALS는 알려진 선호도값에서 M과 U를 풀고나서 알려지지 않은 값을 예측하기 위해 M*U를 계산한다. 이는 아래와 같은 반복적인 프로세스로 수행된다.
- M을 랜덤값으로 초기화한다.
- 주어진 M에 대해 R과의 오차를 최소화하도록 U를 최적화한다.
- 주어진 U에 대해 R과의 오차를 최소화하도록 M를 최적화한다.
- 수렴할때까지 단계 2와 3을 반복한다.
ALS는 단계 2, 3에서 각 노드에 다른 사용자/영화를 업데이트하여 병렬화할 수 있다. 그러나 모든 스텝에서 R을 사용하기 때문에 매 단계에서 각 노드에 재전송하지 않도록 R을 broadcast 변수로 만드는 편이 유용하다. 스파크의 ALS 구현은 아래와 같다. 0부터 u까지 컬렉션(스칼라 range 객체)을 parallelize하고 각 배열을 업데이트하기 위해 그것을 collect하는 것에 유의하라.
4. Implementation
스파크는 Mesos 위에 구축된다.
- Mesos 가 일종의 클러스터 관리자로 Spark 마스터역할로 시작한듯.
- 드라이버가 작업을 생성하고 스케줄링을 위한 작업을 발행하기 시작하면 Mesos는 어떤 기계가 어떤 작업을 처리하는지 결정합니다
- 스파크 도큐먼트 running-on-mesos
- https://spark.apache.org/docs/3.2.1/running-on-mesos.html
- 참고 : Apache Mesos 지원은 Apache Spark 3.2.0부터 더 이상 사용되지 않습니다. 향후 버전에서 제거될 예정입니다.Mesos와 함께 Spark를 배포할 때의 이점은 다음과 같습니다.
- Spark와 다른 프레임워크 간의 동적 분할
- 여러 Spark 인스턴스 간의 확장 가능한 파티셔닝
- Spark는 Apache Mesos 에서 관리하는 하드웨어 클러스터에서 실행할 수 있습니다 .
- Mesos는 다수의 병렬 어플리케이션이 fine-grained 방법으로 클러스터를 공유하고 클러스터 상에서 태스크를 실행하기 위해 애플리케이션을 위한 API를 제공하는 클러스터 운영 체제이다. 이것은 스파크가 하둡, MPI같은 기존의 클러스터 프레임워크와 함께 수행되고 그들 간에 데이터를 공유할 수 있게 해준다.(?) 게다가 Mesos 상에 구축하는 것은 스파크에 들여야 하는 프로그래밍 노력을 엄청나게 감소시켜준다.
- (https://spark.apache.org/docs/latest/running-on-mesos.html)
스파크 코어는 RDD의 구현체이다. 예로, 위 예시에서 cachedErrs라는 캐쉬된 데이터셋을 정의하고, map과 reduce를 사용해 그것의 원소를 세고 있다고 해보자.
이 데이터셋은 그림 1에서 보는 것처럼 각 RDD의 리니지를 포착하는 객체의 연쇄로서 저장된다. 각 데이터셋 객체는 그것의 부모에 대한 포인터를 갖고 있으며 부모가 어떻게 변환되었는지에 대한 정보도 가지고 있다.
내부적으로 각 RDD 객체는 세 가지 오퍼레이션으로 구성된 동일한 간단한 인터페이스를 구현한다.
- getPartitions: 파티션 ID 리스트를 반환한다.
- getIterator(partition): 파티션 상에서 반복을 수행한다.
- getPreferredLocations(partition): 데이터 지역성을 달성하는데 필요한 태스크 스케줄링을 위해 사용된다.
- 데이터셋에서 병렬 오퍼레이션이 수행될 때, 스파크는 데이터셋의 각 부분을 처리하기 위해 태스크를 생성하고 이것을 워커 노드에 보낸다. 우리는 delay scheduling이라는 기술을 사용해서 각 태스크를 적당한 위치 중 한군데로 보내려고 시도한다. 일단 워커에서 수행되면 각 태스크는 각 파티션을 읽기 시작하기 위해 getIterator를 호출한다.
- 다른 종류의 RDD는 RDD 인터페이스를 어떻게 구현했는지만 다르다.
- 예들 들어, HdfsTextFile에서는 파티션은 블럭 ID이고, 적절한 위치는 블럭 위치이며, getIterator는 블럭을 읽기 위해 스트림을 연다.
- MappedDataset에서는 파티션과 적당한 위치가 부모와 동일하다. 그러나 반복자는 맵 함수를 부모의 원소에 적용한다.
- 마지막으로 CachedDataset에서는 getIterator메소드가 변환된 파티션의 캐쉬된 지역 카피본을 찾고, 각 파티션의 선호 위치는 처음엔 부모의 선호 위치와 동일하게 시작하지만 파티션이 어떤 노드에 캐쉬되고나면 그 노드에서 재사용하기 편하게 하기위해 업데이트 된다.
- 이 디자인은 고장을 쉽게 처리할 수 있게 해준다. 만약 노드가 실패하면 그것의 파티션은 부모 데이터셋으로부터 다시 읽어들여지고 결국 다른 노드에 캐쉬된다.
- 마지막으로, 태스크를 워커에 적재하는 것은 워커에 클로저를 적재하는것을 필요로 한다.
- 이는 분산 데이터셋을 정의하기 위해 사용되는 클로저와 reduce같이 오퍼레이션에 전달하기 위한 클로저 모두에 해당한다.
- 이를 위해 우리는 스칼라 클로저가 자바 객체이고 자바 serialization을 사용하여 직렬화될수 있다는 점에 의존한다. 이것은 계산을 다른 머신에 보내기 쉽게 만들어주는 스칼라의 특징이다.
- 스칼라의 내장 함수 구현은 이상적이지 않다.
- 사유: 클로저 오브젝트가 몸체에서 실제로 사용되지 않는 함수의 스코프 바깥에서 변수를 참조하는 경우가 발견됨
- Shared Variables
- 스파크에는 브로드캐스트 변수와 accumulator 두 종류의 공유 변수가 커스텀 직렬화 포맷의 클래스를 사용하여 구현되었다.
- 브로드캐스트 변수 사용자가 브로드캐스트 변수 b를 v라는 값으로 생성하면 v는 공유 파일 시스템에 파일로 저장된다. b의 직렬화된 형태는 이 파일에 대한 경로이다. 워커 노드에서 b의 값이 요청되면 스파크는 먼저 v가 로컬 캐쉬에 있는지 여부를 체크하고 만약 없다면 파일 시스템에서 읽어들인다. 처음에는 브로드캐스트 변수에 HDFS를 사용했으나 좀 더 효율적인 스트리밍 브로드캐스트 시스템을 개발중이다.
- accumulator accumulator는 다른 직렬화 트릭을 사용해서 구현된다. 각 accumulator에는 생성 시점에 고유 ID가 주어진다. accumulator가 저장될 때 직렬화 형식은 ID와 해당 타입의 zero 값을 포함한다. 워커에서는 태스크를 수행하는 각 스레드를 위해 별도의 accumulator 복사본이 thread-local 변수를 사용하여 생성되고 태스크가 시작할 때 zero로 리셋된다. 각 태스크가 수행된 후 워커는 여러 accumulator에 가해진 업데이트를 담고 있는 메세지를 드라이버 프로그램에 보낸다. 드라이버는 실패로 인해 재실행된 태스크에서 중복 카운팅되는 것을 막기 위해 각 오퍼레이션의 각 파티션으로부터 단 한번만 업데이트한다.
- Interpreter Integration(인터프리터 통합)
- 일반 스칼라 인터프리터
- 스칼라 인터프리터는 보통 사용자가 형식화한 각 라인에 대한 클래스를 컴파일하여 작동한다. 이 클래스는 그 라인 상의 변수나 함수를 담고 있는 싱글톤 객체를 포함하며, 라인의 코드를 해당 생성자에서 수행한다.
- 예를 들어 사용자가 x = 5 다음에 println(x)를 쳤다면 인터프리터는 x를 담고 있는 Line1이라 불리는 클래스를 정의하며 두 번째 라인을 println(Line1.getInstance().x)로 컴파일하게 만든다. 이 클래스들은 각 라인을 실행하기 위해 JVM에 로드된다.
- 변경점: 스파크에서 인터프리터가 작동하도록 하기 위해 두 가지를 수정하였다.
- 우리는 인터프리터가 ( worker가 커스텀 자바 클래스 로더를 사용해 로드한 ****)공유 파일시스템에 정의한 클래스를 출력하게 했다
- 정적 getInstance 메소드를 통하기 보다는 각 라인에 대한 싱글턴 객체가 직접 이전 라인의 싱글턴 객체를 참조하도록 생성 코드를 수정했다. (e.g., a line setting x = 7 in the example above)
- 이는 클로저가 워커에 보내지기 위해 직렬화될 때 마다 싱글턴이 참조하는 현재 상태를 캡쳐할 수 있게 해주었다. 만약 이렇게 하지 않았다면 싱글턴 객체에 대한 업데이트는 워커들로 전파되지 않을 것이다.
- 일반 스칼라 인터프리터
5. Results
스파크 구현은 아직 초기 단계지만 클러스터 컴퓨팅 프레임워크로써의 가능성을 보여주는 3 가지 실험 결과를 소개한다.
Logistic Regression
- 3.2절에서 소개한 로지스틱 회귀의 성능과 하둡을 이용한 구현의 성능을 비교하였다. 4코어 m1.xlarge EC2 노드 20대에서 29 GB 데이터를 사용했을 때 결과는 그림 2와 같다. 하둡 구현에서는 각 반복이 독립적인 맵리듀스 잡으로 수행되기 때문에 127s 소요된다. 스파크에서는 첫 번째 반복만 174s 걸렸을(자바 대신 스칼라를 사용하기 때문인 것 같음) 뿐 이후부터는 캐쉬된 데이터를 재사용하기 때문에 각 반복에서 단 6s만 소요된다. 이는 잡을 10배 더 빠르게 수행하게 해준다.
- 잡 수행중에 노드를 정지시키는 실험도 수행
- 복구 성공: 데이터 파티션이 다른 노드들에서 병렬로 재계산되고 캐쉬되었지만
- 속도 측면 : 10-이터레이션의 경우 평균적으로 50s(21%)까지 느려졌다. 고장난 노드의 이 실험에서는 복구 시간이 다소 높았다.
- 사유: 이는 큰 HDFS 블럭 크기(128 MB)를 사용하여 노드 당 12 블럭만 존재했고 복구 프로세스가 클러스터의 모든 코어를 활용하지 못했기 때문이다. 작은 블럭 크기를 사용하면 복구 시간도 빨라질 것이다.
Alternating Least Squares
- 연구 목적 : 브로드캐스트 변수 성능테스트
- 브로드캐스트 변수를 사용해서 R을 워커의 메모리에 캐싱하는 것은 성능을 2.8배까지 개선시켰다.(5000 영화, 15000 사용자, 30 노드 EC2 클러스터에서 실험)
Interactive Spark
- 연구수행
- 9 GB 위키피디아 덤프를 15개의 m1.xlarge EC2 머신상의 메모리에 로드하는데 스파크 인터프리터를 사용했고 거기에 인터랙티브한 쿼리를 수행했다.
- 결과
- 데이터셋이 처음 쿼리됐을 때 대략 35초 정도 소요되어 하둡 잡으로 수행했을 때와 비슷했다. 하지만 후속 쿼리들은 단지 0.5~1초만에 전체 데이터를 스캔할 수 있었다
6. Related Work
Distributed Shared Memory
스파크 RDD는 이미 광범위하게 연구되어온 분산 공유 메모리 (DSM)에 대한 추상화로 볼 수 있다. RDDs 는 두 가지 방법으로 DSM 인터페이스와 다르다.
- RDDs 는 훨씬 제한적 프로그래밍 모델을 제공한다, 그러나 그것은 만약 클러스터 노드가 실패했을 때 데이터셋을 효과적으로 재 구축할 수 있다.
- 일부 DSM 시스템은 내결함성을 체크 포인팅을 통해 달성하는 동안
- 스파크는 RDD 객체안에 캡쳐된 계보 정보를 사용해서 잃어버린 RDDs 의 파티션을 재구축한다.
- 이것은 오직 잃어버린 파티션만 다시 계산해야 한다는 것을 의미하고, 그것은 프로그램의 체크포인트로 프로그램을 되돌릴 필요 없이, 다른 노드 위에서 병렬로 다시 계산할 수 있다는 의미이다.
- 추가로, 그들은 노드가 실패하지 않는 경우 오버헤드가 없다.
- RDDs 는 임의의 노드가 글로벌 주소 공간에 접근하게 하는 것이 아니라 맵리듀스처럼 데이터에 계산을 전송한다.
Language Integration
- 스파크의 언어 통합은 DryadLINQ와 비슷하다. DryadLINQ는 쿼리를 정의한 표현 트리를 캡쳐하고 클러스터 상에서 수행하기 위해서 언어 통합 쿼리를 위한 .NET의 지원을 사용한다.
- DryadLINQ와는 달리 스파크는 RDD가 병렬 오퍼레이션들에 걸쳐 메모리상에 영속되는 것을 허용한다. 게다가 스파크는 커스텀 직렬화 형식의 클래스를 사용해 구현된 shared variables (broadcast variables and accumulators)를 지원하여 언어 통합 모델의 내용을 풍부하게 한다.
- 언어 통합을 위해 스칼라를 사용하는것은 맵과 리듀스 태스크를 정의하는데 클로저를 사용하는 하둡을 위한 스칼라 인터페이스인 SMR에서 영감을 받았다. SMR를 위해 공유 변수와 클로저 직렬화의 더 강건한 구현을 기여했다.
7. Discussion and Future Work
- 스파크는 클러스터 프로그래밍을 위해 세 가지 간단한 데이터 추상화를 제공한다.
- RDD
- 두 개의 제한적 공유 변수 타입인
- 브로드캐스트 변수
- accumulato
이 세 추상화는 제한적이지만 반복적이고 인터랙티브한 계산을 필요로 하는 기존의 클러스터 컴퓨팅 프레임워크에서는 어려움에 놓여있는 여러 애플리케이션을 표현하는데 충분히 강력하다는 것을 발견했다.
- 더불어, 신뢰성있는 스토리지의 사용 가능한 데이터로부터 데이터셋을 재구성하기 위한 충분한 정보를 가지는 데이터셋 기능이라는 RDD의 핵심 아이디어는 클러스터 프로그래밍을 위한 다른 추상화를 개발하는데 유용하다는 것을 입증한다고 생각한다.
앞으로 네 영역에 집중할 계획이다.
- RDD의 속성과 스파크의 다른 추상화 그리고 다양한 분야의 애플리케이션이나 작업에 대한 적용성을 정식으로 특징짓기
- 프로그래머가 스토리지 비용과 재구성 비용 간의 트레이드오프를 선택할 수 있도록 RDD 추상화를 개선하기
- 주어진 key로 RDD를 재분배하는 shuffle 오퍼레이션을 포함하여, RDD를 변환하는 새 오퍼레이션 정의하기. 이러한 오퍼레이션은 group-by나 join을 구현할 수 있게 해준다.
- 스파크 인터프리터 위에 SQL이나 R shell 같은 더욱 고수준의 인터랙티브 인터페이스를 제공
참고 문헌
- https://ko.wikipedia.org/wiki/스칼라_(프로그래밍_언어)
- https://www.usenix.org/legacy/event/hotcloud10/tech/full_papers/Zaharia.pdf
- https://bt22dr.wordpress.com/2014/09/14/441/
- https://deviscreen.tistory.com/94
- https://spark.apache.org/docs/latest/
'스파크' 카테고리의 다른 글
스파크 튜닝 관련 20230409 (0) | 2023.04.09 |
---|---|
Spark Join전략과 hint (5) | 2022.09.25 |
Spark Dataset 개념과 이해 (0) | 2022.07.24 |