ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow 기본 내용] DAG 작성 심화
    데이터 엔지니어링/AirFlow 2022. 6. 22. 10:37
    반응형

    DAG 선언 3가지 방법

    • 기본 DAG 선언 with 문
    with DAG(
    	dag_id="my_dag_name",
    	default_args={'owner':'airflow'},
    	start_date=pendulum.datetime(2021,1,1, tz="UTC"),
    	schedule_interval="@daily",
    	catchup=False,
    ) as dag:
    	op = EmptyOperator(task_id="task")
    
    • 표준 생성자
    my_dag = DAG(
    	dag_id="my_dag_name",
    	default_args={'owner':'airflow'},
    	start_date=pendulum.datetime(2021,1,1, tz="UTC"),
    	schedule_interval="@daily",
    	catchup=False,
    )
    op = EmptyOperator(task_id="task", dag=my_dag)
    
    • @dag 데코레이터 사용법
    @dag(
    	dag_id="my_dag_name",
    	default_args={'owner':'airflow'},
    	start_date=pendulum.datetime(2021,1,1, tz="UTC"),
    	schedule_interval="@daily",
    	catchup=False,
    )
    def generate_dag():
    
    	# 아래 같이 작성하면 pythonOperator임 
    	@task(task_id='task1', retries=2)
    	def make_task1():
    		return 'success'
    
    	# 아래와 같이 작성하면 EmailOperator로 선택하여 생성
    	email_notification = EmailOperator(
    		task_id="email_notification",
    		to='airflow@airflow.com',
    		subject='dag_completed',
    		html_
    	)
    
    dag = generate_dag()
    

    Task Dependencies(작업 종속성)

    Taks/Operator는 일반적으로 혼자 작업이 진행하지 않고, 다른작업과 종속성을 가진다.

    • Up_stream, Down_stream
    # down stream 적용 예
    first_task >> second_task
    # up stream 적용 예
    second_task << first_task
    
    # first를 두개의 task를 적용하고 싶을 때 배열로 적용하여 적용가능
    first_task >> [second_task, third_task]
    # 배열에서 하나만 골라서 up stream 적용 하고 싶을 때 
    third_task << fourth_task
    
    # 함수로도 활용 가능
    first_task.set_downstream(second_task, third_task)
    third_task.set_upstream(fourth_task)
    
    • cross_downstream 행열의 형태로 여러개를 동시의 종속성 설정
    from airflow.models.baseoperator import cross_downstream
    
    # 위의 내용은 아래의 형태로도 변경 가능
    [op1, op2] >> op3
    [op1, op2] >> op4
    
    # 위 내용 cross로 변경 했을 때
    cross_downStream([op1, op2], [op3, op4])
    
    • chain을 연속성을 가지는 작업 종속성 설정
    from airflow.models.baseoperator import chain
    
    op1 >> op2 >> op3 >> op4
    
    # 위 내용 체인으로 바꿨을 때 
    chain(op1, op2, op3, op4)
    
    op1 >> op2 >> op4 >> op6
    op1 >> op3 >> op5 >> op6
    chain(op1, [op2, op3], [op4, op5], op6)
    

    schedule_interval

    • cron 형식으로 스케쥴링 가능
    with DAG("my_daily_dag", schedule_interval="@daily")
    
    with DAG("my_daily_dag", schedule_interval="0 * * * *")
    

    Branch 분기 정하기

    분기를 사용하여 task를 진행하지 않고 진행한 task와 task를 진행하고 만나는 task를 route를 정해 줄수 있다.

    # 분기는 xcom과 같이 사용가능 함.
    # xcom이란 task간 정보를 공유하는 장치이다.
    def branch_func(ti):
    	xcom_value = int(ti.xcom_pull(task_ids="start_task"))
    	if xcom_value >= 5:
    		return "continue_task"
    	elif xcom_value >= 3:
    		return "stop_task"
    	else:
    		return None
    
    start_op = BashOperator(
    	task_id="start_task",
    	bash_command="echo 5",
    	xcom_push=True,
    	dag=dag,
    )
    
    branch_op = BranchPythonOperator(
    	task_id="branch_task",
    	python_callable=branch_func,
    	dag=dag,
    )
    
    continue_op = EmptyOperator(task_id="continue_task", dag=dag)
    stop_op = EmptyOperator(task_id="stop_task", dag=dag)
    
    start_op >> branch_op >> [continue_op, stop_op]
    

    BaseBranchOperator 분기 기능을 사용하여 고유한 연산자를 구현하는 경우 BranchPythonOperator를 상속받아 재구성해야한다.

    class MyBranchOperator(BaseBranchOperator):
    	def choose_branch(self, context):
    		if context['data_interval_start'].day == 1:
    			# 'daily_task_id', 'monthly_task_id' 이라는 작업을 분기 형태로 제공할 수 있다.
    			return ['daily_task_id', 'monthly_task_id']
    		elif context['data_interval_start'].day == 2:
    			# 하나의 분기를 제공 할 수도 있다.
    			return 'daily_task_id'
    		else
    			# 하나의 분기도 제공 하지 않을 수 있지만, 위와 같이 조건을 사용한 분기가 하나 이상 있어야한다.
    			return None
    

    LatestOnlyOperator

    LatestOnlyOperator는 ‘최신’ DAG 실행이 아닌 경우 자체 다운스트림의 모든 작업을 건너 뛴다.

    import datetime
    import pendulum
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.latest_only import LatestOnlyOperator
    from airflow.utils.trigger_rule import TriggerRule
    
    with DAG(
    	dag_id='latest_only_with_trigger',
    	schedule_interval=datetime.timedelta(hours=4),
    	start_date=pendulum.datetime(2021,1,1, tz="UTC"),
    	catchup=False,
    	tags=['example3'],
    )as dag:
    	latest_only = LatestOnlyOperator(task_id='latest_only')
    	task1 = EmptyOperator(task_id='task1')
    	task2 = EmptyOperator(task_id='task2')
    	task3 = EmptyOperator(task_id='task3')
    	task4 = EmptyOperator(task_id='task4', trigger_rule=TriggerRule.ALL_DONE)
    
    	# trigger_rulle이 ALL_DONE으로 정의 되었기 때문에 
    	# task4에 연결되어있는 노드들이 다 작업을 완료해야 task4가 실행된다.
    	latest_only >> task1 >> [task3, task4]
    	task2 >> [task3, task4]
    
    • task1 의 직접 다운스트림이며 latest_only 최신 실행을 제외한 모든 실행에 대해 건너 뛴다.
    • task2 완전히 독립적이며 latest_only 모든 예정된 기간에 실행된다.
    • task3의 다운스트림이며 task1 기본 task2 트리거 규칙 때문에 all_success에서 계단식 건너뛰기를 수신함.
    • task4 task1 및의 다운스트림 이지만 trigger때문에 task2를 건너 뛰지는 않는다.

    Depends_on_past

    DAG 실행에 past 과거 실행 작업이 success 상태 일때만 작업을 실행하게 설정 가능 task에depends_on_past를 Task의 인수를 설정하고 값은 True로 설정한다.

    from airflow import models
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.bash_operator import BashOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2018, 10, 31),
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    }
    
    dag_name = 'dag-bugcheck'
    
    with models.DAG(dag_name,
                    default_args=default_args,
                    schedule_interval='0 0 * * *',
                    catchup=False,
                    max_active_runs=5,
                    ) as dag:
    
        test1 = DummyOperator(
            task_id='test1',
            task_concurrency=10,
        )
    
        test2 = BashOperator(
            task_id='test2',
            bash_command='echo hi',
            depends_on_past=True,    <-----
        )
    
        test3 = BashOperator(
            task_id='test3',
            bash_command='echo hi',
        )
    

    트리거 규칙(trigger rule)

    기본적으로 airflow는 작업을 실행하기 전에 위에 stream을 성공할 때까지 기다린다. 하지만 trigger-rule을 사용하여 작업을 조작 할 수 있다. trigger_rule는 Task에 대한 인수를 사용하여 제어 할 수 있다.

    Trigger-rule

    • all_success(기본값) : 모든 업스트림 작업이 성공 일 때
    • all_failed : 모든 업스트림 작업이 failed 또는 upstream_failed 상태 일 때
    • all_skipped : 모든 업스트림 작업이 skipped 상태 일 때
    • one_failed : 최소한 하나의 업스트림 작업이 실패 했을 때(모든 업스트림 작업이 완료를 기다리지 않음)
    • one_success : 최소한 하나의 업스트림 작업이 성공했을 때(모든 업스트림 작업이 완료를 기다리지 않음)
    • none_failed : 모든 업스트림 작업이 없거나 failed 또는 upstream_failed - 즉, 모든 업스트림 작업이 성공했거나 건너뛰었을 때
    • none_failed_min_one_success : 모든 업스트림 작업에는 failed 또는 upstream_failed 하나 이상의 업스팀 작업이 성공했을 때
    • none_skipped : skipped 상태에 업스트림 작업이 없음. 즉, 모든 업스트림 작업이 success, failed 또는 upstream_failed 상태에 일 때
    • always : 종속성이 전혀 없고, 언제든지 이 작업을 실행
    import datetime 
    
    from airflow.models import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.python_operator import BranchPythonOperator
    
    dag = DAG(
    	dag_id='trigger_test',
    	schedule_interval='@once',
    	start_date=datetime.datetime(2022,3,25)
    )
    
    task_1 = BranchingPythonOperator(
    	task_id='task_1',
    	dag=dag
    	python callable=lamda: 'task_2'
    )
    
    task_2 = BashOperator(
    	task_id='task_2'
    	bash_command='echo "task_2"'
    )
    
    task_3 = BashOperator(
    	task_id='task_3',
    	bash_command='echo "task_3"'
    )
    
    task_4 = BashOperator(
    	task_id='task_4'
    	
    	# trigger rules are set
    	trigger_rule='all=failed'
    )
    
    task_1 >> [task_2, task_3] >> task_4
    

    동적 DAG(Dynamic DAG)

    DAG는 Python 코드로 정의 되므로 순전히 선언적일 필요는 없음. 루프, 함수 등을 자유롭게 사용하여 DAG를 정희 할 수 있음

    with DAG("loop_example") as dag:
    	first = EmptyOperator(task_id="first")
    	last = EmptyOperator(task_id="last")
    
    	options=["branch_a", "branch_b", "branch_c", "branch_d"]
    	for option in options:
    		t = EmptyOperator(task_id=option)
    		first >> t >> last
    

    작업 그룹(Task Group)

    반복적인 작업을 그룹으로 묶거나 하나의 그룹으로 보고 싶을 때 TaskGroup을 사용하여 작성, TaskGroup은 순전히 ui 그룹화 개념임 SubDag와 다르다.

    import pendulum
    
    from airflow.models.dag import DAG
    from airflow.operators.bash import BashOperator
    from airflow.operators.empty import EmptyOperator
    from airflow.utils.task_group import TaskGroup
    
    # [START howto_task_group]
    with DAG(
        dag_id="example_task_group",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["example"],
    ) as dag:
        start = EmptyOperator(task_id="start")
    
        # [section_1 group 설정]
        with TaskGroup("section_1", tooltip="Tasks for section_1") as section_1:
            task_1 = EmptyOperator(task_id="task_1")
            task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
            task_3 = EmptyOperator(task_id="task_3")
    
            task_1 >> [task_2, task_3]
    
        # [section_2 group 설정]
        with TaskGroup("section_2", tooltip="Tasks for section_2") as section_2:
            task_1 = EmptyOperator(task_id="task_1")
    
            # [inner_section_2 설정]
            with TaskGroup("inner_section_2", tooltip="Tasks for inner_section2") as inner_section_2:
                task_2 = BashOperator(task_id="task_2", bash_command='echo 1')
                task_3 = EmptyOperator(task_id="task_3")
                task_4 = EmptyOperator(task_id="task_4")
    
                [task_2, task_3] >> task_4
    
        end = EmptyOperator(task_id='end')
    
        start >> section_1 >> section_2 >> end
    

    가장자리 레이블(Edge Labels)

    group화 뿐만아니라 종속성 으로 가는 가장자리에 레이블을 표시하여 어떤 분기인지 표시가 가능하다. 종속성 설정에(ex >>, << 등) 을 지정한다음 label함수를 이용하여 작성하면 된다.

    from airflow.utils.edgemodifier import Label
    
    # 종속성 표시로 작성하는법 
    my_task >> Label("When empty") >> other_task
    
    # up, down Stream 함수로 작성하는 법 
    my_task.set_downstream(other_task, Label("When empty"))
    

    with DAG(
        "example_branch_labels",
        schedule_interval="@daily",
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
    ) as dag:
        ingest = EmptyOperator(task_id="ingest")
        analyse = EmptyOperator(task_id="analyze")
        check = EmptyOperator(task_id="check_integrity")
        describe = EmptyOperator(task_id="describe_integrity")
        error = EmptyOperator(task_id="email_error")
        save = EmptyOperator(task_id="save")
        report = EmptyOperator(task_id="report")
    
        ingest >> analyse >> check
        check >> Label("No errors") >> save >> report
        check >> Label("Errors found") >> describe >> error >> report
    

    SubDAG

    정확한 동일한 작업 집합을 정기적으로 추가하거나 많은 작업을 하나의 논리적 단위로 그룹화 하려는 경우, 병렬처리를 하는 경우 등 SubDAG를 이용한다.

    아래와 같은 병렬처리를 하는 DAG를 구성한다고 했을 때

    dag를 ~/airflow/dags 폴더에 관리하는 것처럼 subdags들도 ~/airflow/dags/subdags 폴더로 관리 해야한다.

    child_DAG(Sub_DAG)- ~/airflow/dags/subdags/subdag.py

    import pendulum 
    
    from airflow import DAG
    from airflow.operators.empty import EmptyOperator
    
    def subdag(parent_dag_name, child_dag_name, args):
    	"""
    		보통 파라미터를 아래와 같이 3개로 정의하여 사용하곤한다.
    		부모 dag 이름, 자식 dag 이름, 아규먼트
    		return : DAG to use as s subdag
    		rtype : airflow.models.DAG
    	"""
    
    	dag_subdag = DAG(
    		dag_id=f'{parent_dag_name}.{child_dag_name}',
    		default_args=args,
    		start_date=pendulum.datetime(2021,1,1, tz="UTC"),
    		catchup=False,
    		schedule_interval="@daily",
    	)
    
    	for i in range(5):
    		EmptyOperator(
    			task_id=f'{child_dag_name}-task-{i + 1}',
    			default_args=args,
    			dag=dag_subdag,
    		)
    
    	return dag_subdag
    

    parent DAG - ~/airflow/dags/example_subdag.py

    import datetime
    
    from airflow import DAG
    from airflow.example_dags.subdags.subdag import subdag
    from airflow.operators.empty import EmptyOperator
    from airflow.operators.subdag import SubDagOperator
    
    DAG_NAME = 'example_subdag_operator'
    
    with DAG(
        dag_id=DAG_NAME,
        default_args={"retries": 2},
        start_date=datetime.datetime(2022, 1, 1),
        schedule_interval="@once",
        tags=['example'],
    ) as dag:
    
        start = EmptyOperator(
            task_id='start',
        )
    
        section_1 = SubDagOperator(
            task_id='section-1',
            subdag=subdag(DAG_NAME, 'section-1', dag.default_args),
        )
    
        some_other_task = EmptyOperator(
            task_id='some-other-task',
        )
    
        section_2 = SubDagOperator(
            task_id='section-2',
            subdag=subdag(DAG_NAME, 'section-2', dag.default_args),
        )
    
        end = EmptyOperator(
            task_id='end',
        )
    
        start >> section_1 >> some_other_task >> section_2 >> end
    

     

    참고 문헌

     

    DAGs — Airflow Documentation

     

    airflow.apache.org

     

    반응형

    댓글

Designed by Tistory.