ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Apache NIFI] ExecuteStreamCommand Processor 알아보기(Flow 도중 Bash, Script NIFI에서 실행하기)
    데이터 엔지니어링/Nifi 2022. 5. 7. 14:45
    반응형

    ExecuteStreamCommand?

    • Stream(Flow가 실행하는 중)도중에 Command(bash shell)라인 실행하는 프로세서임
    • nifi에서 해결할 수 없는 python, go , javascript등을 실행하게 도와주는 역할을 함 
    • 기본 적으로 실행하는 명령어는 bash로 되어있는데 #!/usr/bin/python3 ( = bash sheBang )를 이용하여 bash 내부가 python, go등으로 실행 할 수 있도록 환경 변경이 가능하기 때문
    • Processor를 생성을 하면 아래와 같은 모양을 가지며, Stream 도중에 발생하는 프로세스 이기 때문에 아래와 같이 꼭 선행 Processor relationship이 있어야 한다.

     

    Property 설정 하기

    • Command Arguments로 커맨드 라인에 system인자를 넘길 수 있다
    • Argument Delimiter는 Command Arguments에 설정한 인자를 ;라는 구분자로 나뉘어 넣겟다는 것이다 1;2;3;4 이렇게 쓰면 인자는 1 2 3 4 총 4개가 된다.
    • Command Path는 실행하고 싶은 bash파일 이 있는 주소를 작성한다. 저자는 log를 남기는 걸 할려고 하여 log.py라는 python파일을 만들고 적당한 위치에 옮겼다.
    • 즉, 아래와 같이 적으면 아래와 같음 bash 명령어가 실행 되는 것과 똑같다.
    $ bash yourDirPath/log.py 1 2 3 4

    • 그리고 이렇게 사용하고 싶다면 권한 문제 때문에 권한을 적용해 주어야 하는데 chmod +x 주소/파일명 를 이용한다. chmod를 모른다면 따로 공부하여 권한을 주자

     

     

    python Script작성하기 

    #!/usr/bin/python3
    
    """
    2022-05-04
    python Script created by eddie
    인자 
    1 : date - format('yyyy-MM-dd') - 파일 명으로 사용
    2 : date - format('yyyy-MM-dd HH:mm:ss') - 해당 내용 실행 일시로 사용
    3 : TAG - ([INFO], [WARN], [ERROR], [SQL] 등 편한대로 구성하여 넘겨줌) - TAG로 사용 
    4 : MESSAGE - 실행 쿼리나 log남길 내용  - 해당 중요 내용 남김
    
    Log 출력 예시 
    [INFO][2022-05-03 15:28:22] - AI downSampling
    [SQL][2022-05-03 15:28:22] - SELECT * FROM mysql.patient
    """
    
    import sys 
    import os
    
    #들어온 내용 argument 정리 
    today = str(sys.argv[1])
    todayWithSecond = str(sys.argv[2])
    tag = str(sys.argv[3])
    message = str(sys.argv[4])
    
    print(today+todayWithSecond+tag+message)
    
    # 파일 경로 생성 
    # 해당 위치는 nifi 부모 디렉토리로 워킹 디렉토리 출력이 됨
    # 아래 와 깉이 설정하면 nifi설치폴더/scripts/LOG/ 폴더 알에 working dir 설정
    pwd = os.getcwd()
    fileDir = pwd + "/scripts/LOG/"
    fileName = today + ".txt"
    fileFullName = fileDir + fileName
    
    # 디렉토리 있는지 체크
    if not os.path.exists(fileDir):
        os.makedirs(fileDir)
    
    # 파일 생성되었는지 체크
    writeType = "w"
    if not os.path.isfile(fileFullName):   
        writeType = "w"
    else:
        writeType = "a"
    
    # 파일 생성 및 어펜드
    f = open(fileFullName , writeType)
    f.write("["+ tag + "]" + "["+ todayWithSecond + "]" + "-" + message + "\n")
    print(  "["+ tag + "]" + "["+ todayWithSecond + "]" + "-" + message)
    f.close
    • 위와 같이 작성 하면 LOG폴더 아래에 20220503.txt이렇게 날짜 별로 진행이 가능 하게 된다. 넘겨주는 파라미터 인자는 nifi에서 함수나 내장 함수로 작성하여 넘겨주었다.

     

     

    이런 일련의 과정으로 nifi에서 script를 실행할 수 있다. 끝~~~

     

    반응형

    댓글

Designed by Tistory.