티스토리 뷰

반응형

참고 문헌

pipeline

A pipeline is a description of a machine learning (ML) workflow, including all of the components in the workflow and how the components relate to each other in the form of a graph. The pipeline configuration includes the definition of the inputs (parameters) required to run the pipeline and the inputs and outputs of each component.

  • 파이프라인입니다. argoworkflow 에서는 workflow 개념이 유사합니다.
  • 파이프라인은 ML workflow 의 description 이자, 해당 컴포넌트들을 그래프 형태로 컴포넌트간의 의존성을 "그래프"형태로 구성할 수 있습니다.
  • 파이프라인 구성에는 파이프라인을 실행하는 데 필요한 입력(매개 변수)의 정의와 각 구성 요소의 입력 및 출력이 포함됩니다.
  • 가장 핵심 개념이라고 볼 수 있는데, 해당 부분을 동작시키기 위해서 "run"을 생성할 수 있고, "experiment"와 연결지을 수 있습니다. 또한, 해당 파이프라인에 "동작 주기+동작" 등의 크론 개념을 넣으면 , " recurring_run" 이 된다고 보시면 됩니다.

experiment

conceptual overview of experiments in kubeflow pipeline

  • 쿠베플로우 파이프라인의 파라미터 변경을 위해 실험하는 구성요소입니다.
  • 위에 pipeline 에서 파이프라인 구성의 경우, pipeline 을 run 시킬 때, 각각의 컴포넌트들을 위한 input parameter이 정의돼있습니다. "라고 말씀드렸었는데요, 해당 input parameter 에 대한 값들을 변경해 실험해보는 구조라고 보시면 될 것같습니다.
  • 필수 구성요소는 name, namespace 정도 이고, 여러 파이프라인 x experiment 의 m대n 구조가 가능합니다.

pipeline conf

  • Pipeline Config 입니다. pipeline level settings 들이 들어있다고 보시면 되는데요, 주된 내용들은 "run", 혹은 "recurring_run"을 위해, 즉, 쿠버네티스 위에서 파이프라인이 동작하기 위한 구성요소들이 들어갑니다.
  • # https://kubeflow-pipelines.readthedocs.io/en/stable/source/kfp.dsl.html#kfp.dsl.PipelineConf  
    class PipelineConf():
        """PipelineConf contains pipeline level settings."""
        def __init__(self):
              self.image_pull_secrets = [] 
              self.timeout = 0
              self.ttl_seconds_after_finished = -1
              self._pod_disruption_budget_min_available = None
              self.op_transformers = []
              self.default_pod_node_selector = {}
              self.image_pull_policy = None
              self.parallelism = None
              self._data_passing_method = None
              self.dns_config = None

쿠베플로우 파이프라인의 장점은 이렇게 python SDK 돼있다 보니, 해당 구성요소들을 파이썬 코드를 통해 확인할 수 있는 부분이 좋은 것같아요. pythonSDK 문서에서 PipelineConf 클래스 내용을 일부 발쵀했습니다.

구성요소

  1. image_pull_secret
    1. 해당 파이프라인에서 사용하는 도커 이미지에 대한 image pull secet 을 정의합니다.
  2. timeout :
    1. pipeline 레벨에서의 timeout 을 지정합니다. 단위는 seconds 입니다.
  3. ttl_seconds_after_finished
    1. 파이프라인이 완료된 후 ttl(timetolive)을 구성합니다. 단위는 seconds 입니다.
  4. _pod_disruption_budget_min_available
    1. 파이프라인 Pod 허용되는 동시 중단 횟수를 보유하고 있습니다.
    2. 주로 pod distribution 레벨은 노드업그레이드로 인한, 노드 에서 모든 pod 이사할때, 한번에 죽으면 서비스 장애상황으로 이어질 수 있기때문에, 천천히 일정 레벨로 pod 를 죽일 수 있게끔 제한하는 용도로 이용된는데 같은 상황인 것 같습니다.
    3. min_available(Union[int, str]): "pod 삭제" 후에도 "selector"에 의해 선택된 최소 "사용 가능한" Pod가 계속 사용 가능한 경우, 즉 제거된 포드가 없는 경우에도 제거가 허용됩니다. 예를 들어 "100%"를 지정하면 모든 자발적 "pod 삭제"를 방지할 수 있습니다. "minAvailable"은 절대 숫자 또는 백분율일 수 있습니다.
  5. op_transformers
    1. 파이프라인의 모든 ops에 적용할 op_transformers를 구성합니다. ops는 ResourceOp, VolumeOp 또는 ContainerOp일 수 있습니다.
  6. default_pod_node_selector
    1. pipeline을위한 nodeSelector를 지정합니다.
  7. image_pull_policy
    1. default image pull policy 를 지정합니다.
  8. parallelism
    1. 워크플로우에서 동시에 실행할 수 있는 최대 총 병렬 Pod 수를 구성하는데 이용됩니다.
  9. _data_passing_method
    1. 중간 데이터 전달에 사용되는 메서드를 나타내는 object를 설정합니다.
  10. dns_config
    1. 각각의 Pod 에 대한 dnsConfig 입니다.
    2. https://github.com/kubernetes-client/python/blob/master/kubernetes/docs/V1PodDNSConfig.md

pipeline <> PipelineConf

  • PipelineConf 인자는 Pipeline 클래스에서 default로 conf 로 호출 가능합니다. 별도로 따로 PipelineConf 객체를 새로 생성할 필요없이, dsl.get_pipeline_conf() 를 이용해 conf 를 호출해주면 됩니다.
  • pipeline 클래스
    • class Pipeline():
          def __init__(self, name: str):
              """Create a new instance of Pipeline.
      
              Args:
                name: the name of the pipeline. Once deployed, the name will show up in
                  Pipeline System UI.
              """
              self.name = name
              self.ops = {}
              # Add the root group.
              self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
              self.group_id = 0
              self.conf = PipelineConf()
              self._metadata = None
  • example
@kfp.dsl.pipeline(
    name='kubeflow local test',
)
def kubeflow_pipeline(
       ....
):
   ... 
    dsl.get_pipeline_conf().set_timeout(543210)
    dsl.get_pipeline_conf().set_image_pull_secrets([
        k8s.V1LocalObjectReference(name='secret1'),

    ])

run

= experiment+ name+ pipeline(+pipeline conf)

  • single execution of pipeline입니다.
  • 아까 1.1.1. 에서 구성했던 pipeline을 실제 run 하는 것이라 생각하면 됩니다. argo workflow 에서 workflow 가 실제 실행되는 것이라 생각하면 될 것같습니다.
  • 구성 요소 : experiment + 변경불가능한 로그로 구성됩니다.
  • 재현 가능하도록 독립적으로 설계되어있습니다.

recurring_run (=job)

scheduledworkflows 입니다.

= experiment+ name+ trigger(cronexpression)+ pipeline(+pipeline conf)

  • 구성 요소 :
    • copy of pipleline with all paramenter value specified and a run trigger(cron, periodic)
    • 모든 명시된 paramter value 가 있는 파이프라인 복제본과 (cron, 혹은 peridic) 트리거의 결합된 recurring run 이라 볼 수 있습니다.
  • 추가 설정으로 : concurrent run, number of run palleral 등이 가능합니다.

6. kubeflow pipeline 아키텍처

  • 위에서 배운 구성요소들을 이용해서, kubeflow pipeline 이 동작하는 전체 아키텍처입니다.
  • 처음에는 argo workflow 에도 cronworkflow 가 있기때문에, 왜 recurring_run 을 cron workflow 로 구현하지 않았을까 라는 궁금증이 있었는데, argo 는 task driven workflow 를 위한 엔진일뿐 kfp pipeline 컨셉과 별개로 생각해야한다네요, 보시면서 참고하시면 좋을 것같습니다.

https://www.kubeflow.org/docs/components/pipelines/v1/introduction/


반응형
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
«   2024/04   »
1 2 3 4 5 6
7 8 9 10 11 12 13
14 15 16 17 18 19 20
21 22 23 24 25 26 27
28 29 30
글 보관함