File tree Expand file tree Collapse file tree 4 files changed +86
-3
lines changed
Expand file tree Collapse file tree 4 files changed +86
-3
lines changed Original file line number Diff line number Diff line change 1+ # Kafka Example
2+
3+ ## Summary
4+ This is an example on how to use the cloudevents javascript sdk with Kafka in NodeJs.
5+
6+
7+ ## Description
8+ A simple cli application sending user input as a cloudevent message through a kafka producer to a topic. And eventually, the cloudevent message is handled and deserialized correctly by a consumer within a consumer group subscribed to the same topic.
9+
10+ ## Dependencies
11+ - NodeJS (>18)
12+ - Kafka running locally or remotely
13+
14+ ## Local Kafka Setup with Docker
15+
16+ #### Option 1: Run Zookeeper and Kafka Dccker Images sequentially with these commands
17+
18+ ``` bash
19+ docker run -d \
20+ --name zookeeper \
21+ -e ZOOKEEPER_CLIENT_PORT=2181 \
22+ -e ZOOKEEPER_TICK_TIME=2000 \
23+ confluentinc/cp-zookeeper:7.3.2
24+
25+ ```
26+ ``` bash
27+ docker run -d \
28+ --name kafka \
29+ -p 9092:9092 \
30+ -e KAFKA_BROKER_ID=1 \
31+ -e KAFKA_ZOOKEEPER_CONNECT=localhost:2181 \
32+ -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
33+ -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
34+ -e KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS=0 \
35+ --link zookeeper:zookeeper \
36+ confluentinc/cp-kafka:7.3.2
37+
38+ ```
39+
40+ #### Option 2: Run both images using the docker compose file
41+
42+ ``` bash
43+ cd ${directory of the docker compose file}
44+
45+ docker compose up -d
46+ ```
47+
48+ ## Then, run the producer (cli) and consumer
49+
50+ #### To Start the Producer
51+ ``` bash
52+ npm run start:producer
53+ ```
54+
55+ #### To Start the Consumer
56+ ``` bash
57+ npm run start:consumer ${groupId}
58+ ```
Original file line number Diff line number Diff line change 1+ ---
2+ version : ' 3'
3+ services :
4+ zookeeper :
5+ image : confluentinc/cp-zookeeper:7.3.2
6+ container_name : zookeeper
7+ environment :
8+ ZOOKEEPER_CLIENT_PORT : 2181
9+ ZOOKEEPER_TICK_TIME : 2000
10+
11+ kafka :
12+ image : confluentinc/cp-kafka:7.3.2
13+ container_name : kafka
14+ depends_on :
15+ - zookeeper
16+ ports :
17+ - " 9092:9092"
18+ environment :
19+ KAFKA_BROKER_ID : 1
20+ KAFKA_ZOOKEEPER_CONNECT : ' zookeeper:2181'
21+ KAFKA_ADVERTISED_LISTENERS : PLAINTEXT://localhost:9092
22+ KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR : 1
23+ KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS : 0
Original file line number Diff line number Diff line change 11/* eslint-disable */
2+
23import { Headers , Kafka , Message } from "cloudevents" ;
34import kafka from "./client" ;
45
@@ -21,10 +22,10 @@ const groupId = process.argv[2];
2122 } ) ;
2223
2324 try {
24- let newHeaders : Headers = { } ;
25+ const newHeaders : Headers = { } ;
2526 Object . keys ( message . headers as Headers ) . forEach ( ( key ) => {
2627 // this is needed here because the headers are buffer values
27- // when it gets to the consumer which is invalid for the
28+ // when it gets to the consumer and the buffer headers are not valid for the
2829 // toEvent api from cloudevents, so this converts each key value to a string
2930 // as expected by the toEvent api
3031 newHeaders [ key ] = message ! . headers ! [ key ] ?. toString ( ) ?? "" ;
Original file line number Diff line number Diff line change 11/* eslint-disable */
2+
23import { CloudEvent , Kafka } from "cloudevents" ;
34import readline from "readline" ;
45import kafka from "./client" ;
@@ -12,7 +13,7 @@ const rl = readline.createInterface({
1213 const producer = kafka . producer ( ) ;
1314 await producer . connect ( ) ;
1415
15- rl . setPrompt ( "> " ) ;
16+ rl . setPrompt ( "Enter message > " ) ;
1617 rl . prompt ( ) ;
1718 rl . on ( "line" , async ( line ) => {
1819 const event = new CloudEvent ( {
You can’t perform that action at this time.
0 commit comments