-
[Airflow 실습] 여러개의 Task 한번에 돌리기데이터 엔지니어링/AirFlow 2022. 6. 22. 15:48반응형
목표
- DAG 생성
- 의존성을 연결하여 Task를 동시에 호출 할 수 있는지 파악
- graph를 활용하여 동시에 작업 실행 되는지 확인
완성된 flow
완성된 소스
from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator from pendulum.tz.timezone import Timezone default_args = { 'owner' : 'eddie', 'retries' : 1, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='ex_multi_tasks', description='다중 작업 순차 처리 예제입니다(병렬처리 x)', default_args=default_args, start_date=datetime(2022,6,21, tzinfo=Timezone("Asia/Seoul")), schedule_interval="@daily", tags=['test', 'multi tasks'] ) as dag: # 잠깐 멈추는 함수 만들기 def dump() -> None: sleep(3) # 시작 함수 start = PythonOperator(task_id='start', python_callable=dump) # 여러 태스크 동시 실행 할 수 있게 생성 task_1 = PythonOperator(task_id='task_1', python_callable=dump) task_2 = PythonOperator(task_id='task_2', python_callable=dump) task_3 = PythonOperator(task_id='task_3', python_callable=dump) task_4 = PythonOperator(task_id='task_4', python_callable=dump) task_5 = PythonOperator(task_id='task_5', python_callable=dump) # 의존성 부여 start >> task_1 >> task_2 >> task_5 start >> task_3 >> task_4 >> task_5
UI에서 확인 하기
위와 같이 작성하면 30초 후에 ex_multi_tasks가 올라 온것을 볼 수 있다.
의존성을 봤을 때 start가 되는 것이 두번 이루어지기 때문에 두갈래로 나눠져 있는 것을 볼 수 있다.
실행을 시켜 보면 start가 먼저 successrk 되는 것을 볼 수 있고 그다음 task1,3 그다음 task 2, 4 그다음 task5가 실행되는 것을 볼 수 있다.
소스는 github에서 자세히 보실 수 있습니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기 (0) 2022.06.23 [Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기) (0) 2022.06.22 [Airflow 실습]가장 기본 DAG 만들어 실행해보기 (0) 2022.06.22 [Airflow 기본 내용] DAG 작성 심화 (0) 2022.06.22 [Airflow 기본 내용] 기본적인 DAG 작성법 (0) 2022.06.22