-
[Airflow] hook이란? (hook을 사용하여 mysql to csv 패턴 실습)데이터 엔지니어링/AirFlow 2022. 9. 4. 21:38반응형
Hook이란?
Hook이란? 미리 정의 된 것이 아니라 커스텀하게 로직을 구현하고 싶을 때나 외부 시스템의 푸시 작업 흐름을 만들고 싶을 때도 사용한다. 보통pythonOperator로 작업을 실행 시키고 python 함수 코드 안에서 다른 로직을 쉽게 구현할 수 있게 해주기 위해 사용한다. pip로 다른 플러그인을 다운받아 사용도 하여 굳이 hook을 사용할 필요는 없다. 그냥 쉽게 접근이 가능하게 하는 용도 외에는 없는 것 같다.
Mysql 커넥터 만들기
실습을 위해 커넥터 먼저 만들자 mysql 깔려 있다고 가정하고 하는 것이다.
+ 버튼을 눌러 커넥터 만들기
Provider 설치
airflow가 깔려 있는 서버에서 아래 명령어를 실행하면 provider가 깔린다. 미리 mysql용으로 airflow를 깔았다면 설치 하지 않아도 된다. 홈페이지에 provider를 pip로 설치하는 법이 나와있으니 참고하자 provider를 설치하면 hook도 같이 설치가 완료 된다.
pip install apache-airflow-providers-mysql
DAG 작성
import csv import logging from datetime import datetime, timedelta from tempfile import NamedTemporaryFile from airflow import DAG from airflow.operators.python import PythonOperator from airflow.providers.mysql.operators.mysql import MySqlOperator, MySqlHook default_args = { 'owner': 'hsjoo', 'retries': 5, 'retry_delay': timedelta(minutes=10) } def mysql_hook(): # hook 내용 작성 logging.info("Started mysql_hook") hook = MySqlHook.get_hook(conn_id="local-mysql") # 미리 정의한 mysql connection 적용 conn = hook.get_conn() # connection 하기 cursor = conn.cursor() # cursor객체 만들기 cursor.execute("use testDB") # sql문 수행 cursor.execute("select * from user") # csv파일 만들기 with open("dags/get_data_mysql_to_csv.txt", "w") as f: csv_writer = csv.writer(f) csv_writer.writerow([i[0] for i in cursor.description]) csv_writer.writerows(cursor) f.flush() cursor.close() conn.close() logging.info("Saved data in text file: %s", "dags/get_data_mysql_hook.txt") with DAG( dag_id="ex_mysql_to_csv", default_args=default_args, start_date=datetime(2022, 4, 30), schedule_interval='@once' ) as dag: # task1 = PythonOperator( task_id="mysql_to_csv", python_callable=mysql_hook ) task1
불러올 데이터 확인
mysql에 들어있는 데이터는 아래와 같다.
실행 결과 확인 하기
아래 그림과 같이 잘 작성 된것을 확인 할 수 있다.
마치며
굳이 hook을 사용할 필요는 없고 pymysql을 다운받아 pythonoperator를 로직으로 정의하여 사용하여도 된다. 그런데 pythonoperator로 하게 되면 excute query부분과 Connection하는 부분을 자신이 정의하여 사용해야 한다. 솔직히 별 차이는 없다. 몇줄 줄일 뿐
반응형'데이터 엔지니어링 > AirFlow' 카테고리의 다른 글
[Airflow] User_defined_macros를 이용하여 jinja template의 사용자 정의 변수 활용하기 (1) 2023.02.12 [Airflow] jinja_template을 활용한 날짜 동적 변수 활용 하는 법(동적 datetime, ds변수 UTC안되는 것 해결법) (0) 2022.09.18 [Airflow] airflow설치 방법(with ubuntu, mysql) (0) 2022.09.04 [Airflow]Catch up, Backfill 알아보기 (0) 2022.09.04 [Airflow] Executor의 종류와 사용가이드라인 알아보기(Sequential, Local, Celery, Kubernetes) (0) 2022.08.18