-
[Airflow 실습] SubDAG 작성하여 DAG관리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 15:56반응형
목표
- SubDAG 개념 알아보기
- SubDAG 작성법 알아보기
- SubDAG 확인 하는법 알아보기
SubDAG 개념 알아보기
DAG안에 또다른 DAG를 넣을 수 있다. 그게 바로 SubDAG이다. DAG를 만들어서 관리를 하다보면 DAG사이즈가 너무 커져서 감당이 안되거나, 너무 가독성이 없고 관리도 하기 힘들 때가 있는데 그럴때 subDAG를 사용해보는걸 고려해봐도 좋다.
SubDAG 를 사용하는 이유
- 모듈처럼 관리가 용이해짐
- 다른 곳에서도 불러와 사용할 수 있기 때문에 재사용성이 좋음
- ui에서 하나의 task처럼 보이기 때문에 ui적으로 보기 좋음
SubDAG 작성법 알아보기
SubDAG는 작성을 먼저 메인 DAG에 SubDagOperator를 이용하여 DAG 작성
from airflow.operators.subdag import SubDagOperator parent_subDAG_sample2 = SubDagOperator( task_id='parent_subDAG_sample2', subdag=cs2.child_subdag2(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample2") )
subdags폴더를 만들어서 그안에 subDAG 만들기 subDAG는 꼭 함수로 작성하여야 하며 return값을 dag로 보내는 구문이 있어야 한다. dag_id는 꼭 parent_dag의 id값을 받아와서 ‘.’구분자 앞에 두고 구분자 ‘.’ 는 꼭 붙여주고 나서 child dag 이름을 넣어주면 된다. 나머지는 dag와 작성법이 똑같다.
# subdags/child_subdag1 kst=Timezone('Asia/Seoul') def dump() -> None: sleep(3) default_args={ 'owner': 'eddie', 'retries':1, 'retry_delay':timedelta(minutes=1), } def child_subdag1(parent_dag_name, child_dag_name) -> DAG: with DAG( dag_id=f"{parent_dag_name}.{child_dag_name}", default_args=default_args, schedule_interval="@daily", start_date=datetime(2022,6,22, tzinfo=kst), tags=['test', 'subdag', 'sample1'], ) as dag: child_subdag1_start = PythonOperator( task_id='child_subdag1_start', python_callable=dump ) child__subdag1_task = PythonOperator( task_id='child__subdag1_task', python_callable=dump ) child__subdag1_end = DummyOperator( task_id='child__subdag1_end' ) child_subdag1_start >> child__subdag1_task >> child__subdag1_end return dag
subDAG 확인 하는법 알아보기
subdag를 확인하기 위해서는 maindag를 꼭 동작을 해야한다. main dag graph에서 클릭을 하게 되면 팝업창이 뜨게 되는데 거기에서 맨 상단에 Zoom into Sub DAG를 클릭하면 Dag의 내용을 자세히 볼 수 있다.
완성된 소스
MainDAG
from time import sleep from pendulum.tz.timezone import Timezone from datetime import datetime, timedelta from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.subdag import SubDagOperator import subDags.child_subdag1 as cs1 import subDags.child_subdag2 as cs2 kst=Timezone('Asia/Seoul') default_args={ 'owner': 'eddie', 'retries':1, 'retry_delay':timedelta(minutes=1), } def dump() -> None: sleep(3) dag_id = 'ex_subDAG' with DAG( dag_id=dag_id, default_args=default_args, start_date=datetime(2022,6,22, tzinfo=kst), schedule_interval="@daily", tags=['test', 'subDAG'] ) as dag: parent_task_start = PythonOperator( task_id='parent_task_start', python_callable=dump, ) parent_subDAG_sample1 = SubDagOperator( task_id='parent_subDAG_sample1', subdag=cs1.child_subdag1(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample1") ) parent_subDAG_sample2 = SubDagOperator( task_id='parent_subDAG_sample2', subdag=cs2.child_subdag2(parent_dag_name=dag_id, child_dag_name="parent_subDAG_sample2") ) parent_task_end= PythonOperator( task_id='parent_task_end', python_callable=dump, ) parent_task_start >> [parent_subDAG_sample1, parent_subDAG_sample2] >> parent_task_end
SubDAG1
from time import sleep from datetime import datetime, timedelta from pendulum.tz.timezone import Timezone from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy import DummyOperator kst=Timezone('Asia/Seoul') def dump() -> None: sleep(3) default_args={ 'owner': 'eddie', 'retries':1, 'retry_delay':timedelta(minutes=1), } def child_subdag1(parent_dag_name, child_dag_name) -> DAG: with DAG( dag_id=f"{parent_dag_name}.{child_dag_name}", default_args=default_args, schedule_interval="@daily", start_date=datetime(2022,6,22, tzinfo=kst), tags=['test', 'subdag', 'sample1'], ) as dag: child_subdag1_start = PythonOperator( task_id='child_subdag1_start', python_callable=dump ) child__subdag1_task = PythonOperator( task_id='child__subdag1_task', python_callable=dump ) child__subdag1_end = DummyOperator( task_id='child__subdag1_end' ) child_subdag1_start >> child__subdag1_task >> child__subdag1_end return dag
SubDAG2
from time import sleep from datetime import datetime, timedelta from pendulum.tz.timezone import Timezone from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy import DummyOperator kst=Timezone('Asia/Seoul') def dump() -> None: sleep(3) default_args={ 'owner': 'eddie', 'retries':1, 'retry_delay':timedelta(minutes=1), } def child_subdag2(parent_dag_name, child_dag_name) -> DAG: with DAG( dag_id=f"{parent_dag_name}.{child_dag_name}", default_args=default_args, schedule_interval="@daily", start_date=datetime(2022,6,22, tzinfo=kst), tags=['test', 'subdag', 'sample2'], ) as dag: child_subdag2_start = PythonOperator( task_id='child_subdag2_start', python_callable=dump ) child_subdag2_task_1 = PythonOperator( task_id='child_subdag2_task_1', python_callable=dump ) child_subdag2_task_2 = PythonOperator( task_id='child_subdag2_task_2', python_callable=dump ) child_subdag2_task_3 = PythonOperator( task_id='child_subdag2_task_3', python_callable=dump ) child_subdag2_end = DummyOperator( task_id='child_subdag2_end' ) child_subdag2_start >> [child_subdag2_task_1, child_subdag2_task_2, child_subdag2_task_3] >> child_subdag2_end return dag
airflow 모든 소스는 github에서 확인 가능
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기 (0) 2022.06.24 [Airflow 실습] Task group으로 ui 관리하기 (0) 2022.06.23 [Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기 (0) 2022.06.23 [Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기) (0) 2022.06.22 [Airflow 실습] 여러개의 Task 한번에 돌리기 (0) 2022.06.22