-
[Kafka] KsqlDB 실습하기 - create Kstream, Ktable데이터 엔지니어링/Kafka 2022. 7. 31. 17:14반응형
필수 개념 이해와 실습 환경 설정
KsqlDB 필수 개념을 숙지 하지 않으신 분께서는 아래의 링크로 들어가 확인후 다시 와서 봐주시기 바랍니다.
작업환경은 이해 하지 않아도 무관하나 먼저 작업환경을 알아두면 이해하기 좋을 것같습니다. 아래 글 확인
Producer에 메세지 보내기
먼저 위 포스팅에서 도커 환경을 만들었다면 producer에 메세지를 보내 보겠습니다. 제가 만든 코드를 실행해 보면 아래와 같이 맥도날드의 메뉴가격과 등급에 따른 할인율이 적용된 것을 볼 수 있습니다.
python producer-order.py
KsqlDB 접속하기
그럼 KsqlDB로 들어가 Stream과 Table을 만들어 보고 확인해 보겠습니다. ksqlDB서버에 접속
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Starting Offset 설정하기
그리고 처음 들어온 메세지도 확인하기 위해 명령어를 쳐줍니다. 아래와 같이 Successfully changed가 나오면 처음 들어온 메세지 부터 조회가 가능합니다.
docker exec -it ksqldb-cli ksql http://ksqldb-server:8088
Create Stream
Stream은 데이터가 무한하다고 생각하고 히스토리 형식으로 들어온 메세지를 보는 객체라고 보면 좋을 것같습니다. 만드는 방법은 위와 같이 두개의 방법이 있습니다. Schema를 사용하여 만드는 방법과 Select문을 사용하여 생성하는 방법이 2가지 있습니다. Schema제약 조건은 아래와 같으니 신경써서 작성하시면 되겠습니다.
그럼 먼저 schema를 사용하여 생성해 보겠습니다. 먼저 위에 데이터를 보면 date, time, user_id, customer_type, menu, price에 맞게 설정하여 만들었습니다. 아래의 명령어를 KsqlDB cli에 명령어를 치면
CREATE STREAM order_stream (date varchar, time varchar, user_id bigint, customer_type varchar, menu varchar, price int) WITH (kafka_topic='order', value_format='json', partitions=1);
create 된것을 볼 수 있습니다. 제약 조건이 맞지 않으면, 오류가 나니 오류 내용을 읽고 대응하면 되겠습니다. show streams; 명령어를 쳐보면 stream이 생긴것을 확인 할 수 있습니다. 먼저 조회 쿼리를 날려 보면 아래와 같이 select문이 잘 동작하는 것을 볼 수 있습니다.
그럼 select문으로 stream을 생성해 보겠습니다. 만들 때 그냥 만들면 차이점을 잘 모르니깐 PLATINUM등급만 조회 되는 stream을 만들어 보겠습니다.
create stream order_stream_sel_platinum as select * from order_stream where customer_type = 'PLATINUM';
조회해 보면 PLATINUM등급만 잘 나오는 것을 볼 수 있습니다.
Create Table
Create Table 도 똑같습니다. schema와 select문으로 생성 할 수 있습니다.
CREATE STREAM order_stream( date varchar, time varchar, user_id bigint, customer_type varchar, menu varchar, price int) WITH ( kafka_topic='order', value_format='json', partitions=1);
kafka_topic을 stream에서 사용했기때문에 메세지가 소모 되서 똑같은 topic으로 create하면 데이터가 나오지 않습니다. create table schema는 하지 않고 바로 select로 진행해 보겠습니다. 그리고 보통 create with topic 은 시퀀스 이기 때문에 stream을 이용하여 만든다고 생각이 들었습니다.
select 문을 사용하여 생성 하기 stream은 데이터가 들어오는 시퀀스라고 말씀드렸고, table은 상태 기반 현재 값을 나타낸다고 하였습니다. 그래서 select문으로 table을 만들 때는 꼭 상태 기반으로 만들어 주어야 합니다. 그래서 저는 group by 를 하여 menu별로 결제 금액을 나타내는 상태 기반 table을 만들었습니다. 그리고 show tables; 명령어를 이용하여 생성 된것을 확인 할 수 있습니다.
CREATE TABLE order_table_groupby_menu as SELECT menu, sum(price) as sum_price FROM order_stream group by menu;
조회해 보면 아래와 같이 합계가 잘 나오는 것을 볼 수 있습니다. 상태 기반으로 나오는 것을 알 수 있습니다.
이번 글은 Create Stream과 Table에 대해 알아 보았습니다. Stateless와 Stateful을 생각하며 만들면 좋은 결과가 있을 것같습니다. 다음에는 push와 pull query에대해 실습해 보겠습니다.
반응형'데이터 엔지니어링 > Kafka' 카테고리의 다른 글