ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Kafka] ksqldb 실습환경 구축해보기
    데이터 엔지니어링/Kafka 2022. 7. 31. 16:50
    반응형

    KSQLDB 실습 환경

    KsqlDB를 실습하면서 배워보는 시간을 가져볼 예정입니다. 먼저 stream이 있어야 하기 때문에 stream을 하나 만들어 실습을 진행하겠습니다. 먼저 진행하기에 앞서 제가 사용하게 될 아키텍처를 설명 하자면 아래와 같습니다.

    Kafka클러스터를 구축 하였고 Broker서버 1개 order라는 토픽을 만들어 메세지를 주고 받겠습니다. KsqlDB는 클러스터 외에 서버를 구축해야 하므로 외부에 설치를 진행하도록 하고 Ksql-cli로 KsqlDB에 접속하여 조작을 할것입니다.

     

    서버를 만들어서 사용하면 좋겠지만, 여건이 안되므로 Docker파일을 작성하여 진행하였습니다.  위의 그림처럼 만들기 위해 설정을 해놨습니다. 아래를 복사해서 사용하시면 되겠습니다.

    version: '3'
    
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.2.0
        hostname: zookeeper
        container_name: zookeeper
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181
          ZOOKEEPER_TICK_TIME: 2000
        volumes:
          - ./data/zookeeper/data:/data
          - ./data/zookeeper/datalog:/datalogco
    
      broker:
        image: confluentinc/cp-kafka:7.2.0
        hostname: broker
        container_name: broker
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
          # INTERNAL은 안에서 사용하는 것 (cluster끼리), EXTERNAL은 외부에서 들어는 것 
          # INTERNAL은 ksqldb, kafdrop 등의 서버와 맞아야 함
          # EXTERNAL는 위에 설정한 port와 맞아야 함
          # 설정에 주의 하자
          KAFKA_ADVERTISED_LISTENERS: INTERNAL://broker:29092, EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT 
          KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL # INTERNAL 주소로 브로커 서버 설정
          KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
          KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
          KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
        volumes:
          - ./data/broker/data:/tmp/kafka-logs
    
      ksqldb-server:
        image: confluentinc/ksqldb-server:0.26.0
        hostname: ksqldb-server
        container_name: ksqldb-server
        depends_on:
          - broker
        ports:
          - "8088:8088"
        environment:
          KSQL_LISTENERS: http://0.0.0.0:8088
          KSQL_BOOTSTRAP_SERVERS: broker:29092
          KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
          KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
    
      ksqldb-cli:
        image: confluentinc/ksqldb-cli:0.26.0
        container_name: ksqldb-cli
        depends_on:
          - broker
          - ksqldb-server
        entrypoint: /bin/sh
        tty: true
    
      kafdrop:
          image: obsidiandynamics/kafdrop
          container_name: kafdrop
          restart: "no"
          ports: 
            - "9000:9000"
          environment:
            KAFKA_BROKER_CONNECT: "broker:29092"
          depends_on:
            - broker

     

    그리고 제가 사용할 Model들인데 이것에 대해 알아두면 프로세싱 할때 좋을 것같아 요약해 보겠습니다. 먼저 맥도날드내용을 정의 하여 모델링을 진행 하였고 등급이 각각 다른 3명이 있고 이 고객들이 메뉴를 주문하는 모델로 진행 하겠습니다. 등급에 따라 할인율이 다르므로 가격이 다르게 측정 될것입니다. 그러니깐 가져다가 group by 나 sql문을 생각해서 테스트 할 때 작성하면 될 것같습니다. 모델과 코드는 아래와 같습니다.

    Producer.py

    """
    @author : 주현석
    @date : 2022.07.18
    @description : Order에 dummy 데이터 보내는 producer입니다.
    """
    
    from kafka import KafkaProducer 
    import datetime, pytz, time, random, json 
    
    TOPIC_NAME = "order"
    brokers = ["localhost:9092"]
    producer = KafkaProducer(bootstrap_servers=brokers)
    
    def get_time_date():
        utc_now = pytz.utc.localize(datetime.datetime.utcnow())
        kst_now = utc_now.astimezone(pytz.timezone("Asia/Seoul"))
        d = kst_now.strftime("%m/%d/%Y")
        t = kst_now.strftime("%H:%M:%S")
        return d, t
    
    def generate_order_data():
        customer_type = random.choice([0, 1, 2])
        category = random.choice([0, 1, 2])
        menu = random.choice([0, 1, 2])
    
        dic_user_id = [1, 2, 3]
        dic_customer_type = ["BRONZE", "SILVER", "PLATINUM"]
    
        dic_menu = [
            ["Classic Burger", "Bigmac Burger", "Sanghai Burger"],
            ["Franch Fries","Coleslaw", "Cheese Sticks"],
            ["McFlurry", "Strawberry Cone", "vanilla shake"]
        ]
    
        dic_price = [
            [5000, 6000, 7000],
            [3000, 2000, 2000],
            [4000, 2000, 4000]
        ]
    
        dic_discount = [1.0, 0.9, 0.8]
    
        choiced_user_id = dic_user_id[customer_type]
        choiced_customer_type = dic_customer_type[customer_type]
        choiced_menu = dic_menu[category][menu]
        choiced_price = dic_price[category][menu] * dic_discount[customer_type]
    
        return choiced_user_id, choiced_customer_type, choiced_menu, choiced_price
    
    
    while True:
        d, t = get_time_date()
        choiced_user_id, choiced_customer_type, choiced_menu, choiced_price  = generate_order_data()
        new_data = {
            "DATE" : d,
            "TIME" : t,
            "USER_ID" : choiced_user_id,
            "CUSTOMER_TYPE" : choiced_customer_type,
            "MENU" : choiced_menu,
            "PRICE" : choiced_price,
        }
    
        producer.send(TOPIC_NAME, json.dumps(new_data).encode("utf-8"))
        print(new_data)
        time.sleep(1)

     

    Docker환경 만들기 

    먼저 도커 컨테이너를 만들어 주시고

    docker-compose up -d

    Topic만들기 

    order라는 토픽을 만들 예정이니 docker에 명령어를 날려 order를 만들어 줍니다.

    docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic order

    여기까지 하면 실습 환경 준비는 끝났습니다. 이제 stream과 table등을 해보면서 실습을 진행 할 예정입니다.

     

     

    소스 코드는 아래 주소에 나와있으니 clone받아 사용하셔도 됩니다.

     

    GitHub - hyunseokjoo/kafka_sample_code

    Contribute to hyunseokjoo/kafka_sample_code development by creating an account on GitHub.

    github.com

     

    반응형

    댓글

Designed by Tistory.