데이터 엔지니어링
-
[Airflow] Connection을 이용하여 외부 서비스와 연동하기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:15
SimpleHttpOperator를 이용한 방법에 대해 알아보자, SimpleHttpOperator를 사용하기 위해서는 provider를 설치해야한다. provider airflow 1.x 버전에서는 내장 되어있었지만 2.x 버전으로 오면서 설치해서 사용하여야한다. Http provider 설치 pip install apache-airflow-providers-http Http provider 사용하기 from airflow.providers.http.operators.http import SimpleHttpOperator # 기본적으로 Http호출하는 Operator extract_gorest = SimpleHttpOperator( task_id='extract_gorest', http_conn_id=..
-
[Airflow]Pool 을 이용하여 메모리 사용량 제한하기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:11
Airflow 배치 스케쥴을 만들 때 또 좋은 점이 있는데 바로 pool을 이용하여 메모리를 제한 할 수 있다. DAG를 구성할 때 무거운 작업은 메모리를 많이 할당하여 빨리 처리하거나 아니면 너무 무거워 뻑나는 경우를 대비하여 슬롯을 조금 할당 할 수도 있다. Pool설정 방법 task를 설정할 때 pool이라는 속성을 정의 하면 pool을 지정해 줄수 있다. pool_slot을 이용하여 슬롯 숫자도 정희 줄 수 있다. def dump(interval): sleep(interval) ask_heavy = PythonOperator( task_id='task_heavy', python_callable=dump, op_args=[500], pool='single_pool', pool_slot=2, ) Po..
-
[Airflow]Xcom을 이용한 task 간 데이터 주고 받기데이터 엔지니어링/AirFlow 2022. 6. 26. 20:08
XCom이란 cross communcation의 약자로 airflow task간 데이터를 주고 받을 일이 있는데 이 부분을 해결하기 위해 나왔다. Task에서 연산이나 필요한 데이터가 각각의 task마다 변경이 될때가 있는데 이런 경우에 데이터를 처리하기 위해 주로 사용된다. Airflow는 Task Instance간 데이터를 공유하지 않기 때문에 XCom을 이용하여 데이터를 주고 받아야 한다. XCom은 DAG Run 내에서만 존재하고 다른 DAG Run과는 공유하지 않는다. Dataframe과 같은 많은 양의 데이터는 지원하지 않는다. PythonOperator를 사용하면 return값은 자동적으로 XCom변수로 등록된다. XCom 사용하는 몇가지 방법 PythonOperator return값을 이용..
-
[Airflow 실습] Trigger를 이용하여 다른 DAG를 실행하기데이터 엔지니어링/AirFlow 2022. 6. 24. 11:29
목표 DAG 작업 진행 시 Trigger를 사용하여 다른 DAG를 호출하는 방법 알아보기 Trigger 연결 되어있는지 확인 하는 방법 알아보기 실행법 알아보기 완성 사진 Trigger를 사용하려면 TriggerDagRunOperator를 사용하여야 한다. TriggerDagRunOperator를 사용하면 DAG Graph에 아래와 같이 왼쪽 상단에 TriggerDagRunOperator를 사용했다고 나오고 하나의 task처럼 표시 된다. 완성 코드 먼저 trigger를 사용하려면 여분의 DAG를 하나 더 만들어야 합니다. DAG는 아래와 같이 여분의 DAG로 만들었습니다. from time import sleep from datetime import datetime, timedelta from airf..
-
[Airflow 실습] Task group으로 ui 관리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 20:54
목표 group 목적 알아보기 task들을 group으로 묶는법에 대해 알아보기 ui에서 group으로 표시 되어있는 부분 확인하기 group의 목적 알아보기 group을 사용하는 이유는 task의 실행 계획 과는 상관없이 ui에 대한 내용만 정리 하기 위해 사용합니다. 모듈로 빼서 group으로 관리하면 좀 관리나 재사용성이 좋아질 것 같긴합니다. 완성 사진 맨처음 group_1 으로 구성하였으며 그 안에 또 group을 넣어서 전체적으로 group이 두개 group안에 group으로 구성 하였습니다. task들을 group으로 묶는법에 대해 알아보기 먼저 utils task_group이 있으니 확인 하시고 import하면 되겠습니다. Task를 구성하는 것과 같이Task_id가 web ui에서 보이는..
-
[Airflow 실습] SubDAG 작성하여 DAG관리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 15:56
목표 SubDAG 개념 알아보기 SubDAG 작성법 알아보기 SubDAG 확인 하는법 알아보기 SubDAG 개념 알아보기 DAG안에 또다른 DAG를 넣을 수 있다. 그게 바로 SubDAG이다. DAG를 만들어서 관리를 하다보면 DAG사이즈가 너무 커져서 감당이 안되거나, 너무 가독성이 없고 관리도 하기 힘들 때가 있는데 그럴때 subDAG를 사용해보는걸 고려해봐도 좋다. SubDAG 를 사용하는 이유 모듈처럼 관리가 용이해짐 다른 곳에서도 불러와 사용할 수 있기 때문에 재사용성이 좋음 ui에서 하나의 task처럼 보이기 때문에 ui적으로 보기 좋음 SubDAG 작성법 알아보기 SubDAG는 작성을 먼저 메인 DAG에 SubDagOperator를 이용하여 DAG 작성 from airflow.operator..
-
[Airflow 실습] Depends_on_past, wait_for_downstream을 이용하여 직전 dag의 상태 의존성 걸어 실행 계획 처리하기데이터 엔지니어링/AirFlow 2022. 6. 23. 13:15
목표 DAGRun 보는 법 알아보기 Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기 두개의 속성 차이점 알아보기 실행해보면서 익혀보기 DagRun 보는 법 알아보기 DAGRun이란 DAG들의 실행 내역이라고 보면 된다. 아래 그림을 보면 현광색은 실행 중인 상태의 DAG를 보여주며, 초록색은 성공 분홍색은 skip등을 DAG한 실당 한줄씩 보여준다. web ui에서 막대 그래프로 확인 가능하다 막대 그래프 형식은 airflow==2.3.2(2022.06 최신 버전)에서는 바뀌어 있었다. Depends_on_past, wait_for_downstream의 의미 및 사용법 알아보기 DAGRun중에서 이전작업에 대해 의존성을 부여하여야 하는 경우가 있다. 예를 들어..
-
[Airflow 실습] Task 여러개 병렬처리 하기(db sqlite -> postgre로 변경하기, Gantt를 이용하여 병렬처리 확인하기)데이터 엔지니어링/AirFlow 2022. 6. 22. 15:53
목표 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경) 의존성을 LIST형식으로 구현하는 방법 알기 Graph를 이용하여 병렬처리 되었는지 확인하는 방법 알아보기 병렬처리를 위한 사전작업 알아보기(postgres준비, config파일 변경) airflow에서 기본 제공하는 db는 sqlite인데 sqlite로 진행 할 시에 병렬처리가 되지 않는다. 병렬처리를 하기 위해서는 mysql이나 postgresql을 이용하여 한다. 그래서 airflow db를 변경하는 방법에 대해 알아보는 좋을 것 같다. 필자는 docker로 했지만, local에 다운받아서 진행해도 무방하다. docker image 받기 docker pull postgres:latest docker container..