-
[Kafka] ksqldb 실습환경 구축해보기데이터 엔지니어링/Kafka 2022. 7. 31. 16:50반응형
KSQLDB 실습 환경
KsqlDB를 실습하면서 배워보는 시간을 가져볼 예정입니다. 먼저 stream이 있어야 하기 때문에 stream을 하나 만들어 실습을 진행하겠습니다. 먼저 진행하기에 앞서 제가 사용하게 될 아키텍처를 설명 하자면 아래와 같습니다.
Kafka클러스터를 구축 하였고 Broker서버 1개 order라는 토픽을 만들어 메세지를 주고 받겠습니다. KsqlDB는 클러스터 외에 서버를 구축해야 하므로 외부에 설치를 진행하도록 하고 Ksql-cli로 KsqlDB에 접속하여 조작을 할것입니다.
서버를 만들어서 사용하면 좋겠지만, 여건이 안되므로 Docker파일을 작성하여 진행하였습니다. 위의 그림처럼 만들기 위해 설정을 해놨습니다. 아래를 복사해서 사용하시면 되겠습니다.
version: '3' services: zookeeper: image: confluentinc/cp-zookeeper:7.2.0 hostname: zookeeper container_name: zookeeper ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_TICK_TIME: 2000 volumes: - ./data/zookeeper/data:/data - ./data/zookeeper/datalog:/datalogco broker: image: confluentinc/cp-kafka:7.2.0 hostname: broker container_name: broker depends_on: - zookeeper ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' # INTERNAL은 안에서 사용하는 것 (cluster끼리), EXTERNAL은 외부에서 들어는 것 # INTERNAL은 ksqldb, kafdrop 등의 서버와 맞아야 함 # EXTERNAL는 위에 설정한 port와 맞아야 함 # 설정에 주의 하자 KAFKA_ADVERTISED_LISTENERS: INTERNAL://broker:29092, EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT, EXTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL # INTERNAL 주소로 브로커 서버 설정 KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 volumes: - ./data/broker/data:/tmp/kafka-logs ksqldb-server: image: confluentinc/ksqldb-server:0.26.0 hostname: ksqldb-server container_name: ksqldb-server depends_on: - broker ports: - "8088:8088" environment: KSQL_LISTENERS: http://0.0.0.0:8088 KSQL_BOOTSTRAP_SERVERS: broker:29092 KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true" KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true" ksqldb-cli: image: confluentinc/ksqldb-cli:0.26.0 container_name: ksqldb-cli depends_on: - broker - ksqldb-server entrypoint: /bin/sh tty: true kafdrop: image: obsidiandynamics/kafdrop container_name: kafdrop restart: "no" ports: - "9000:9000" environment: KAFKA_BROKER_CONNECT: "broker:29092" depends_on: - broker
그리고 제가 사용할 Model들인데 이것에 대해 알아두면 프로세싱 할때 좋을 것같아 요약해 보겠습니다. 먼저 맥도날드내용을 정의 하여 모델링을 진행 하였고 등급이 각각 다른 3명이 있고 이 고객들이 메뉴를 주문하는 모델로 진행 하겠습니다. 등급에 따라 할인율이 다르므로 가격이 다르게 측정 될것입니다. 그러니깐 가져다가 group by 나 sql문을 생각해서 테스트 할 때 작성하면 될 것같습니다. 모델과 코드는 아래와 같습니다.
Producer.py
""" @author : 주현석 @date : 2022.07.18 @description : Order에 dummy 데이터 보내는 producer입니다. """ from kafka import KafkaProducer import datetime, pytz, time, random, json TOPIC_NAME = "order" brokers = ["localhost:9092"] producer = KafkaProducer(bootstrap_servers=brokers) def get_time_date(): utc_now = pytz.utc.localize(datetime.datetime.utcnow()) kst_now = utc_now.astimezone(pytz.timezone("Asia/Seoul")) d = kst_now.strftime("%m/%d/%Y") t = kst_now.strftime("%H:%M:%S") return d, t def generate_order_data(): customer_type = random.choice([0, 1, 2]) category = random.choice([0, 1, 2]) menu = random.choice([0, 1, 2]) dic_user_id = [1, 2, 3] dic_customer_type = ["BRONZE", "SILVER", "PLATINUM"] dic_menu = [ ["Classic Burger", "Bigmac Burger", "Sanghai Burger"], ["Franch Fries","Coleslaw", "Cheese Sticks"], ["McFlurry", "Strawberry Cone", "vanilla shake"] ] dic_price = [ [5000, 6000, 7000], [3000, 2000, 2000], [4000, 2000, 4000] ] dic_discount = [1.0, 0.9, 0.8] choiced_user_id = dic_user_id[customer_type] choiced_customer_type = dic_customer_type[customer_type] choiced_menu = dic_menu[category][menu] choiced_price = dic_price[category][menu] * dic_discount[customer_type] return choiced_user_id, choiced_customer_type, choiced_menu, choiced_price while True: d, t = get_time_date() choiced_user_id, choiced_customer_type, choiced_menu, choiced_price = generate_order_data() new_data = { "DATE" : d, "TIME" : t, "USER_ID" : choiced_user_id, "CUSTOMER_TYPE" : choiced_customer_type, "MENU" : choiced_menu, "PRICE" : choiced_price, } producer.send(TOPIC_NAME, json.dumps(new_data).encode("utf-8")) print(new_data) time.sleep(1)
Docker환경 만들기
먼저 도커 컨테이너를 만들어 주시고
docker-compose up -d
Topic만들기
order라는 토픽을 만들 예정이니 docker에 명령어를 날려 order를 만들어 줍니다.
docker exec -it broker kafka-topics --bootstrap-server broker:9092 --create --topic order
여기까지 하면 실습 환경 준비는 끝났습니다. 이제 stream과 table등을 해보면서 실습을 진행 할 예정입니다.
소스 코드는 아래 주소에 나와있으니 clone받아 사용하셔도 됩니다.
반응형'데이터 엔지니어링 > Kafka' 카테고리의 다른 글