데이터 엔지니어링/AirFlow
-
[Airflow] User_defined_macros를 이용하여 jinja template의 사용자 정의 변수 활용하기데이터 엔지니어링/AirFlow 2023. 2. 12. 13:22
안녕하세요 까치입니다. 오랜만에 Airflow 포스팅을 하게 되었습니다. DAG의 모든 task에서 공통 변수를 활용하는 경우가 많은데 이때 어떻게 해야할지 공부하게 되어 포스팅 하게 되었습니다. 먼저 user_defined_macros라는 속성에 변수를 등록하면 그 당시에 활용하는 DAG안에 공통 변수를 설정 할 수 있습니다. 아래 코드는 완성 코드 입니다. MACROS_VAR에 공통 변수로 활용할 값을 설정하고, DAG안에 user_defined_macros라는 속성에 넣었습니다. from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash import BashOperator default_arg..
-
[Airflow] jinja_template을 활용한 날짜 동적 변수 활용 하는 법(동적 datetime, ds변수 UTC안되는 것 해결법)데이터 엔지니어링/AirFlow 2022. 9. 18. 18:15
안녕하세요 오늘은 회사에서 airflow를 사용하고 있는데 쿼리를 execution_date별로 날짜를 동적으로 설정 해야하는 상황이 발생하여 동적 date변수를 조작해 보는 airflow 코드를 만들어 볼 예정입니다. datetime 동적 변수 만드는데 삽질을 좀 많이해서 삽질 과정도 좀 같이 넣어 볼게요. 먼저, 동적 변수를 만드는 첫 번째 과정은 Jinja template을 이용하는 것입니다. Jinja template에 있는 동적 변수 이용하기 먼저 airflow는 python 코드로 되어있기 때문에 jinja template을 내장하고 있습니다. airflow에서는 Jinja_template에 미리 정의 되어있는 변수를 제공합니다. 홈페이지에 가보시면 자세히 나와있습니다. 사용법은 아래 코드를 ..
-
[Airflow] hook이란? (hook을 사용하여 mysql to csv 패턴 실습)데이터 엔지니어링/AirFlow 2022. 9. 4. 21:38
Hook이란? Hook이란? 미리 정의 된 것이 아니라 커스텀하게 로직을 구현하고 싶을 때나 외부 시스템의 푸시 작업 흐름을 만들고 싶을 때도 사용한다. 보통pythonOperator로 작업을 실행 시키고 python 함수 코드 안에서 다른 로직을 쉽게 구현할 수 있게 해주기 위해 사용한다. pip로 다른 플러그인을 다운받아 사용도 하여 굳이 hook을 사용할 필요는 없다. 그냥 쉽게 접근이 가능하게 하는 용도 외에는 없는 것 같다. Mysql 커넥터 만들기 실습을 위해 커넥터 먼저 만들자 mysql 깔려 있다고 가정하고 하는 것이다. + 버튼을 눌러 커넥터 만들기 Provider 설치 apache-airflow-providers-mysql — apache-airflow-providers-mysql Do..
-
[Airflow] airflow설치 방법(with ubuntu, mysql)데이터 엔지니어링/AirFlow 2022. 9. 4. 17:51
mysql 설치 # 시스템 업그레이드 sudo apt update && sudo apt upgrade -y # mysql 서버 설치 sudo apt-get install -y mysql-server # mysql enble 설정 sudo systemctl enable mysql # mysql 서버 sudo systemctl status mysql ● mysql.service - MySQL Community Server Loaded: loaded (/lib/systemd/system/mysql.service; enabled; vendor preset:> Active: active (running) since Sun 2022-09-04 16:05:53 KST; 5s ago Main PID: 3169 (mys..
-
[Airflow]Catch up, Backfill 알아보기데이터 엔지니어링/AirFlow 2022. 9. 4. 14:04
Airflow를 운용하다 보면, 재실행을 하거나 현재 시점 보다 과거의 배치 작업을 주기적으로 진행을 해야하거나, 실행을 했더라도 특정 조건으로 해당하는(실패나, 건너뜀 등)을 골라내서 재실행 하고 싶을 때가 있다. 바로 작업을 주기적으로 하는 관점이기 때문인데 이번 포스팅에서는 이런 작업들을 airflow에서는 어떻게 관리하고 다시 재 작업을 할 수 있을지 알아볼려고 한다. Catch Up python 코드로 DAG를 작성할 때 Dag 속성 안에 Catch up이라는 인수를 둘수 있다. 기본적으로 false가 되어있고, True로 주게 되면 Catch up이 활성화 되게 된다. with DAG( dag_id="example_dag", start_date=datetime(2021, 10, 9), max_..
-
[Airflow] Executor의 종류와 사용가이드라인 알아보기(Sequential, Local, Celery, Kubernetes)데이터 엔지니어링/AirFlow 2022. 8. 18. 20:22
Airflow Executor란? executor란 작업자들에게 작업을 실행 시키는 역할을 한다. 종류로는 SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor등이 있다. LocalExecutor는 병렬처리까지만 가능하고 클러스터 형으로 작업을 나눌수는 없다. CeleryExecutor, KubernetesExecutor는 클러스터를 구성해야 한다. 기본 제공 Executor Airflow 처음 설치를 하면 기본 제공되는 것이 바로 SequentialExecutor이다. 순차적으로 진행하는 것만 할 수 있다는 것이다. Executor에서 일어나는일을 순차적으로만 실행한다면 어떻게 될까 작업이 느려지거나 동시에 일어나서 진행해야하는 작..
-
[Airflow]variables를 이용한 전역변수 설정데이터 엔지니어링/AirFlow 2022. 6. 27. 08:45
이전에 xcom을 이용하여 task instance간 데이터를 주고 받는 법을 알아봤는데 이번에는 전역으로 아무 DAGRun에서 데이터를 공유하고 사용하는 방법에 대해 알아 보겠습니다. Variables라는 기능인데 web ui 에서 admin- variables 라는 탭이 있습니다. variables 생성 방법 variables를 클릭하면 아래와 같이 구성할 수 있는 화면이 나옴 + 버튼을 눌러 생성가능, 아래와 같이 두개를 구성해 줄수 있음 그냥 text도 되고, json도 가능 save를 눌러 저장한 모습 구성 방법 불러오는 방법은 Variable 객체를 import 하고 key-value를 이용하여 값을 가져와 사용하면 된다. json은 json에 맞게 deserialize하여 사용한다. Vari..
-
[Airflow] Connection을 이용하여 외부 서비스와 연동하기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:15
SimpleHttpOperator를 이용한 방법에 대해 알아보자, SimpleHttpOperator를 사용하기 위해서는 provider를 설치해야한다. provider airflow 1.x 버전에서는 내장 되어있었지만 2.x 버전으로 오면서 설치해서 사용하여야한다. Http provider 설치 pip install apache-airflow-providers-http Http provider 사용하기 from airflow.providers.http.operators.http import SimpleHttpOperator # 기본적으로 Http호출하는 Operator extract_gorest = SimpleHttpOperator( task_id='extract_gorest', http_conn_id=..