ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 실습] SubDAG 작성하여 DAG관리하기
    데이터 엔지니어링/AirFlow 2022. 6. 23. 15:56
    반응형

    목표

    • SubDAG 개념 알아보기
    • SubDAG 작성법 알아보기
    • SubDAG 확인 하는법 알아보기

    SubDAG 개념 알아보기

    DAG안에 또다른 DAG를 넣을 수 있다. 그게 바로 SubDAG이다. DAG를 만들어서 관리를 하다보면 DAG사이즈가 너무 커져서 감당이 안되거나, 너무 가독성이 없고 관리도 하기 힘들 때가 있는데 그럴때 subDAG를 사용해보는걸 고려해봐도 좋다.

    SubDAG 를 사용하는 이유

    • 모듈처럼 관리가 용이해짐
    • 다른 곳에서도 불러와 사용할 수 있기 때문에 재사용성이 좋음
    • ui에서 하나의 task처럼 보이기 때문에 ui적으로 보기 좋음

    SubDAG 작성법 알아보기

    SubDAG는 작성을 먼저 메인 DAG에 SubDagOperator를 이용하여 DAG 작성

    from airflow.operators.subdag import SubDagOperator
    
    parent_subDAG_sample2 = SubDagOperator(
      task_id='parent_subDAG_sample2',
      subdag=cs2.child_subdag2(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample2")
    )
    

    subdags폴더를 만들어서 그안에 subDAG 만들기 subDAG는 꼭 함수로 작성하여야 하며 return값을 dag로 보내는 구문이 있어야 한다. dag_id는 꼭 parent_dag의 id값을 받아와서 ‘.’구분자 앞에 두고 구분자 ‘.’ 는 꼭 붙여주고 나서 child dag 이름을 넣어주면 된다. 나머지는 dag와 작성법이 똑같다.

    # subdags/child_subdag1
    kst=Timezone('Asia/Seoul')
    
    def dump() -> None:
        sleep(3)
    
    default_args={
        'owner': 'eddie',
        'retries':1,
        'retry_delay':timedelta(minutes=1),
    }
    
    def child_subdag1(parent_dag_name, child_dag_name) -> DAG:
        with DAG(
            dag_id=f"{parent_dag_name}.{child_dag_name}",
            default_args=default_args,
            schedule_interval="@daily",
            start_date=datetime(2022,6,22, tzinfo=kst),
            tags=['test', 'subdag', 'sample1'],
        ) as dag:
            
            child_subdag1_start = PythonOperator(
                task_id='child_subdag1_start',
                python_callable=dump
            )
    
            child__subdag1_task = PythonOperator(
                task_id='child__subdag1_task',
                python_callable=dump
            )
    
            child__subdag1_end = DummyOperator(
                task_id='child__subdag1_end'
            )
    
            child_subdag1_start >> child__subdag1_task >> child__subdag1_end
    
            return dag
    

    subDAG 확인 하는법 알아보기

    subdag를 확인하기 위해서는 maindag를 꼭 동작을 해야한다. main dag graph에서 클릭을 하게 되면 팝업창이 뜨게 되는데 거기에서 맨 상단에 Zoom into Sub DAG를 클릭하면 Dag의 내용을 자세히 볼 수 있다.

     

     

    완성된 소스

    MainDAG

    from time import sleep
    from pendulum.tz.timezone import Timezone
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.subdag import SubDagOperator
    import subDags.child_subdag1 as cs1
    import subDags.child_subdag2 as cs2
    
    kst=Timezone('Asia/Seoul')
    
    default_args={
        'owner': 'eddie',
        'retries':1,
        'retry_delay':timedelta(minutes=1),
    }
    
    def dump() -> None:
        sleep(3)
    
    dag_id = 'ex_subDAG'
    
    with DAG(
        dag_id=dag_id,
        default_args=default_args,
        start_date=datetime(2022,6,22, tzinfo=kst),
        schedule_interval="@daily",
        tags=['test', 'subDAG']
    ) as dag:
        
        parent_task_start = PythonOperator(
            task_id='parent_task_start',
            python_callable=dump,
        )
    
        parent_subDAG_sample1 = SubDagOperator(
            task_id='parent_subDAG_sample1',
            subdag=cs1.child_subdag1(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample1")
        )
    
        parent_subDAG_sample2 = SubDagOperator(
            task_id='parent_subDAG_sample2',
            subdag=cs2.child_subdag2(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample2")
        )
    
        parent_task_end= PythonOperator(
            task_id='parent_task_end',
            python_callable=dump,
        )
    
        parent_task_start >> [parent_subDAG_sample1, parent_subDAG_sample2] >> parent_task_end
    

    SubDAG1

    from time import sleep
    from datetime import datetime, timedelta
    from pendulum.tz.timezone import Timezone
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.dummy import DummyOperator
    
    kst=Timezone('Asia/Seoul')
    
    def dump() -> None:
        sleep(3)
    
    default_args={
        'owner': 'eddie',
        'retries':1,
        'retry_delay':timedelta(minutes=1),
    }
    
    def child_subdag1(parent_dag_name, child_dag_name) -> DAG:
        with DAG(
            dag_id=f"{parent_dag_name}.{child_dag_name}",
            default_args=default_args,
            schedule_interval="@daily",
            start_date=datetime(2022,6,22, tzinfo=kst),
            tags=['test', 'subdag', 'sample1'],
        ) as dag:
            
            child_subdag1_start = PythonOperator(
                task_id='child_subdag1_start',
                python_callable=dump
            )
    
            child__subdag1_task = PythonOperator(
                task_id='child__subdag1_task',
                python_callable=dump
            )
    
            child__subdag1_end = DummyOperator(
                task_id='child__subdag1_end'
            )
    
            child_subdag1_start >> child__subdag1_task >> child__subdag1_end
    
            return dag
    

    SubDAG2

    from time import sleep
    from datetime import datetime, timedelta
    from pendulum.tz.timezone import Timezone
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.operators.dummy import DummyOperator
    
    kst=Timezone('Asia/Seoul')
    
    def dump() -> None:
        sleep(3)
    
    default_args={
        'owner': 'eddie',
        'retries':1,
        'retry_delay':timedelta(minutes=1),
    }
    
    def child_subdag2(parent_dag_name, child_dag_name) -> DAG:
        with DAG(
            dag_id=f"{parent_dag_name}.{child_dag_name}",
            default_args=default_args,
            schedule_interval="@daily",
            start_date=datetime(2022,6,22, tzinfo=kst),
            tags=['test', 'subdag', 'sample2'],
        ) as dag:
            
            child_subdag2_start = PythonOperator(
                task_id='child_subdag2_start',
                python_callable=dump
            )
    
            child_subdag2_task_1 = PythonOperator(
                task_id='child_subdag2_task_1',
                python_callable=dump
            )
    
            child_subdag2_task_2 = PythonOperator(
                task_id='child_subdag2_task_2',
                python_callable=dump
            )
    
            child_subdag2_task_3 = PythonOperator(
                task_id='child_subdag2_task_3',
                python_callable=dump
            )
    
            child_subdag2_end = DummyOperator(
                task_id='child_subdag2_end'
            )
    
            child_subdag2_start >> [child_subdag2_task_1, child_subdag2_task_2, child_subdag2_task_3] >> child_subdag2_end
    
            return dag
    

     

     

    airflow 모든 소스는 github에서 확인 가능

     

    GitHub - hyunseokjoo/prac_airflow

    Contribute to hyunseokjoo/prac_airflow development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.