-
[Airflow 실습] Task group으로 ui 관리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 20:54반응형
목표
- group 목적 알아보기
- task들을 group으로 묶는법에 대해 알아보기
- ui에서 group으로 표시 되어있는 부분 확인하기
group의 목적 알아보기
group을 사용하는 이유는 task의 실행 계획 과는 상관없이 ui에 대한 내용만 정리 하기 위해 사용합니다. 모듈로 빼서 group으로 관리하면 좀 관리나 재사용성이 좋아질 것 같긴합니다.
완성 사진
맨처음 group_1 으로 구성하였으며 그 안에 또 group을 넣어서 전체적으로 group이 두개 group안에 group으로 구성 하였습니다.
task들을 group으로 묶는법에 대해 알아보기
먼저 utils task_group이 있으니 확인 하시고 import하면 되겠습니다. Task를 구성하는 것과 같이Task_id가 web ui에서 보이는 명칭이며 as뒤에 쓰는 명칭이 python script에서 사용하는 명칭입니다. 그리고 as 뒤에 있는 group 명칭을 의존성 연결할 때 사용하면 끝납니다. 이렇게 하면 group 설정하는 법은 끝나니다.
# utills 안에 TaskGroup from airflow.utils.task_group import TaskGroup with DAG (...) as dag: start = ... # group_id가 ui 상에서 보이는 명칭, as 뒤에 명칭이 python script에서 사용하는 명칭 with TaskGroup(group_id='inner_group_1') as innerGroup: task_5 = PythonOperator(task_id='tast_5', python_callable=dump) task_6 = PythonOperator(task_id='tast_6', python_callable=dump) task_7 = PythonOperator(task_id='tast_7', python_callable=dump) task_5 >> task_7 task_5 >> task_6 end = ... # as 뒤에 있는 명칭으로 의존성 연결 start >> innerGroup >> end
완성 소스
from time import sleep from datetime import datetime, timedelta from pendulum.tz.timezone import Timezone from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.task_group import TaskGroup kst = Timezone('Asia/Seoul') default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' :timedelta(minutes=1) } def dump() -> None: sleep(3) with DAG( dag_id='ex_group', description='group 샘플입니다.', default_args=default_args, start_date=datetime(2022,6,22, tzinfo=kst), schedule_interval='@once', tags=['test', 'group_sample'], ) as dag: start = PythonOperator( task_id='start', python_callable=dump ) with TaskGroup(group_id='group_1') as sampleGroup: task_1 = PythonOperator(task_id='tast_1', python_callable=dump) task_2 = PythonOperator(task_id='tast_2', python_callable=dump) task_3 = PythonOperator(task_id='tast_3', python_callable=dump) task_4 = PythonOperator(task_id='tast_4', python_callable=dump) with TaskGroup(group_id='inner_group_1') as innerGroup: task_5 = PythonOperator(task_id='tast_5', python_callable=dump) task_6 = PythonOperator(task_id='tast_6', python_callable=dump) task_7 = PythonOperator(task_id='tast_7', python_callable=dump) task_5 >> task_7 task_5 >> task_6 task_1 >> innerGroup >> [task_2, task_3] >> task_4 end = PythonOperator( task_id='end', python_callable=dump ) start >> sampleGroup >> end
ui에서 group으로 표시 되어있는 부분 확인하기
ui에서 내용을 확인 할려면 group_1 을 클릭만 하면 펼쳐지면서 화면에 보이게 됩니다.
airflow 관련 모든 소스는 github에 있습니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow]Xcom을 이용한 task 간 데이터 주고 받기 (0) 2022.06.26 [Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기 (0) 2022.06.24 [Airflow 실습] SubDAG 작성하여 DAG관리하기 (0) 2022.06.23 [Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기 (0) 2022.06.23 [Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기) (0) 2022.06.22