-
[Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기데이터 엔지니어링/AirFlow 2022. 6. 24. 11:29반응형
목표
- DAG 작업 진행 시 Trigger를 사용하여 다른 DAG를 호출하는 방법 알아보기
- Trigger 연결 되어있는지 확인 하는 방법 알아보기
- 실행법 알아보기
완성 사진
Trigger를 사용하려면 TriggerDagRunOperator를 사용하여야 한다. TriggerDagRunOperator를 사용하면 DAG Graph에 아래와 같이 왼쪽 상단에 TriggerDagRunOperator를 사용했다고 나오고 하나의 task처럼 표시 된다.
완성 코드
먼저 trigger를 사용하려면 여분의 DAG를 하나 더 만들어야 합니다. DAG는 아래와 같이 여분의 DAG로 만들었습니다.
from time import sleep from datetime import datetime, timedelta from airflow import DAG from pendulum.tz.timezone import Timezone from airflow.operators.python import PythonOperator from airflow.operators.dummy import DummyOperator kst = Timezone('Asia/Seoul') default_args={ 'owner' : 'eddie', 'retries' : 1, 'retry_delay' : timedelta(minutes=1) } def dump() -> None: sleep(3) with DAG( dag_id='standby_trigger', description='trigger 기다리는 DAG입니다.', default_args=default_args, start_date=datetime(2022,6,22, tzinfo=kst), schedule_interval=None, # trigger DAG는 보통 None으로 처리 합니다. tags=['test', 'triggered_by_trigger_DAG_sample'] ) as dag: trigger_start = PythonOperator( task_id='trigger_start', python_callable=dump ) trigger_task_1 = PythonOperator( task_id='trigger_task_1', python_callable=dump ) trigger_end = PythonOperator( task_id='trigger_end', python_callable=dump, ) trigger_start >> trigger_task_1 >> trigger_end
실제로 사용할 코드는 아래와 같습니다.
""" TriggerDagRunOperator 공식 문서 <https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/trigger_dagrun/index.html#airflow.operators.trigger_dagrun.TriggerDagRunOperator> """ from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.trigger_dagrun import TriggerDagRunOperator from datetime import datetime, timedelta from pendulum.tz.timezone import Timezone kst=Timezone('Asia/Seoul') with DAG( dag_id='ex_trigger_sample', description='trigger를 실행하게 해주는 DAG입니다.', schedule_interval='@once', start_date=datetime(2022,6,22, tzinfo=kst), tags=['test', 'call_trigger_sample'] ) as dag: def dump() -> None: sleep(3) start = PythonOperator( task_id='start', python_callable=dump ) call_trigger = TriggerDagRunOperator( task_id='call_trigger', trigger_dag_id='standby_trigger', trigger_run_id=None, execution_date=None, reset_dag_run=False, wait_for_completion=False, poke_interval=60, allowed_states=["success"], failed_states=None, ) # trigger rule을 사용하지 않으면 바로 end task가 실행된다 # 바로 실행 되지 않고 trigger를 기다리고 싶다면 trigger rule을 사용 # email보내는 것과 같이 그냥 보내고 다른 task를 진행하고 싶다면 trigger rule을 사용하지 않고 진행 end = PythonOperator( task_id='end', python_callable=dump ) start >> call_trigger >> end
DAG 작업 진행 시 Trigger를 사용하여 다른 DAG를 호출하는 방법 알아보기
DAG Trigger를 사용하려면 아래와 같은 코드를 소스안에 넣어야 한다. 속성은 공식 문서를 보고 넣으면 된다.
from airflow.operators.trigger_dagrun import TriggerDagRunOperator call_trigger = TriggerDagRunOperator( task_id='call_trigger', trigger_dag_id='standby_trigger', # 호출할 DAG_ID 입력 trigger_run_id=None, # 실행중인 run_id를 입력하면 해당 run_id를 실행한다. None이나 default값은 자동 할당이다. execution_date=None, # 실행 날짜를 지정한다. reset_dag_run=False, # 해당 dag에 dag_run이 있을 경우 초기화 하고 다시 시작함 wait_for_completion=False, # DAG_run이 완료 될때 까지 기다린다. poke_interval=60, # wait_for_completion을 true로 했다면, Trigger가 작동했는지 확인하는 간격을 지정 할 수 있다. 값을 정해주지 않는다면 기본값은 60이다. allowed_states=["success"], # Trigger를 실행할 state를 작성한다.list로 작성해야하며 default는 success이다. failed_states=None, # Trigger를 실행하지 않을 상태를 기입한다. list로 작성한다. )
Trigger 연결 되어있는지 확인 하는 방법 알아보기
main DAG와 Trigger DAG를 작성하여 web ui에 뛰워진걸 확인 했다면, 연결되어있는지 확인 해야한다. 확인 하는 방법은 web ui에서 확인이 가능하다.
Brouser → DAG Dependencies 에서 확인 가능하다.
아래를 보면 DAG간 Dependencies들이 화면에 보일 것이다 왼쪽 상단에 dag trigger sensor들이 있어 DAG간 의존성을 다 확인 할 수 있다. DAGs에서는 DAG간 task dependencies를 확인 가능하기 때문에 DAG간 의존성은 여기서 확인한다. 사이즈가 더 커지면 유용할 것 같다.
실행법 알아보기
trigger 구동을 확인 하려면, trigger 할 DAG를 먼저 실행 시켜주어야 한다. standby_trigger라는 DAG를 만들었으니 먼저 구동 시켜 주고 main이 되는 DAG를 실행 시켜 보면
아래와 같이 DAG_run이 동작하고 있는 것을 볼 수 있다.
참고 문헌
airflow관련 모든 소스는 github에서 확인 가능
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow]Pool 을 이용하여 메모리 사용량 제한하기 (0) 2022.06.26 [Airflow]Xcom을 이용한 task 간 데이터 주고 받기 (0) 2022.06.26 [Airflow 실습] Task group으로 ui 관리하기 (0) 2022.06.23 [Airflow 실습] SubDAG 작성하여 DAG관리하기 (0) 2022.06.23 [Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기 (0) 2022.06.23