ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기
    데이터 엔지니어링/AirFlow 2022. 6. 23. 13:15
    반응형

    목표

    • DAGRun 보는 법 알아보기
    • Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기
    • 두개의 속성 차이점 알아보기
    • 실행해보면서 익혀보기

    DagRun 보는 법 알아보기

    DAGRun이란 DAG들의 실행 내역이라고 보면 된다. 아래 그림을 보면 현광색은 실행 중인 상태의 DAG를 보여주며, 초록색은 성공 분홍색은 skip등을 DAG한 실당 한줄씩 보여준다. web ui에서 막대 그래프로 확인 가능하다 막대 그래프 형식은 airflow==2.3.2(2022.06 최신 버전)에서는 바뀌어 있었다.

     

    Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기

    DAGRun중에서 이전작업에 대해 의존성을 부여하여야 하는 경우가 있다. 예를 들어 ETL작업을 할 때 그전 기록된 내용을 가지고 계산을 하여 오늘 내용을 기록한다고 해보자. 그럼 어제 내용이 꼭 있어야 오늘 내용을 기록 할 수 있는 로직이기 때문에 어제 기록 없이 오늘 기록을 하면 안된다. 그러면 scheduler가 그냥 매일 돌리는 조건만 가지고 계속 하게 되면 어떻게 될까?? 그럼 개발자가 모르는 상태로 scheduler가 계속 돌아가게 될 것이니 나중에 알고 다시 로직을 일괄로 돌리는 경우가 발생한다. 이런 경우를 막기 위해서 airflow에서는 이전 작업 의존성인 Depends_on_past, wait_for_downstream을 제공하고 있다.

    Depends_on_past, wait_for_downstream 의미

    • Depends_on_past : 이전 날짜의 task instance 들 중 하나라도 fail일 때는 다음 DAGrun에서 해당 task가 완료 될때 까지 기다린다
    • Wait_for_downstream : 이전 날짜의 task instance 들 중 fail인 것까지 작동하고 나머지 DAGRun의 task들은 실행하지 않고 no status로 대기한다.

    예를 들어 아래와 같은 소스가 있다고 해보고

    from time import sleep
    from datetime import datetime, timedelta
    from pendulum.tz.timezone import Timezone
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
    
    kst=Timezone('Asia/Seoul')
    
    default_args={
        'owner' : 'eddie',
        'retries' : 1,
        'retry_delay' : timedelta(minutes=1),
    }
    
    with DAG(
        dag_id="ex_dependencies_past",
        description='직전 dag, task를 바라보는 의존성(작업실행 할지 여부) 결정하기',
        default_args=default_args,
        start_date=days_ago(5),
        schedule_interval='@daily', 
        tags=['test', 'depends_on_past', 'wait_for_downstream'],
        catchup=False,
    ) as dag:
    
        t1 = DummyOperator(
            task_id='t1'
        )
        t2 = DummyOperator(
            task_id='t2',
        )
        t3 = DummyOperator(
            task_id='t3'
        )
        t4 = DummyOperator(
            task_id='t4'
        )
    
        t1 >> t2 >> t3 >> t4
    

    이것을 기본적으로 실행해보면 fail이 있어도 실행이 다 되는 것을 볼 수 있다.

    Depends_on_past 적용하기

    이 소스에 Depends_on_past를 적용하려면, Depends_on_past을 살펴보면 아래 코드에 표시해둔 곳에 depends_on_past속성을 True로 변경해주면 된다. 작성하지 않으면 default값은 False이다.

    default_args={
        'owner' : 'eddie',
        'retries' : 1,
        'retry_delay' : timedelta(minutes=1),
        'depends_on_past':False,  <---- 이 부분
        'wait_for_downstream':True,
    }

    이것을 임의로 하나의 fail상태를 기입하고 실행해보면서 DAGRun 내역을 봐보면 이전 성공한 task까지는 성공한 것을 볼수 있고 나머지는 no status로 처리되어있다.

    이 소스에 Depends_on_past를 적용하려면, Depends_on_past을 살펴보면 아래 코드에 표시해둔 곳에 depends_on_past속성을 True로 변경해주면 된다. 작성하지 않으면 default값은 False이다.

    default_args={
        'owner' : 'eddie',
        'retries' : 1,
        'retry_delay' : timedelta(minutes=1),
        'depends_on_past':True,  <---- 이 부분
        'wait_for_downstream':False,
    }

    이것을 임의로 하나의 fail상태를 기입하고 실행해보면서 DAGRun 내역을 봐보면 이전 성공한 task까지는 성공한 것을 볼수 있고 나머지는 no status로 처리되어있다.

    wait_for_downstream 적용하기

    wait_for_downstream을 적용할려면 속성에 True값을 넣어주면 된다.

    default_args={
        'owner' : 'eddie',
        'retries' : 1,
        'retry_delay' : timedelta(minutes=1),
        'depends_on_past': False,  <---- 이 부분
        'wait_for_downstream': True,
    }

    그러고 다시 한번 돌려 주면 아래와 같이 해당 DAGRun의 task가 모두 실행 되지 않는 모습을 볼 수 있다.

    나중에 이전 DAGRun의 의존해서 실행 값을 얻어야 할때 유용하게 쓰일것같다.

     

     

    소스는 아래 github에 모두 보실수 있습니다.

     

    GitHub - hyunseokjoo/prac_airflow

    Contribute to hyunseokjoo/prac_airflow development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.