-
[Airflow 기본 내용] DAG 작성 심화데이터 엔지니어링/AirFlow 2022. 6. 22. 10:37반응형
DAG 선언 3가지 방법
- 기본 DAG 선언 with 문
with DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=False, ) as dag: op = EmptyOperator(task_id="task")
- 표준 생성자
my_dag = DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=False, ) op = EmptyOperator(task_id="task", dag=my_dag)
- @dag 데코레이터 사용법
@dag( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=False, ) def generate_dag(): # 아래 같이 작성하면 pythonOperator임 @task(task_id='task1', retries=2) def make_task1(): return 'success' # 아래와 같이 작성하면 EmailOperator로 선택하여 생성 email_notification = EmailOperator( task_id="email_notification", to='airflow@airflow.com', subject='dag_completed', html_ ) dag = generate_dag()
Task Dependencies(작업 종속성)
Taks/Operator는 일반적으로 혼자 작업이 진행하지 않고, 다른작업과 종속성을 가진다.
- Up_stream, Down_stream
# down stream 적용 예 first_task >> second_task # up stream 적용 예 second_task << first_task # first를 두개의 task를 적용하고 싶을 때 배열로 적용하여 적용가능 first_task >> [second_task, third_task] # 배열에서 하나만 골라서 up stream 적용 하고 싶을 때 third_task << fourth_task # 함수로도 활용 가능 first_task.set_downstream(second_task, third_task) third_task.set_upstream(fourth_task)
- cross_downstream 행열의 형태로 여러개를 동시의 종속성 설정
from airflow.models.baseoperator import cross_downstream # 위의 내용은 아래의 형태로도 변경 가능 [op1, op2] >> op3 [op1, op2] >> op4 # 위 내용 cross로 변경 했을 때 cross_downStream([op1, op2], [op3, op4])
- chain을 연속성을 가지는 작업 종속성 설정
from airflow.models.baseoperator import chain op1 >> op2 >> op3 >> op4 # 위 내용 체인으로 바꿨을 때 chain(op1, op2, op3, op4) op1 >> op2 >> op4 >> op6 op1 >> op3 >> op5 >> op6 chain(op1, [op2, op3], [op4, op5], op6)
schedule_interval
- cron 형식으로 스케쥴링 가능
with DAG("my_daily_dag", schedule_interval="@daily") with DAG("my_daily_dag", schedule_interval="0 * * * *")
Branch 분기 정하기
분기를 사용하여 task를 진행하지 않고 진행한 task와 task를 진행하고 만나는 task를 route를 정해 줄수 있다.
# 분기는 xcom과 같이 사용가능 함. # xcom이란 task간 정보를 공유하는 장치이다. def branch_func(ti): xcom_value = int(ti.xcom_pull(task_ids="start_task")) if xcom_value >= 5: return "continue_task" elif xcom_value >= 3: return "stop_task" else: return None start_op = BashOperator( task_id="start_task", bash_command="echo 5", xcom_push=True, dag=dag, ) branch_op = BranchPythonOperator( task_id="branch_task", python_callable=branch_func, dag=dag, ) continue_op = EmptyOperator(task_id="continue_task", dag=dag) stop_op = EmptyOperator(task_id="stop_task", dag=dag) start_op >> branch_op >> [continue_op, stop_op]
BaseBranchOperator 분기 기능을 사용하여 고유한 연산자를 구현하는 경우 BranchPythonOperator를 상속받아 재구성해야한다.
class MyBranchOperator(BaseBranchOperator): def choose_branch(self, context): if context['data_interval_start'].day == 1: # 'daily_task_id', 'monthly_task_id' 이라는 작업을 분기 형태로 제공할 수 있다. return ['daily_task_id', 'monthly_task_id'] elif context['data_interval_start'].day == 2: # 하나의 분기를 제공 할 수도 있다. return 'daily_task_id' else # 하나의 분기도 제공 하지 않을 수 있지만, 위와 같이 조건을 사용한 분기가 하나 이상 있어야한다. return None
LatestOnlyOperator
LatestOnlyOperator는 ‘최신’ DAG 실행이 아닌 경우 자체 다운스트림의 모든 작업을 건너 뛴다.
import datetime import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator from airflow.operators.latest_only import LatestOnlyOperator from airflow.utils.trigger_rule import TriggerRule with DAG( dag_id='latest_only_with_trigger', schedule_interval=datetime.timedelta(hours=4), start_date=pendulum.datetime(2021,1,1, tz="UTC"), catchup=False, tags=['example3'], )as dag: latest_only = LatestOnlyOperator(task_id='latest_only') task1 = EmptyOperator(task_id='task1') task2 = EmptyOperator(task_id='task2') task3 = EmptyOperator(task_id='task3') task4 = EmptyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE) # trigger_rulle이 ALL_DONE으로 정의 되었기 때문에 # task4에 연결되어있는 노드들이 다 작업을 완료해야 task4가 실행된다. latest_only >> task1 >> [task3, task4] task2 >> [task3, task4]
- task1 의 직접 다운스트림이며 latest_only 최신 실행을 제외한 모든 실행에 대해 건너 뛴다.
- task2 완전히 독립적이며 latest_only 모든 예정된 기간에 실행된다.
- task3의 다운스트림이며 task1 기본 task2 트리거 규칙 때문에 all_success에서 계단식 건너뛰기를 수신함.
- task4 task1 및의 다운스트림 이지만 trigger때문에 task2를 건너 뛰지는 않는다.
Depends_on_past
DAG 실행에 past 과거 실행 작업이 success 상태 일때만 작업을 실행하게 설정 가능 task에depends_on_past를 Task의 인수를 설정하고 값은 True로 설정한다.
from airflow import models from airflow.operators.dummy_operator import DummyOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'start_date': datetime(2018, 10, 31), 'retries': 3, 'retry_delay': timedelta(minutes=5) } dag_name = 'dag-bugcheck' with models.DAG(dag_name, default_args=default_args, schedule_interval='0 0 * * *', catchup=False, max_active_runs=5, ) as dag: test1 = DummyOperator( task_id='test1', task_concurrency=10, ) test2 = BashOperator( task_id='test2', bash_command='echo hi', depends_on_past=True, <----- ) test3 = BashOperator( task_id='test3', bash_command='echo hi', )
트리거 규칙(trigger rule)
기본적으로 airflow는 작업을 실행하기 전에 위에 stream을 성공할 때까지 기다린다. 하지만 trigger-rule을 사용하여 작업을 조작 할 수 있다. trigger_rule는 Task에 대한 인수를 사용하여 제어 할 수 있다.
Trigger-rule
- all_success(기본값) : 모든 업스트림 작업이 성공 일 때
- all_failed : 모든 업스트림 작업이 failed 또는 upstream_failed 상태 일 때
- all_skipped : 모든 업스트림 작업이 skipped 상태 일 때
- one_failed : 최소한 하나의 업스트림 작업이 실패 했을 때(모든 업스트림 작업이 완료를 기다리지 않음)
- one_success : 최소한 하나의 업스트림 작업이 성공했을 때(모든 업스트림 작업이 완료를 기다리지 않음)
- none_failed : 모든 업스트림 작업이 없거나 failed 또는 upstream_failed - 즉, 모든 업스트림 작업이 성공했거나 건너뛰었을 때
- none_failed_min_one_success : 모든 업스트림 작업에는 failed 또는 upstream_failed 하나 이상의 업스팀 작업이 성공했을 때
- none_skipped : skipped 상태에 업스트림 작업이 없음. 즉, 모든 업스트림 작업이 success, failed 또는 upstream_failed 상태에 일 때
- always : 종속성이 전혀 없고, 언제든지 이 작업을 실행
import datetime from airflow.models import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import BranchPythonOperator dag = DAG( dag_id='trigger_test', schedule_interval='@once', start_date=datetime.datetime(2022,3,25) ) task_1 = BranchingPythonOperator( task_id='task_1', dag=dag python callable=lamda: 'task_2' ) task_2 = BashOperator( task_id='task_2' bash_command='echo "task_2"' ) task_3 = BashOperator( task_id='task_3', bash_command='echo "task_3"' ) task_4 = BashOperator( task_id='task_4' # trigger rules are set trigger_rule='all=failed' ) task_1 >> [task_2, task_3] >> task_4
동적 DAG(Dynamic DAG)
DAG는 Python 코드로 정의 되므로 순전히 선언적일 필요는 없음. 루프, 함수 등을 자유롭게 사용하여 DAG를 정희 할 수 있음
with DAG("loop_example") as dag: first = EmptyOperator(task_id="first") last = EmptyOperator(task_id="last") options=["branch_a", "branch_b", "branch_c", "branch_d"] for option in options: t = EmptyOperator(task_id=option) first >> t >> last
작업 그룹(Task Group)
반복적인 작업을 그룹으로 묶거나 하나의 그룹으로 보고 싶을 때 TaskGroup을 사용하여 작성, TaskGroup은 순전히 ui 그룹화 개념임 SubDag와 다르다.
import pendulum from airflow.models.dag import DAG from airflow.operators.bash import BashOperator from airflow.operators.empty import EmptyOperator from airflow.utils.task_group import TaskGroup # [START howto_task_group] with DAG( dag_id="example_task_group", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, tags=["example"], ) as dag: start = EmptyOperator(task_id="start") # [section_1 group 설정] with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1: task_1 = EmptyOperator(task_id="task_1") task_2 = BashOperator(task_id="task_2", bash_command='echo 1') task_3 = EmptyOperator(task_id="task_3") task_1 >> [task_2, task_3] # [section_2 group 설정] with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2: task_1 = EmptyOperator(task_id="task_1") # [inner_section_2 설정] with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2: task_2 = BashOperator(task_id="task_2", bash_command='echo 1') task_3 = EmptyOperator(task_id="task_3") task_4 = EmptyOperator(task_id="task_4") [task_2, task_3] >> task_4 end = EmptyOperator(task_id='end') start >> section_1 >> section_2 >> end
가장자리 레이블(Edge Labels)
group화 뿐만아니라 종속성 으로 가는 가장자리에 레이블을 표시하여 어떤 분기인지 표시가 가능하다. 종속성 설정에(ex >>, << 등) 을 지정한다음 label함수를 이용하여 작성하면 된다.
from airflow.utils.edgemodifier import Label # 종속성 표시로 작성하는법 my_task >> Label("When empty") >> other_task # up, down Stream 함수로 작성하는 법 my_task.set_downstream(other_task, Label("When empty"))
with DAG( "example_branch_labels", schedule_interval="@daily", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, ) as dag: ingest = EmptyOperator(task_id="ingest") analyse = EmptyOperator(task_id="analyze") check = EmptyOperator(task_id="check_integrity") describe = EmptyOperator(task_id="describe_integrity") error = EmptyOperator(task_id="email_error") save = EmptyOperator(task_id="save") report = EmptyOperator(task_id="report") ingest >> analyse >> check check >> Label("No errors") >> save >> report check >> Label("Errors found") >> describe >> error >> report
SubDAG
정확한 동일한 작업 집합을 정기적으로 추가하거나 많은 작업을 하나의 논리적 단위로 그룹화 하려는 경우, 병렬처리를 하는 경우 등 SubDAG를 이용한다.
아래와 같은 병렬처리를 하는 DAG를 구성한다고 했을 때
dag를 ~/airflow/dags 폴더에 관리하는 것처럼 subdags들도 ~/airflow/dags/subdags 폴더로 관리 해야한다.
child_DAG(Sub_DAG)- ~/airflow/dags/subdags/subdag.py
import pendulum from airflow import DAG from airflow.operators.empty import EmptyOperator def subdag(parent_dag_name, child_dag_name, args): """ 보통 파라미터를 아래와 같이 3개로 정의하여 사용하곤한다. 부모 dag 이름, 자식 dag 이름, 아규먼트 return : DAG to use as s subdag rtype : airflow.models.DAG """ dag_subdag = DAG( dag_id=f'{parent_dag_name}.{child_dag_name}', default_args=args, start_date=pendulum.datetime(2021,1,1, tz="UTC"), catchup=False, schedule_interval="@daily", ) for i in range(5): EmptyOperator( task_id=f'{child_dag_name}-task-{i + 1}', default_args=args, dag=dag_subdag, ) return dag_subdag
parent DAG - ~/airflow/dags/example_subdag.py
import datetime from airflow import DAG from airflow.example_dags.subdags.subdag import subdag from airflow.operators.empty import EmptyOperator from airflow.operators.subdag import SubDagOperator DAG_NAME = 'example_subdag_operator' with DAG( dag_id=DAG_NAME, default_args={"retries": 2}, start_date=datetime.datetime(2022, 1, 1), schedule_interval="@once", tags=['example'], ) as dag: start = EmptyOperator( task_id='start', ) section_1 = SubDagOperator( task_id='section-1', subdag=subdag(DAG_NAME, 'section-1', dag.default_args), ) some_other_task = EmptyOperator( task_id='some-other-task', ) section_2 = SubDagOperator( task_id='section-2', subdag=subdag(DAG_NAME, 'section-2', dag.default_args), ) end = EmptyOperator( task_id='end', ) start >> section_1 >> some_other_task >> section_2 >> end
참고 문헌
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow 실습] 여러개의 Task 한번에 돌리기 (0) 2022.06.22 [Airflow 실습]가장 기본 DAG 만들어 실행해보기 (0) 2022.06.22 [Airflow 기본 내용] 기본적인 DAG 작성법 (0) 2022.06.22 [Airflow 기본 내용] Operator, task란? (0) 2022.06.22 [Airflow 기본 내용] Airflow란? 구성요소, 구동원리, 개념, 용어 정리 (0) 2022.06.21