ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow]Catch up, Backfill 알아보기
    데이터 엔지니어링/AirFlow 2022. 9. 4. 14:04
    반응형

    Airflow를 운용하다 보면, 재실행을 하거나 현재 시점 보다 과거의 배치 작업을 주기적으로 진행을 해야하거나, 실행을 했더라도 특정 조건으로 해당하는(실패나, 건너뜀 등)을 골라내서 재실행 하고 싶을 때가 있다. 바로 작업을 주기적으로 하는 관점이기 때문인데 이번 포스팅에서는 이런 작업들을 airflow에서는 어떻게 관리하고 다시 재 작업을 할 수 있을지 알아볼려고 한다.

    Catch Up

    python 코드로 DAG를 작성할 때 Dag 속성 안에 Catch up이라는 인수를 둘수 있다. 기본적으로 false가 되어있고, True로 주게 되면 Catch up이 활성화 되게 된다.

    with DAG(
        dag_id="example_dag",
        start_date=datetime(2021, 10, 9), 
        max_active_runs=1,
        timetable=UnevenIntervalsTimetable(),
        default_args={
            "retries": 1,
            "retry_delay": timedelta(minutes=3),
        },
        catchup=False
    ) as dag:
    

    Catch up은 언제쓰이냐면, 보통 start date가 현재로 1년전 20210902이라고 하면 Start Date부터 스케쥴링 된 부분의 DagRun들이 비게 된다. Catch up 을 True로 두게 되면, 이 부분이 순차적으로 Dag Run을 실행 시켜 주어 Catchup 부분의 빈공간을 채워주고 Scehduling된 부분은 시간에 맞춰 진행 되게 된다. 이런 것을 활용하여 시간 변수를 두어 과거 시점부터 스케쥴링 형식으로 작업을 배치 형식으로 진행해야 하는 경우 예를 들어 데이터를 이관 할 때 과거 날짜기준으로 일정 주기 마다 데이터 집계 하여이관한다고 하면 Catchup을 사용해도 좋을 것같다.

    또한 과거에 스케쥴되었던 Dag가 잠깐 중단 하다가 나중에 다시 시작하는 경우 또한 Catch up을 사용하여 다시 Dag Run을 채워주는 것이 가능하다.

     

    Catchup을 할 때 주의 사항이 DagRun이 한번에 실행 되기 때문에 서버에 과부하가 올 수 있다. 이부분을 방지 하기 위한 속성값들을 알아야 한다.

    Catch up 속성값

    • max_active_runs : DAG수준에서 설정 되며(위에 작성한 코드), Catch up 중에 DAGrun이 얼마나 실행 될 수 있는지를 설정 한다.
    • depends_on_past : 작업 수준에 설정 되며, 가장 최근에 DAG에서 동일한 작업이 수행 되었을 때 작업이 수행 될 수 있도록 제약을 걸 수 있다.
    • wait_for_downstream : DAG 수준에서 설정되며 다음 DAG를 실행 하려면 전체 task들이 수행 되어야 실행 되도록 한다.
    • catchup_by_default : config파일에서 설정이 가능하며 DAG를 만들 때 기본 값 True, False를 정할 수 있다.

    Backfill

    Backfill은 DAG가 이미 배포되어 실행 중이며 해당 DAG를 사용하여 DAG의 시작 날짜 이전에 데이터를 처리하기 원할 때 사용한다. Backfill은 지정한 기간동안 DAG 다시 재시작, 지정한 기간동안 전체 재시작, 지정한 기간 지정한 상태 동안 전체 재시작을 지정 하여 사용이 가능하다. 또한 Start_date이전의 날짜를 명시하여 실행 할 수도 있다.

    CLI를 사용하여 Backfill을 사용할 수 있다.

    airflow dags backfill [-h] [-c CONF] [--delay-on-limit DELAY_ON_LIMIT] [-x]
                          [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL]
                          [--rerun-failed-tasks] [--reset-dagruns] [-B]
                          [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [-v] [-y]
                          dag_id
    
    # 예시 
    airflow dags backfill -s 2021-11-01 -e 2021-11-02 example_dag
    인수  설명
    dag_id dag의 id
    -c Dagrun의 conf속성을 피클
    --continue-on-failure 설정하면 일부 작업이 실패해도 백필 계속 진행
    --delay-on-limit dag 실행을 다시 실행하기 전에 최대 활성 Dag실행 제한에 도달했을 때 대기하는 시간
    --end-date end-date yyyy-mm-dd
    -i 업스트림 작업을 건너뛰고 정규표현식과 일치하는 작업만 실행
    -I dependencies_on_past속성 무시
    -l LocalExecutor로 실행
    -m 작업을 실행하지 않고 성공한 것으로 표시
    --pool 사용할 리소스 풀
    --verbose 더 자세한 로깅 출력 만들기
    -s start_date yyyy-mm-dd로 정의
    --reset-dagruns 설정된 경우 백필은 기존 팩필 관련 DAG실행을 삭제하고 DAG 새로 시작
    --rerun-failed-tasks 설정된 경우 백필은 예외를 throw하는 대신 백필 날짜 범위에 대해 실패한 모든 작업을 자동으로 다시 실행

    Backfill은 보통 전체 재시작을 하는데 사용하지 않고, 일정 기간동안 실패한 작업에 대해 재실행을 하는데 사용된다고 보면 된다.

    # 지정한 기간동안 backfill 수행하지 않을 날짜만 수행
    airflow dags backfill --start-date {date} --end-date {date} dag_id
    # 지정한 기간동안 backfill 모든 재실행
    airflow dags backfill --start-date {date} --end-date {date} --reset-dagruns dag_id
    # 지정한 기간동안 실패한 task들만 재실행
    airflow dags backfill --start-date {date} --end-date {date} --rerun-failed-tasks
    

     

     

     

    참고문헌 

    https://docs.astronomer.io/learn/rerunning-dags

    https://medium.com/nerd-for-tech/airflow-catchup-backfill-demystified-355def1b6f92

    반응형

    댓글

Designed by Tistory.