데이터 엔지니어링
-
[Airflow 실습] 여러개의 Task 한번에 돌리기데이터 엔지니어링/AirFlow 2022. 6. 22. 15:48
목표 DAG 생성 의존성을 연결하여 Task를 동시에 호출 할 수 있는지 파악 graph를 활용하여 동시에 작업 실행 되는지 확인 완성된 flow 완성된 소스 from datetime import datetime, timedelta from time import sleep from airflow import DAG from airflow.operators.python import PythonOperator from pendulum.tz.timezone import Timezone default_args = { 'owner' : 'eddie', 'retries' : 1, 'retry_delay': timedelta(minutes=1) } with DAG( dag_id='ex_multi_tasks', des..
-
[Airflow 실습]가장 기본 DAG 만들어 실행해보기데이터 엔지니어링/AirFlow 2022. 6. 22. 15:46
목표 DAG를 하나 생성하여 실행하고 hello world가 찍힌 것을 확인하기 완성 사진 코딩 순서 timezone 설정 default_args 설정 DAG 설정 작성 task 작성 Dependencies 설정 완성 코드 import pendulum from datetime import datetime from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator # timezone 한국시간으로 변경 kst = pendulum.timezone("Asia/Seoul") # 기본 args 생성 default_arg..
-
[Airflow 기본 내용] DAG 작성 심화데이터 엔지니어링/AirFlow 2022. 6. 22. 10:37
DAG 선언 3가지 방법 기본 DAG 선언 with 문 with DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=False, ) as dag: op = EmptyOperator(task_id="task") 표준 생성자 my_dag = DAG( dag_id="my_dag_name", default_args={'owner':'airflow'}, start_date=pendulum.datetime(2021,1,1, tz="UTC"), schedule_interval="@daily", catchup=..
-
[Airflow 기본 내용] 기본적인 DAG 작성법데이터 엔지니어링/AirFlow 2022. 6. 22. 08:13
기본 DAG 작성법 순서 module 추가 Defulat arguments 추가 DAG 작성(id, args, schedul_interval 등) Task 정의(필요에 따라 doc_md 작성 및 추가) Dependencies 연결 Module 추가하기 # airflow DAG 모듈 from airflow import DAG # 날짜 관리 모듈 from datetime import datetime, timedelta # 날짜 시간 간편하게 관리할 수 있게 도와주는 모듈 import pendulum # 사용할 operator import from airflow.operators.python_operator import PythonOperator Default Arguments default_args = { ..
-
[Airflow 기본 내용] Operator, task란?데이터 엔지니어링/AirFlow 2022. 6. 22. 08:09
Operator의 특성 single task를 정의 해야한다. 멱등성을 유지해야한다. 다른 작업간에 겹침 현상을 제거하여 오류제거를 위함이다. 자동 재시도를 작성하여 자동으로 재시도 해줄수 있다. 하나의 task는 하나의 Operator 클래스로 구성되어 생성이 되어진다. Operator 종류 Operator는 어떤 작업을 할지 정의 해주는 것이기 때문에 많은 종류의 Operator들이 있다. BashOperators, pythonOperator, EmailOperator, MySqlOperator,SqliteOperator, PostgreOperator 등등 아래 링크에서 확인 가능하다 이것을 보고 DAG를 작성하면 된다. airflow.operators - Airflow Documentation Op..
-
[Airflow 기본 내용] Airflow란? 구성요소, 구동원리, 개념, 용어 정리데이터 엔지니어링/AirFlow 2022. 6. 21. 08:22
Airflow란? airflow는 airbnb에서 만든 workflow를 만드는 오픈소스 프로젝트이다. python 코드로 워크플로우(workflow)를 작성하고, 스케쥴링, 모니터링 하는 플랫폼임. 스케쥴링을 하고 모니터링을 코드로 작성하기 때문에 더 세분화 할 수 있으므로, 더 정교화된 파이프라인을 구성 할 수 있음. 예약된 워크플로를 트리거 하고 실행 할 실행기에 Task을 제출하는 두가지를 모두 처리하는 스케줄러이다. Airflow 동작 원리 구성요소 User interface / Web Server : airflow는 설치 만으로 web server를 구성 할 수 있음 DAG Directory: 작업스케쥴을 만들어주는 스케쥴링 단위를 작성한 DAG를 보관하는 장소 python 코드로 작성한 DA..
-
[Airflow 설치방법] Airflow Conda 환경에서의 설치 방법데이터 엔지니어링/AirFlow 2022. 6. 21. 08:16
airflow 설치하기 (conda 환경 하에서 진행) $ pip install apache-airflow ~~~~~ 설치 관련 log 출력 $ 버전으로 설치하고 싶다면 $ pip install apache-airflow=={airflow-version} $ which airflow /Users/planit/opt/anaconda3/bin/airflow 기본 내용 확인 # airflow 에서 제공하는 명령어 확인 $ airflow usage: airflow [-h] GROUP_OR_COMMAND ... positional arguments: GROUP_OR_COMMAND Groups: celery Celery components ... Commands: cheat-sheet Display cheat sh..
-
[Spark]RDD 이해하기데이터 엔지니어링/Spark 2022. 6. 19. 23:23
데이터 연산 방식 이해하기 RDD를 들어가기 전에 먼저 데이터 연산 방식을 이해 해야한다. 이를 이해하면 RDD가 왜 나왔는지 Spark에서는 RDD가 왜 사용돼는지 간접적으로 이해 할 수 있다. 데이터 연산 방식은 병렬처리 방식 분산 처리 방식이 있다. 병렬(Data-Parallel) 처리 방식 데이터를 여러개로 쪼갠다. 여러 쓰레드에 쪼갠 데이터를 넣어 task를 적용한다 각자 만든 결과값을 반환 받아 합친다. 위와 같은 일렬의 과정으로 결과값을 얻는다. 분산처리 방식(Distributed Data-Parallel) 데이터를 여러개로 쪼개서 여러 노드로 보낸다. 여러 노드에서 각자 독립적인 task를 적용한다. 각자 만든 결과값을 합치는 과정을 한다. 위와 같은 일렬의 과정을 통해 결과값을 얻어 반환..