티스토리 뷰

반응형

Intro

kubeflow

네.. 요즘 쿠베플로우에 관심을 주고 있어요.. 이 꼴뚜기 예찬자가 머선일? 🧐 이러면 이거도 꼴뚜기에요……. 🥺

쿠버플로우 파이프라인을 이용해보려고 하는데요, 제약사항은 저는 데엔 관점에서 쿠베플로우를 쓰고 있기때문에,수많은 쿠베플로우 기능 전체를 이용한 것이 아니라는 점입니다.

굉장히 많은 기능을 안고 있기 때문에, 다른 기능들이 궁금하시다면, 도큐먼트를 차근차근 따라가 보는 것도 좋을 것 같아요😊 저는, 슥슥 지나가고, 그 중에 일부인 파이프라인 이용법에대해서만 이야기 해볼겁니다!

kubeflow란?

개념 : Kubernetes를 활용한 머신 러닝 툴킷

Kubeflow는 Kubernetes를 활용한 머신 러닝 툴킷입니다. 쿠버네티스와 + 머신러닝툴들이 결합된 총 키트라고 보면됩니다.

ML 워크플로의 각 단계에 사용할 플랫폼과 서비스를 선택하도록 구성을 조정할 수 있습니다.

  1. 데이터 준비
  2. 모델 트레이닝,
  3. 예측 제공
  4. 서비스 관리

이러한 것들을 제공합니다.

여기까지가 공식 도큐먼트 설명인데요,

즉, data exploration / feature extraction, preparation / training, tuning / serving / testing / versioning 등 각 단계를 위한 툴들의 집합이자, 각 단계, 구성요소들을 연결해주는 파이프라인 시스템이라 보면 되는데요, data engineer / data scientist 모두가 이용할 수 있는 시스템입니다.

저희가 데이터를 가공해 트레이닝시키고, 서비스 관리하기 까지의 모든 과정에 containerization, scalability, portability, repeatability를 위해 쿠버네티스를 통합한 시스템으로 제공합니다.

역사

  1. Kubeflow는 TensorFlow Extended 라는 Google이 내부적으로 TensorFlow 를 실행하는 방식의 오픈 소스로 시작되었습니다 .
  2. Kubernetes에서 TensorFlow 작업을 실행하는 더 간단한 방법으로 시작했지만 이후 엔드 투 엔드 머신 러닝 워크플로를 실행하기 위한 다중 아키텍처, 다중 클라우드 프레임워크로 확장되었다고 합니다.

what connection between argo & kubeflow?

아니, 목표만 들었을때, 그럼 에어플로나, 알고나 그냥 그런거 쓰면되는거 아냐?

라는 생각이 들면 맞습니다. 그냥 기존 것을 이용해도돼요.

그러면 이건 뭐가 다르길래 쿠베플로 왤케 유명함?! 이런다면, 흔히 우리가 실험하고, 결과를 이용해 서비스를 만들어, 배치 작업으로 운영하기까지의 노트북이나, 하이퍼파라미터 튜닝, 피처스토어 등을 통해 전체 과정을 지원합니다.

https://valohai.com/blog/kubeflow-vs-argo/

특히, argo workflow 의 기능을 그대로, kubeflow 에서 yaml → python 기능을 추가해 (DSL Compiler) 들고온 것이라, 사실상, kubeflow 내부에 argo 기능이 포함돼있다고 보는 것이 맞습니다.

Kubeflow Components

쿠베플로우에서 제공하는 컴포넌트들입니다. 상당히 다양한 툴킷들이 있어, 데이터를 준비하고, 정제해 피처를 가공하고 학습시켜 서빙하기까지의 오퍼레이터들이 제공되는 것들을 볼 수 있습니다.

https://www.kubeflow.org/docs/components/

1. Data/Feature Preparation

  • jupyter notebook
    • 쿠버플로우 내에서 cluster에서 바로 인스턴스 생성 가능합니다! 또한, 쿠베플로우 컴포넌트들을 주피터 내에서 사용 가능해요
  • 또한, Feature Store 를 제공해주는 데요, (아직 알파 단계)

2. Training Operator 제공

https://www.kubeflow.org/docs/components/training/

operator 를 통해, ml 모델을 훈련시킬 수 있습니다.

  • TensorFlow
  • PyTorch
  • Apache MXNet
  • XGBoost
  • Message passing interface (MPI)

등을 제공합니다.

3. Katib제공 : Hyperparameter Tuning

https://www.kubeflow.org/docs/components/katib/

  • Katib란, autoML을 위한 kubeflow native한 프로젝트인데요,
    • 다양한 ML framework 지원하고,
    • autoML algorithm 지원합니다.

4. Pipelines

  • Kubeflow Pipelines를 통해 airflow나, argo workflow 처럼 전체 flow를 모니터링, 구성할 수 있게끔 합니다. recurring run 등을 제공합니다.
  • kubeflow pipeline 은 docker 컨테이너를 기반으로 하는 이식가능하고 확장 가능한 머신러닝 워크플로 구축 및 배포하기 위한 플랫폼입니다.

kubeflow pipeline 사용법

kubeflow 파이프라인 사용법입니다.

Kubeflow Pipelines는 argo workflow 가 내장되어있으니, Argo 의 YAML 파일로 저장되는데요,(?)

Python DSL을 제공해, dsl 컴파일러에 의해, yaml 한 줄 안짜고도, python으로 파이프라인을 작성할 수 있습니다.

파이프라인 생성 과정 미리보기

이런 형태로, 파이프라인을 작성하게됩니다.

  1. 컨테이너 생성 : 도커 컨테이너, 혹은 파이썬 함수를 컨테이너(컴포넌트)화합니다. (간단한 파이썬 함수도 , 파이프라인 에서 이용하려면, 컨테이너 등록이 필요합니다.)
  2. operation 생성 : 컨테이너를 전달할 명령줄 오퍼레이션을 작성합니다. 데이터 마운트나, 해당 컨테이너를 참조하는 작업도 수행 가능합니다.
  3. 순서 설정 : 병렬로 발생할 수 있는 작업과 추가 단계로 이동하기 전에 완료해야 하는 작업을 정의하여 작업을 순서를 지정합니다.
  4. 컴파일 : 다른 워크플로들에 비해 다른 점인데요, Python에 정의된 이 파이프라인을 YAML 파일로 컴파일합니다.

그럼, 이제 직접 예시를 통해 배워볼까요?

1. 컨테이너(컴포넌트) 생성

  • 도커 컨테이너, 혹은 파이썬 함수를 컨테이너(컴포넌트)화합니다.
kfp.components.func_to_container_op(func)
  • 이렇게 컴포넌트로 변환해야만, 파이프라인에서 사용할 수 있습니다.
  • 예제: simple_echo 함수를 이용해 simpleStronglyTypedFunction 라는 이름의 컴포넌트 만들기
import kfp

def simple_echo(i) :
  return i

simpleStronglyTypedFunction =\
      kfp.components.func_to_container_op(simple_echo)

# 예.
foo = simpleStronglyTypedFunction(1)
print(type(foo)) # kfp.dsl._container_op.ContainerOp
  • 기본틀)
def func_to_container_op(func, 
       output_component_file=None, 
       base_image: str = None, 
       extra_code='', 
       packages_to_install: List[str] = None, 
       modules_to_capture: List[str] = None, 
       use_code_pickling=False):
  • 축약형) 함수 위에 @func_to_container_op

2. operation 생성

  • 컨테이너를 전달할 명령줄 오퍼레이션을 작성합니다. 데이터 마운트나, 해당 컨테이너를 참조하는 작업도 수행 가능합니다.

다음은, 컴포넌트들을 이용해, 파이프라인을 구성하고, 오퍼레이션을 생성해봅시다.

2.1. 파이프라인 명시

@dsl.pipeline(
    name='XGBoost Trainer',
    description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
    output='gs://your-gcs-bucket',
    project='your-gcp-project',
    cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
    region='us-central1',
    train_data='gs://ml-pipeline-playground/sfpd/train.csv',
    eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
    schema='gs://ml-pipeline-playground/sfpd/schema.json',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
 ...
  • 위에 보기와 같이 @dsl.pipeline 으로, 파이프라인의 name, description 과 같은 메타 정보를 준뒤에, 오는 함수가 파이프라인입니다.
  • 파이프라인에는 매개변수를 줄 수 있고, 이후에 원하는 데로 , 컴포넌트들을 배치하면 됩니다.
  • exithandler, 조건문, 볼륨 마운트 등의 다양한 연산을 지원합니다. 더 자세한 정보는 도큐먼트를 확인해주세요.

3. 순서 설정

  • 병렬로 발생할 수 있는 작업과 추가 단계로 이동하기 전에 완료해야 하는 작업을 정의하여 작업을 순서를 지정합니다.
  • after 라는 예약어를 이용해 실행합니다.
...

_create_cluster_op = dataproc_create_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name,
            initialization_actions=[
              os.path.join(_PYSRC_PREFIX,
                           'initialization_actions.sh'),
            ],
            image_version='1.2'
        )

_analyze_op = dataproc_analyze_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            schema=schema,
            train_data=train_data,
            output=output_template
).after(_create_cluster_op).set_display_name('Analyzer')

...
  • 예를 들어 이렇게 지정하는 경우, _create_cluster_op 연산 이후에, _analyze_op 가 수행되게 됩니다.

4. 컴파일

  • Python에 정의된 이 dsl 파이프라인을 YAML 파일로 컴파일합니다.
kfp.compiler.Compiler().compile(echo_pipeline,
  'echo-pipeline.zip')
# 마지막으로 파이프라인을 압축된 YAML 파일로 컴파일 -> 파이프라인 UI에 업로드할 수 있다.
import kfp
def simple_echo(i) :
  return i

simpleStronglyTypedFunction =\
      kfp.components.func_to_container_op(simple_echo)

# 예.
foo = simpleStronglyTypedFunction(1)
print(type(foo)) # kfp.dsl._container_op.ContainerOp

@kfp.dsl.pipeline(
  name='Simple Echo', #  메타데이터
  description='This is an echo pipeline. It echoes numbers.'
)
def echo_pipeline(param_1: kfp.dsl.PipelineParam): #  하나의 매개변수(반향할 숫자)-> 실제 사용 사례에서는 기계 학습 알고리즘을 위한 하이퍼 매개변수와 같이 나중에 조정할 수 있는 변수를 포함할 수 있다
  my_step = simpleStronglyTypedFunction(i= param_1)

kfp.compiler.Compiler().compile(echo_pipeline,
  'echo-pipeline.zip')# 1. 마지막으로 파이프라인을 압축된 YAML 파일로 컴파일 -> 파이프라인 UI에 업로드할 수 있다.
  • 컴파일 후 실행 까지 한번에 수행
    • client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
    • 해당 명령어를 노트북에서 실행하면, 별도의 run 과정 생성 필요없이, 바로 run 시킬 수 있습니다.

# 2. Submit the pipeline for execution

client = kfp.Client()

#Specify pipeline argument values
arguments = {'param_1': 1}
#Submit a pipeline run
client.create_run_from_pipeline_func(calc_pipeline, arguments=arguments)
  • 쿠베플로우 도큐먼트에서 나온 전체 예시인데요, 한번 흐름을 살펴보세요. GitHub 에서 전체 코드를 볼 수 있습니다 .
@dsl.pipeline(
    name='XGBoost Trainer',
    description='A trainer that does end-to-end distributed training for XGBoost models.'
)
def xgb_train_pipeline(
    output='gs://your-gcs-bucket',
    project='your-gcp-project',
    cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
    region='us-central1',
    train_data='gs://ml-pipeline-playground/sfpd/train.csv',
    eval_data='gs://ml-pipeline-playground/sfpd/eval.csv',
    schema='gs://ml-pipeline-playground/sfpd/schema.json',
    target='resolution',
    rounds=200,
    workers=2,
    true_label='ACTION',
):
    output_template = str(output) + '/' + dsl.RUN_ID_PLACEHOLDER + '/data'

    # Current GCP pyspark/spark op do not provide outputs as return values, instead,
    # we need to use strings to pass the uri around.
    analyze_output = output_template
    transform_output_train = os.path.join(output_template, 'train', 'part-*')
    transform_output_eval = os.path.join(output_template, 'eval', 'part-*')
    train_output = os.path.join(output_template, 'train_output')
    predict_output = os.path.join(output_template, 'predict_output')

with dsl.ExitHandler(exit_op=dataproc_delete_cluster_op(
        project_id=project,
        region=region,
        name=cluster_name
    )):
        _create_cluster_op = dataproc_create_cluster_op(
            project_id=project,
            region=region,
            name=cluster_name,
            initialization_actions=[
              os.path.join(_PYSRC_PREFIX,
                           'initialization_actions.sh'),
            ],
            image_version='1.2'
        )

        _analyze_op = dataproc_analyze_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            schema=schema,
            train_data=train_data,
            output=output_template
        ).after(_create_cluster_op).set_display_name('Analyzer')

        _transform_op = dataproc_transform_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=train_data,
            eval_data=eval_data,
            target=target,
            analysis=analyze_output,
            output=output_template
        ).after(_analyze_op).set_display_name('Transformer')

        _train_op = dataproc_train_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            train_data=transform_output_train,
            eval_data=transform_output_eval,
            target=target,
            analysis=analyze_output,
            workers=workers,
            rounds=rounds,
            output=train_output
        ).after(_transform_op).set_display_name('Trainer')
        _predict_op = dataproc_predict_op(
            project=project,
            region=region,
            cluster_name=cluster_name,
            data=transform_output_eval,
            model=train_output,
            target=target,
            analysis=analyze_output,
            output=predict_output
        ).after(_train_op).set_display_name('Predictor')

        _cm_op = confusion_matrix_op(
            predictions=os.path.join(predict_output, 'part-*.csv'),
            output_dir=output_template
        ).after(_predict_op)

        _roc_op = roc_op(
            predictions_dir=os.path.join(predict_output, 'part-*.csv'),
            true_class=true_label,
            true_score_column=true_label,
            output_dir=output_template
        ).after(_predict_op)

       dsl.get_pipeline_conf().add_op_transformer(
        gcp.use_gcp_secret('user-gcp-sa'))

5. 수행

이러한 형태로 작성한 파이프라인을, zip 으로 파이프라인을 업로드 해서 등록하거나, 바로 실행시켜서 run 을 시켜서 “pipeline” 탭에서 내가 등록한 파이프라인을 확인하고, “run”에서 실제로 동작하는 파이프라인들을 확인할 수 있습니다.

kubeflow pipeline 사용법 (gcp 에서 설명 버전)

1. 워크플로의 입력 매개변수와 기본값을 정의

먼저, 워크플로의 입력 매개변수와 기본값을 정의합니다.

import kfp.dsl as dsl

@dsl.pipeline(
name="Workflow 1",
description="demonstrate TFT-based feature processing, "
)
def workflow1(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval',value='gs://aju-dev-demos-codelabs/KF/taxidata/eval/data.csv'),  
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'), 
...

)

2. 워크플로의 구성요소 단계와 해당 종속성을 정의

다음으로, 워크플로의 구성요소 단계와 해당 종속성을 정의합니다.

  1. 각 구성요소에 대해 docker 컨테이너 이미지와 컨테이너 끝 점에 전달할 인수를 지정합니다.
    1. 여기에서는 사용되지 않지만, 실행할 명령어를 지정해, 컨테이너 끝점을 재정의 하거나, 구성요소 실행중에 작성된 출력파일을 지정할 수 있습니다.
    2. 코드를 제공하고, 사용할 기본 이미지를 지정만하면, 파이프라인이 나머지를 사용할 수 있습니다.
  2. 이제 여기까지 오면, op.after() 구성을 이용해 단계간 종속성을 지정할 수 있습니다.
tfteval = dsl.ContainerOp(    
  name = 'tft-eval',      
  image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',      
  arguments = [ "--input_handle", input_handle_eval,           
                "--outfile_prefix", outfile_prefix_eval,         
               "--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),         
               "--project", project,         
                "--mode", preprocess_mode,          
                "--setup_file", tft_setup_file,    
                 "--max-rows", 5000,         
                 "--ts1", ts1,      
                 "--ts2", ts2,       
                 "--stage", "eval",          
               "--preprocessing-module", preprocessing_module1
                ]     
 # file_outputs = {'transformed': '/output.txt'}      
)  
tfttrain = ...  
tfteval2 = ...  
tfttrain2 = ..

3. 훈련 단계의 정의

train = ...  train.after(tfteval)  
            train.after(tfttrain)  
train2 = ...  
      train2.after(tfteval2)  
      train2.after(tfttrain2

4. 파이프라인 UI를 사용하여 워크플로 모니터링

이렇게 완성된 파이프라인은, cli 로 제출해도되고, 파일을 업로드 해도되고, 주피터로 트리거해서 업로드 해도됩니다.

Kubeflow Pipelines UI(사용자 인터페이스)는 파이프라인 사양, 주어진 파이프라인을 기반으로 하는 실험 , 여러 실험 실행 모니터링 및 검사를 지원합니다 . 위와 같은 사양이 컴파일 되고 UI를 통해 업로드됩니다.

파이프라인 정의를 업로드한 후 사양에서 파생된 파이프라인 그래프를 볼 수 있습니다. (동적으로 생성된 단계가 있는 파이프라인의 경우 이 초기 그래프는 런타임에 수정됩니다.) 그런 다음 해당 파이프라인을 기반으로 실험을 시작하고 여러 실험 실행을 시작하거나 예약할 수 있습니다.

실험 실행이 진행 중이거나 완료된 후에 파이프라인 단계에 대해 동적으로 생성된 그래프, 구성 매개변수 및 로그를 검사할 수 있습니다.

후기

  • 사실 전 쿠버플로를 공부할때 가장 고됐던 문제가, 너무 고사양을 요구해서, 설치해서 돌려보는 데에 애를 먹던 시기에서, 이제 제가 파이프라인을 직접 돌리게 됐다는 것이 아직도 신기하기는 합니다,, 너무 많은 기능을 제공해서 다 써보진 못했지만, 이세상에 배울 것이 많다는 것은 확실한 것같네요 🙂

참고 문헌

  1. https://cloud.google.com/blog/products/ai-machine-learning/getting-started-kubeflow-pipelines
  2. https://www.kubeflow.org/docs/components/pipelines/introduction/
  3. https://www.kubeflow.org/docs/started/introduction/
  4. Kubeflow for Machine Learning: From Lab to Production ( https://www.amazon.com/Kubeflow-Machine-Learning-Lab-Production/dp/1492050121)
  5. https://www.kubeflow.org/docs/components/pipelines/introduction/
반응형
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/05   »
1 2 3 4
5 6 7 8 9 10 11
12 13 14 15 16 17 18
19 20 21 22 23 24 25
26 27 28 29 30 31
글 보관함