-
[Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 13:15반응형
목표
- DAGRun 보는 법 알아보기
- Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기
- 두개의 속성 차이점 알아보기
- 실행해보면서 익혀보기
DagRun 보는 법 알아보기
DAGRun이란 DAG들의 실행 내역이라고 보면 된다. 아래 그림을 보면 현광색은 실행 중인 상태의 DAG를 보여주며, 초록색은 성공 분홍색은 skip등을 DAG한 실당 한줄씩 보여준다. web ui에서 막대 그래프로 확인 가능하다 막대 그래프 형식은 airflow==2.3.2(2022.06 최신 버전)에서는 바뀌어 있었다.
Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기
DAGRun중에서 이전작업에 대해 의존성을 부여하여야 하는 경우가 있다. 예를 들어 ETL작업을 할 때 그전 기록된 내용을 가지고 계산을 하여 오늘 내용을 기록한다고 해보자. 그럼 어제 내용이 꼭 있어야 오늘 내용을 기록 할 수 있는 로직이기 때문에 어제 기록 없이 오늘 기록을 하면 안된다. 그러면 scheduler가 그냥 매일 돌리는 조건만 가지고 계속 하게 되면 어떻게 될까?? 그럼 개발자가 모르는 상태로 scheduler가 계속 돌아가게 될 것이니 나중에 알고 다시 로직을 일괄로 돌리는 경우가 발생한다. 이런 경우를 막기 위해서 airflow에서는 이전 작업 의존성인 Depends_on_past, wait_for_downstream을 제공하고 있다.
Depends_on_past, wait_for_downstream 의미
- Depends_on_past : 이전 날짜의 task instance 들 중 하나라도 fail일 때는 다음 DAGrun에서 해당 task가 완료 될때 까지 기다린다
- Wait_for_downstream : 이전 날짜의 task instance 들 중 fail인 것까지 작동하고 나머지 DAGRun의 task들은 실행하지 않고 no status로 대기한다.
예를 들어 아래와 같은 소스가 있다고 해보고
from time import sleep from datetime import datetime, timedelta from pendulum.tz.timezone import Timezone from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago kst=Timezone('Asia/Seoul') default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' : timedelta(minutes=1), } with DAG( dag_id="ex_dependencies_past", description='직전 dag, task를 바라보는 의존성(작업실행 할지 여부) 결정하기', default_args=default_args, start_date=days_ago(5), schedule_interval='@daily', tags=['test', 'depends_on_past', 'wait_for_downstream'], catchup=False, ) as dag: t1 = DummyOperator( task_id='t1' ) t2 = DummyOperator( task_id='t2', ) t3 = DummyOperator( task_id='t3' ) t4 = DummyOperator( task_id='t4' ) t1 >> t2 >> t3 >> t4
이것을 기본적으로 실행해보면 fail이 있어도 실행이 다 되는 것을 볼 수 있다.
Depends_on_past 적용하기
이 소스에 Depends_on_past를 적용하려면, Depends_on_past을 살펴보면 아래 코드에 표시해둔 곳에 depends_on_past속성을 True로 변경해주면 된다. 작성하지 않으면 default값은 False이다.
default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' : timedelta(minutes=1), 'depends_on_past':False, <---- 이 부분 'wait_for_downstream':True, }
이것을 임의로 하나의 fail상태를 기입하고 실행해보면서 DAGRun 내역을 봐보면 이전 성공한 task까지는 성공한 것을 볼수 있고 나머지는 no status로 처리되어있다.
이 소스에 Depends_on_past를 적용하려면, Depends_on_past을 살펴보면 아래 코드에 표시해둔 곳에 depends_on_past속성을 True로 변경해주면 된다. 작성하지 않으면 default값은 False이다.
default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' : timedelta(minutes=1), 'depends_on_past':True, <---- 이 부분 'wait_for_downstream':False, }
이것을 임의로 하나의 fail상태를 기입하고 실행해보면서 DAGRun 내역을 봐보면 이전 성공한 task까지는 성공한 것을 볼수 있고 나머지는 no status로 처리되어있다.
wait_for_downstream 적용하기
wait_for_downstream을 적용할려면 속성에 True값을 넣어주면 된다.
default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' : timedelta(minutes=1), 'depends_on_past': False, <---- 이 부분 'wait_for_downstream': True, }
그러고 다시 한번 돌려 주면 아래와 같이 해당 DAGRun의 task가 모두 실행 되지 않는 모습을 볼 수 있다.
나중에 이전 DAGRun의 의존해서 실행 값을 얻어야 할때 유용하게 쓰일것같다.
소스는 아래 github에 모두 보실수 있습니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습] Task group으로 ui 관리하기 (0) 2022.06.23 [Airflow 실습] SubDAG 작성하여 DAG관리하기 (0) 2022.06.23 [Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기) (0) 2022.06.22 [Airflow 실습] 여러개의 Task 한번에 돌리기 (0) 2022.06.22 [Airflow 실습]가장 기본 DAG 만들어 실행해보기 (0) 2022.06.22