ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow]Xcom을 이용한 task 간 데이터 주고 받기
    데이터 엔지니어링/AirFlow 2022. 6. 26. 20:08
    반응형

    XCom이란

    • cross communcation의 약자로 airflow task간 데이터를 주고 받을 일이 있는데 이 부분을 해결하기 위해 나왔다.
    • Task에서 연산이나 필요한 데이터가 각각의 task마다 변경이 될때가 있는데 이런 경우에 데이터를 처리하기 위해 주로 사용된다.
    • Airflow는 Task Instance간 데이터를 공유하지 않기 때문에 XCom을 이용하여 데이터를 주고 받아야 한다.
    • XCom은 DAG Run 내에서만 존재하고 다른 DAG Run과는 공유하지 않는다.
    • Dataframe과 같은 많은 양의 데이터는 지원하지 않는다.
    • PythonOperator를 사용하면 return값은 자동적으로 XCom변수로 등록된다.

    XCom 사용하는 몇가지 방법

    • PythonOperator return값을 이용한 XCom
    • push-pull을 이용한 XCom
    • jinja template을 이용한 XCom

    Xcom 사용하기 준비

    먼저 xcom을 사용하기 위해서는 xcom객체에 대해 이해를 해야하는데 xcom은 위에서 DAGRun에서만 존재 한다고 하였습니다. 메모리를 공유하지 않기 때문인데 그럼 DAGRun이 어떻게 돌아 가는지 알아야 합니다.

    위에 보다 싶이 DAGRun에 대한 실행 내역입니다. 위에는 DAGRun 이고 DAGRun안에 6개의 task가 있습니다. 이 task들은 task Instance 들이라고 합니다. 각각의 실행 내역을 저장하기 위한 객체로 활용되는데 task instance안에 xcom이 하나하나 설정되어 들어가게 됩니다. 즉, xcom을 사용하기 위해서는 task instance 객체를 임포트 시켜 사용하여야한다는 것입니다.

    from airflow.models import TaskInstance
    

    위의 문구를 먼저 import 시켜 줍니다.그리고 아래는 xcom을 활용하는 3가지 예시입니다.

    PythonOperator return값을 이용한 XCom

    pythonOperator를 사용하면 return값이 바로 xcom으로 들어가게 된다. 아래와 같이 작성하면 xcom을 사용가능 하다.

    def pythonOperator_return_xcom():
      return "python은 return 값이 xcom으로 바로 반환됨"
    
    def get_pythonOperator_xcom(**context):
    	res = context['task_instance'].xcom_pull(task_ids=f'pythonOperator_return_xcom')
    	print(res)
    
    pythonOperator_return_xcom = PythonOperator(
      task_id='pythonOperator_return_xcom',
      python_callable=pythonOperator_return_xcom,
    )
    

    push-pull을 이용한 XCom

    pythonOperator를 바로 되지만 실제로 객체를 생성해서 push하고 pull로 당겨와서 확인하는 경우도 있다.

    def push_value(**context):
        task_instance = context['task_instance']
        task_instance.xcom_push(key='pushedValue', value="값이 넣어 졌지")
    
    def pull_value(**context):
        res = context['task_instance'].xcom_pull(key='pushedValue')
        print(res)
    
    pushed_value = PythonOperator(
        task_id='pushed_value',
        python_callable=push_value
    )
    
    pulled_value = PythonOperator(
        task_id='pulled_value',
        python_callable=pull_value
    )
    

    jinja template을 이용한 xcom

    jinja template을 이용하여 xcom을 구성할 수도 있다.

    def pull_value_from_jinja(**context):
        res = context['task_instance'].xcom_pull(key='jinja')
        print(res)
    
    jinja_template = BashOperator(
        task_id='jinja_template',
        bash_command='echo "{{task_instance.xcom_push(key="jinja", value="airflow")}}"'
    )
    
    pulled_value_jinja = PythonOperator(
        task_id='pulled_value',
        python_callable=pull_value_from_jinja
    )
    

    완성 사진

    완성 소스

    from datetime import datetime, timedelta
    from time import sleep
    from airflow import DAG
    from airflow.models import TaskInstance
    from airflow.operators.bash import BashOperator
    from airflow.operators.python import PythonOperator
    from pendulum.tz.timezone import Timezone
    
    kst = Timezone('Asia/Seoul')
    
    default_args = {
        'owner':'eddie',
        'retries' : 1,
        'retriy_delay':timedelta(minutes=1),
    }
    
    with DAG(
        dag_id="ex_xcoms",    
        description="xcom으로 task간 데이터 주고 받는 실습",
        default_args=default_args,
        schedule_interval="@once",
        start_date=datetime(2022,6,22, tzinfo=kst),
        tags=['test','xcom sample']
    ) as dag:
    
        def dump():
            sleep(3)
        
        start = PythonOperator(
            task_id="start",
            python_callable=dump
        )
    
    		# xcom 방법 1 pythonoperator
        def pythonOperator_return_xcom():
            return "python은 return 값이 xcom으로 바로 반환됨"
    
        
        def get_pythonOperator_xcom(**context):
            res = context['task_instance'].xcom_pull(task_ids=f'pythonOperator_return_xcom')
            print(res)
    
        pythonOperator_return_xcom = PythonOperator(
            task_id='pythonOperator_return_xcom',
            python_callable=pythonOperator_return_xcom,
        )
    
    		# xcom 방법 2 push pull 이용
        def push_value(**context):
            task_instance = context['task_instance']
            task_instance.xcom_push(key='pushedValue', value="값이 넣어 졌지")
    
        def pull_value(**context):
            res = context['task_instance'].xcom_pull(key='pushedValue')
            print(res)
    
        pushed_value = PythonOperator(
            task_id='pushed_value',
            python_callable=push_value
        )
    
        pulled_value_python = PythonOperator(
            task_id='pulled_value',
            python_callable=pull_value
        )
    
    		# xcom 방법 3 jinja template 이용
        def pull_value_from_jinja(**context):
            res = context['task_instance'].xcom_pull(key='jinja')
            print(res)
    
        jinja_template = BashOperator(
            task_id='jinja_template',
            bash_command='echo "{{task_instance.xcom_push(key="jinja", value="템플릿 멋져")}}"'
        )
    
        pulled_value_jinja = PythonOperator(
            task_id='pulled_value_jinja',
            python_callable=pull_value_from_jinja
        )
    
        start >> pythonOperator_return_xcom
        start >> pushed_value >> pulled_value_python
        start >> jinja_template >> pulled_value_jinja
    

    값 확인하기

    
    

    반응형

    댓글

Designed by Tistory.