-
[Airflow]variables를 이용한 전역변수 설정데이터 엔지니어링/AirFlow 2022. 6. 27. 08:45반응형
이전에 xcom을 이용하여 task instance간 데이터를 주고 받는 법을 알아봤는데 이번에는 전역으로 아무 DAGRun에서 데이터를 공유하고 사용하는 방법에 대해 알아 보겠습니다. Variables라는 기능인데 web ui 에서 admin- variables 라는 탭이 있습니다.
variables 생성 방법
variables를 클릭하면 아래와 같이 구성할 수 있는 화면이 나옴 + 버튼을 눌러 생성가능, 아래와 같이 두개를 구성해 줄수 있음 그냥 text도 되고, json도 가능
save를 눌러 저장한 모습
구성 방법
불러오는 방법은 Variable 객체를 import 하고 key-value를 이용하여 값을 가져와 사용하면 된다. json은 json에 맞게 deserialize하여 사용한다. Variable 객체에 옵션으로 주어 사용도 가능하다. Variable 객체의 자세한 내용은 IDE에서 자세히 보기로 확인하자
# 모듈 import from airflow.models import Variable # 일반 값 가져오기 def get_variables1(): # Variable 객체 안에 값이 있음 key-value형식이므로 key값을 이용하여 가져오기 value = Variable.get(key='test1') print(value) # json data 가져오기 def get_variables2(): json = Variable.get(key='test2', deserialize_json=True) value1 = json['value_1'] value2 = json['value_2'] print(value1) print(value2)
완성 소스
from time import sleep from pendulum.tz.timezone import Timezone from datetime import datetime, timedelta from airflow import DAG from airflow.models import Variable from airflow.operators.python import PythonOperator kst = Timezone('Asia/Seoul') default_args = { 'owner' : 'eddie', 'retries' : 1, 'retry_delay': timedelta(minutes=1) } with DAG ( dag_id='ex_variables', default_args=default_args, schedule_interval='@once', start_date=datetime(2022,6,22, tzinfo=kst), tags=['test', 'variables sample'], ) as dag: def dump(): sleep(3) # 일반 값 가져오기 def get_variables1(): value = Variable.get(key='test1') print(value) # json data 가져오기 def get_variables2(): json = Variable.get(key='test2', deserialize_json=True) value1 = json['value_1'] value2 = json['value_2'] print(value1) print(value2) start = PythonOperator( task_id='start', python_callable=dump ) get_test1 = PythonOperator( task_id='get_test1', python_callable=get_variables1 ) get_test2 = PythonOperator( task_id='get_test2', python_callable=get_variables2 ) start >> get_test1 >> get_test2
airflow 관련 모든 소스는 아래 github에 있습니다.
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow]Catch up, Backfill 알아보기 (0) 2022.09.04 [Airflow] Executor의 종류와 사용가이드라인 알아보기(Sequential, Local, Celery, Kubernetes) (0) 2022.08.18 [Airflow] Connection을 이용하여 외부 서비스와 연동하기 (0) 2022.06.26 [Airflow]Pool 을 이용하여 메모리 사용량 제한하기 (0) 2022.06.26 [Airflow]Xcom을 이용한 task 간 데이터 주고 받기 (0) 2022.06.26