-
[Airflow]Xcom을 이용한 task 간 데이터 주고 받기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:08반응형
XCom이란
- cross communcation의 약자로 airflow task간 데이터를 주고 받을 일이 있는데 이 부분을 해결하기 위해 나왔다.
- Task에서 연산이나 필요한 데이터가 각각의 task마다 변경이 될때가 있는데 이런 경우에 데이터를 처리하기 위해 주로 사용된다.
- Airflow는 Task Instance간 데이터를 공유하지 않기 때문에 XCom을 이용하여 데이터를 주고 받아야 한다.
- XCom은 DAG Run 내에서만 존재하고 다른 DAG Run과는 공유하지 않는다.
- Dataframe과 같은 많은 양의 데이터는 지원하지 않는다.
- PythonOperator를 사용하면 return값은 자동적으로 XCom변수로 등록된다.
XCom 사용하는 몇가지 방법
- PythonOperator return값을 이용한 XCom
- push-pull을 이용한 XCom
- jinja template을 이용한 XCom
Xcom 사용하기 준비
먼저 xcom을 사용하기 위해서는 xcom객체에 대해 이해를 해야하는데 xcom은 위에서 DAGRun에서만 존재 한다고 하였습니다. 메모리를 공유하지 않기 때문인데 그럼 DAGRun이 어떻게 돌아 가는지 알아야 합니다.
위에 보다 싶이 DAGRun에 대한 실행 내역입니다. 위에는 DAGRun 이고 DAGRun안에 6개의 task가 있습니다. 이 task들은 task Instance 들이라고 합니다. 각각의 실행 내역을 저장하기 위한 객체로 활용되는데 task instance안에 xcom이 하나하나 설정되어 들어가게 됩니다. 즉, xcom을 사용하기 위해서는 task instance 객체를 임포트 시켜 사용하여야한다는 것입니다.
from airflow.models import TaskInstance
위의 문구를 먼저 import 시켜 줍니다.그리고 아래는 xcom을 활용하는 3가지 예시입니다.
PythonOperator return값을 이용한 XCom
pythonOperator를 사용하면 return값이 바로 xcom으로 들어가게 된다. 아래와 같이 작성하면 xcom을 사용가능 하다.
def pythonOperator_return_xcom(): return "python은 return 값이 xcom으로 바로 반환됨" def get_pythonOperator_xcom(**context): res = context['task_instance'].xcom_pull(task_ids=f'pythonOperator_return_xcom') print(res) pythonOperator_return_xcom = PythonOperator( task_id='pythonOperator_return_xcom', python_callable=pythonOperator_return_xcom, )
push-pull을 이용한 XCom
pythonOperator를 바로 되지만 실제로 객체를 생성해서 push하고 pull로 당겨와서 확인하는 경우도 있다.
def push_value(**context): task_instance = context['task_instance'] task_instance.xcom_push(key='pushedValue', value="값이 넣어 졌지") def pull_value(**context): res = context['task_instance'].xcom_pull(key='pushedValue') print(res) pushed_value = PythonOperator( task_id='pushed_value', python_callable=push_value ) pulled_value = PythonOperator( task_id='pulled_value', python_callable=pull_value )
jinja template을 이용한 xcom
jinja template을 이용하여 xcom을 구성할 수도 있다.
def pull_value_from_jinja(**context): res = context['task_instance'].xcom_pull(key='jinja') print(res) jinja_template = BashOperator( task_id='jinja_template', bash_command='echo "{{task_instance.xcom_push(key="jinja", value="airflow")}}"' ) pulled_value_jinja = PythonOperator( task_id='pulled_value', python_callable=pull_value_from_jinja )
완성 사진
완성 소스
from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.models import TaskInstance from airflow.operators.bash import BashOperator from airflow.operators.python import PythonOperator from pendulum.tz.timezone import Timezone kst = Timezone('Asia/Seoul') default_args = { 'owner':'eddie', 'retries' : 1, 'retriy_delay':timedelta(minutes=1), } with DAG( dag_id="ex_xcoms", description="xcom으로 task간 데이터 주고 받는 실습", default_args=default_args, schedule_interval="@once", start_date=datetime(2022,6,22, tzinfo=kst), tags=['test','xcom sample'] ) as dag: def dump(): sleep(3) start = PythonOperator( task_id="start", python_callable=dump ) # xcom 방법 1 pythonoperator def pythonOperator_return_xcom(): return "python은 return 값이 xcom으로 바로 반환됨" def get_pythonOperator_xcom(**context): res = context['task_instance'].xcom_pull(task_ids=f'pythonOperator_return_xcom') print(res) pythonOperator_return_xcom = PythonOperator( task_id='pythonOperator_return_xcom', python_callable=pythonOperator_return_xcom, ) # xcom 방법 2 push pull 이용 def push_value(**context): task_instance = context['task_instance'] task_instance.xcom_push(key='pushedValue', value="값이 넣어 졌지") def pull_value(**context): res = context['task_instance'].xcom_pull(key='pushedValue') print(res) pushed_value = PythonOperator( task_id='pushed_value', python_callable=push_value ) pulled_value_python = PythonOperator( task_id='pulled_value', python_callable=pull_value ) # xcom 방법 3 jinja template 이용 def pull_value_from_jinja(**context): res = context['task_instance'].xcom_pull(key='jinja') print(res) jinja_template = BashOperator( task_id='jinja_template', bash_command='echo "{{task_instance.xcom_push(key="jinja", value="템플릿 멋져")}}"' ) pulled_value_jinja = PythonOperator( task_id='pulled_value_jinja', python_callable=pull_value_from_jinja ) start >> pythonOperator_return_xcom start >> pushed_value >> pulled_value_python start >> jinja_template >> pulled_value_jinja
값 확인하기
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow] Connection을 이용하여 외부 서비스와 연동하기 (0) 2022.06.26 [Airflow]Pool 을 이용하여 메모리 사용량 제한하기 (0) 2022.06.26 [Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기 (0) 2022.06.24 [Airflow 실습] Task group으로 ui 관리하기 (0) 2022.06.23 [Airflow 실습] SubDAG 작성하여 DAG관리하기 (0) 2022.06.23