Skip to content

Commit 4e0767a

Browse files
committed
Consumer prototype
1 parent 63f8bcc commit 4e0767a

File tree

7 files changed

+887
-202
lines changed

7 files changed

+887
-202
lines changed

examples/kafkajs/consumer.js

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
const { Kafka } = require('../..').KafkaJS
2+
//const { Kafka } = require('kafkajs')
3+
4+
async function consumerStart() {
5+
const kafka = new Kafka({
6+
brokers: ['pkc-8w6ry7.us-west-2.aws.devel.cpdev.cloud:9092'],
7+
ssl: true,
8+
sasl: {
9+
mechanism: 'plain',
10+
username: '<fill>',
11+
password: '<fill>',
12+
}
13+
});
14+
15+
const consumer = kafka.consumer({ groupId: 'test-group' });
16+
17+
await consumer.connect();
18+
console.log("Connected successfully");
19+
20+
const disconnect = () =>
21+
consumer.disconnect().then(() => {
22+
console.log("Disconnected successfully");
23+
});
24+
process.on('SIGINT', disconnect);
25+
process.on('SIGTERM', disconnect);
26+
27+
consumer.subscribe({
28+
topics: [
29+
"topic2"
30+
]
31+
})
32+
33+
await consumer.run({
34+
eachMessage: async ({ topic, partition, message }) => {
35+
console.log({
36+
topic,
37+
partition,
38+
offset: message.offset,
39+
key: message.key?.toString(),
40+
value: message.value.toString(),
41+
})
42+
},
43+
});
44+
}
45+
46+
consumerStart()

examples/kafkajs/producer.js

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
const { Kafka } = require('../..').KafkaJS
2+
//const { Kafka } = require('kafkajs')
3+
4+
async function producerStart() {
5+
const kafka = new Kafka({
6+
brokers: ['pkc-8w6ry7.us-west-2.aws.devel.cpdev.cloud:9092'],
7+
ssl: true,
8+
sasl: {
9+
mechanism: 'plain',
10+
username: '<fill>',
11+
password: '<fill>',
12+
}
13+
});
14+
15+
const producer = kafka.producer();
16+
17+
await producer.connect();
18+
19+
console.log("Connected successfully");
20+
21+
const res = []
22+
for(let i = 0; i < 50; i++) {
23+
res.push(producer.send({
24+
topic: 'topic2',
25+
messages: [
26+
{value: 'v222', partition: 0},
27+
{value: 'v11', partition: 0, key: 'x'},
28+
]
29+
}));
30+
}
31+
await Promise.allSettled(res);
32+
33+
await producer.disconnect();
34+
35+
console.log("Disconnected successfully");
36+
}
37+
38+
producerStart();

lib/kafkajs/_common.js

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
function kafkaJSToRdKafkaConfig(config) {
2+
const ret = {
3+
'allow.auto.create.topics': 'false'
4+
}
5+
ret['bootstrap.servers'] = config['brokers'].join(',');
6+
7+
let withSASL = false;
8+
9+
if (config.sasl) {
10+
const sasl = config.sasl;
11+
if (sasl.mechanism === 'plain' &&
12+
typeof sasl.username === 'string' &&
13+
typeof sasl.password === 'string') {
14+
ret['sasl.mechanism'] = 'PLAIN';
15+
ret['sasl.username'] = sasl.username;
16+
ret['sasl.password'] = sasl.password;
17+
withSASL = true;
18+
}
19+
}
20+
21+
if (config.ssl === true && withSASL) {
22+
ret['security.protocol'] = 'sasl_ssl';
23+
} else if (withSASL) {
24+
ret['security.protocol'] = 'sasl_plaintext';
25+
}
26+
27+
return ret;
28+
}
29+
30+
module.exports = { kafkaJSToRdKafkaConfig }

lib/kafkajs/_consumer.js

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
const RdKafka = require('../rdkafka');
2+
const { kafkaJSToRdKafkaConfig } = require('./_common');
3+
4+
const ConsumerState = Object.freeze({
5+
INIT: 0,
6+
CONNECTING: 1,
7+
CONNECTED: 2,
8+
DISCONNECTING: 3,
9+
DISCONNECTED: 4,
10+
});
11+
12+
class Consumer {
13+
#config = {}
14+
#internalClient = null;
15+
#connectPromiseFunc = {};
16+
#state = ConsumerState.INIT;
17+
18+
constructor(config) {
19+
this.#config = kafkaJSToRdKafkaConfig(config);
20+
if (config.groupId != null) {
21+
this.#config["group.id"] = config.groupId;
22+
}
23+
}
24+
25+
#readyCb(arg) {
26+
if (this.#state !== ConsumerState.CONNECTING) {
27+
// I really don't know how to handle this now.
28+
return;
29+
}
30+
this.#state = ConsumerState.CONNECTED;
31+
32+
// Resolve the promise.
33+
this.#connectPromiseFunc["resolve"]();
34+
}
35+
36+
#errorCb(args) {
37+
console.log('error', args);
38+
if (this.#state === ConsumerState.CONNECTING) {
39+
this.#connectPromiseFunc["reject"](args);
40+
} else {
41+
// do nothing for now.
42+
}
43+
}
44+
45+
#notImplemented() {
46+
throw new Error("Not implemented");
47+
}
48+
49+
#createPayload(message) {
50+
var key = message.key == null ? null : message.key;
51+
if (typeof key === 'string') {
52+
key = Buffer.from(key);
53+
}
54+
55+
let timestamp = message.timestamp ? new Date(message.timestamp).toISOString()
56+
: "";
57+
58+
var headers = undefined;
59+
if (message.headers) {
60+
headers = {}
61+
for (const [key, value] of Object.entries(message.headers)) {
62+
if (!headers[key]) {
63+
headers[key] = value;
64+
} else if (headers[key].constructor === Array) {
65+
headers[key].push(value);
66+
} else {
67+
headers[key] = [headers[key], value];
68+
}
69+
}
70+
}
71+
72+
return {
73+
topic: message.topic,
74+
partition: message.partition,
75+
message: {
76+
key,
77+
value: message.value,
78+
timestamp,
79+
attributes: 0,
80+
offset: message.offset,
81+
size: message.size,
82+
headers
83+
},
84+
heartbeat: async () => {},
85+
pause: () => {}
86+
}
87+
}
88+
89+
async #consumeSingle() {
90+
return new Promise((resolve, reject) => {
91+
this.#internalClient.consume(1, function(err, messages) {
92+
if (err)
93+
reject(`Consume error code ${err.code}`);
94+
95+
const message = messages[0];
96+
resolve(message);
97+
});
98+
});
99+
}
100+
101+
connect() {
102+
if (this.#state !== ConsumerState.INIT) {
103+
return Promise.reject("Connect has already been called elsewhere.");
104+
}
105+
106+
this.#state = ConsumerState.CONNECTING;
107+
this.#internalClient = new RdKafka.KafkaConsumer(this.#config);
108+
this.#internalClient.on('ready', this.#readyCb.bind(this));
109+
this.#internalClient.on('event.error', this.#errorCb.bind(this));
110+
this.#internalClient.on('event.log', console.log);
111+
112+
return new Promise((resolve, reject) => {
113+
this.#connectPromiseFunc = {resolve, reject};
114+
console.log("Connecting....");
115+
this.#internalClient.connect();
116+
console.log("connect() called");
117+
});
118+
}
119+
120+
subscribe(subscription) {
121+
this.#internalClient.subscribe(subscription.topics);
122+
}
123+
124+
stop() {
125+
this.#notImplemented();
126+
}
127+
128+
async run(config) {
129+
if (this.#state !== ConsumerState.CONNECTED) {
130+
throw new Error("Run must be called in state CONNECTED.");
131+
}
132+
133+
while (this.#state === ConsumerState.CONNECTED) {
134+
let m = await this.#consumeSingle();
135+
if (m) {
136+
await config.eachMessage(
137+
this.#createPayload(m)
138+
)
139+
}
140+
}
141+
}
142+
143+
commitOffsets(topicPartitions) {
144+
this.#notImplemented();
145+
}
146+
147+
seek(topicPartitionOffset) {
148+
this.#notImplemented();
149+
}
150+
151+
describeGroup() {
152+
this.#notImplemented();
153+
}
154+
155+
pause(topics) {
156+
this.#notImplemented();
157+
}
158+
159+
paused() {
160+
this.#notImplemented();
161+
}
162+
163+
resume(topics) {
164+
this.#notImplemented();
165+
}
166+
167+
on(eventName, listener) {
168+
this.#notImplemented();
169+
}
170+
171+
logger() {
172+
this.#notImplemented();
173+
}
174+
175+
get events() {
176+
this.#notImplemented();
177+
}
178+
179+
async disconnect() {
180+
if (this.#state >= ConsumerState.DISCONNECTING) {
181+
return;
182+
}
183+
this.#state = ConsumerState.DISCONNECTING;
184+
await new Promise((resolve, reject) => {
185+
const cb = (err) => {
186+
err ? reject(err) : resolve();
187+
this.#state = ConsumerState.DISCONNECTED;
188+
}
189+
this.#internalClient.disconnect(cb);
190+
});
191+
}
192+
}
193+
194+
module.exports = { Consumer }

0 commit comments

Comments
 (0)