전체 글
-
[Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기)데이터 엔지니어링/AirFlow 2022. 6. 22. 15:53
목표 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경) 의존성을 LIST형식으로 구현하는 방법 알기 Graph를 이용하여 병렬처리 되었는지 확인하는 방법 알아보기 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경) airflow에서 기본 제공하는 db는 sqlite인데 sqlite로 진행 할 시에 병렬처리가 되지 않는다. 병렬처리를 하기 위해서는 mysql이나 postgresql을 이용하여 한다. 그래서 airflow db를 변경하는 방법에 대해 알아보는 좋을 것 같다. 필자는 docker로 했지만, local에 다운받아서 진행해도 무방하다. docker image 받기 docker pull postgres:latest docker container..
-
[Airflow 실습] 여러개의 Task 한번에 돌리기데이터 엔지니어링/AirFlow 2022. 6. 22. 15:48
목표 DAG 생성 의존성을 연결하여 Task를 동시에 호출 할 수 있는지 파악 graph를 활용하여 동시에 작업 실행 되는지 확인 완성된 flow 완성된 소스 from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator from pendulum.tz.timezone import Timezone default_args = { 'owner' : 'eddie', 'retries' : 1, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='ex_multi_tasks', des..
-
[Airflow 실습]가장 기본 DAG 만들어 실행해보기데이터 엔지니어링/AirFlow 2022. 6. 22. 15:46
목표 DAG를 하나 생성하여 실행하고 hello world가 찍힌 것을 확인하기 완성 사진 코딩 순서 timezone 설정 default_args 설정 DAG 설정 작성 task 작성 Dependencies 설정 완성 코드 import pendulum from datetime import datetime from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator # timezone 한국시간으로 변경 kst = pendulum.timezone("Asia/Seoul") # 기본 args 생성 default_arg..
-
[Airflow 기본 내용] DAG 작성 심화데이터 엔지니어링/AirFlow 2022. 6. 22. 10:37
DAG 선언 3가지 방법 기본 DAG 선언 with 문 with DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=False, ) as dag: op = EmptyOperator(task_id="task") 표준 생성자 my_dag = DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=..
-
[Airflow 기본 내용] 기본적인 DAG 작성법데이터 엔지니어링/AirFlow 2022. 6. 22. 08:13
기본 DAG 작성법 순서 module 추가 Defulat arguments 추가 DAG 작성(id, args, schedul_interval 등) Task 정의(필요에 따라 doc_md 작성 및 추가) Dependencies 연결 Module 추가하기 # airflow DAG 모듈 from airflow import DAG # 날짜 관리 모듈 from datetime import datetime, timedelta # 날짜 시간 간편하게 관리할 수 있게 도와주는 모듈 import pendulum # 사용할 operator import from airflow.operators.python_operator import PythonOperator Default Arguments default_args = { ..
-
[Airflow 기본 내용] Operator, task란?데이터 엔지니어링/AirFlow 2022. 6. 22. 08:09
Operator의 특성 single task를 정의 해야한다. 멱등성을 유지해야한다. 다른 작업간에 겹침 현상을 제거하여 오류제거를 위함이다. 자동 재시도를 작성하여 자동으로 재시도 해줄수 있다. 하나의 task는 하나의 Operator 클래스로 구성되어 생성이 되어진다. Operator 종류 Operator는 어떤 작업을 할지 정의 해주는 것이기 때문에 많은 종류의 Operator들이 있다. BashOperators, pythonOperator, EmailOperator, MySqlOperator,SqliteOperator, PostgreOperator 등등 아래 링크에서 확인 가능하다 이것을 보고 DAG를 작성하면 된다. airflow.operators - Airflow Documentation Op..
-
[Airflow 기본 내용] Airflow란? 구성요소, 구동원리, 개념, 용어 정리데이터 엔지니어링/AirFlow 2022. 6. 21. 08:22
Airflow란? airflow는 airbnb에서 만든 workflow를 만드는 오픈소스 프로젝트이다. python 코드로 워크플로우(workflow)를 작성하고, 스케쥴링, 모니터링 하는 플랫폼임. 스케쥴링을 하고 모니터링을 코드로 작성하기 때문에 더 세분화 할 수 있으므로, 더 정교화된 파이프라인을 구성 할 수 있음. 예약된 워크플로를 트리거 하고 실행 할 실행기에 Task을 제출하는 두가지를 모두 처리하는 스케줄러이다. Airflow 동작 원리 구성요소 User interface / Web Server : airflow는 설치 만으로 web server를 구성 할 수 있음 DAG Directory: 작업스케쥴을 만들어주는 스케쥴링 단위를 작성한 DAG를 보관하는 장소 python 코드로 작성한 DA..
-
[Airflow 설치방법] Airflow Conda 환경에서의 설치 방법데이터 엔지니어링/AirFlow 2022. 6. 21. 08:16
airflow 설치하기 (conda 환경 하에서 진행) $ pip install apache-airflow ~~~~~ 설치 관련 log 출력 $ 버전으로 설치하고 싶다면 $ pip install apache-airflow=={airflow-version} $ which airflow /Users/planit/opt/anaconda3/bin/airflow 기본 내용 확인 # airflow 에서 제공하는 명령어 확인 $ airflow usage: airflow [-h] GROUP_OR_COMMAND ... positional arguments: GROUP_OR_COMMAND Groups: celery Celery components ... Commands: cheat-sheet Display cheat sh..