Skip to content

Commit 1547640

Browse files
authored
Fix header conversion in eachBatch (#130)
1 parent a717346 commit 1547640

File tree

3 files changed

+80
-27
lines changed

3 files changed

+80
-27
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# confluent-kafka-javascript v0.3.1
2+
3+
v0.3.1 is a limited availability maintenance release. It is supported for all usage.
4+
5+
## Fixes
6+
7+
1. Fixes an issue where headers were not passed correctly to the `eachBatch` callback (#130).
8+
9+
110
# confluent-kafka-javascript v0.3.0
211

312
v0.3.0 is a limited availability feature release. It is supported for all usage.

lib/kafkajs/_consumer.js

Lines changed: 23 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -645,22 +645,15 @@ class Consumer {
645645
}
646646

647647
/**
648-
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
649-
* @param {import("../..").Message} message
650-
* @returns {import("../../types/kafkajs").EachMessagePayload}
648+
* Converts headers returned by node-rdkafka into a format that can be used by the eachMessage/eachBatch callback.
649+
* @param {import("../..").MessageHeader[] | undefined} messageHeaders
650+
* @returns {import("../../types/kafkajs").IHeaders}
651651
*/
652-
#createPayload(message) {
653-
let key = message.key;
654-
if (typeof key === 'string') {
655-
key = Buffer.from(key);
656-
}
657-
658-
let timestamp = message.timestamp ? String(message.timestamp) : '';
659-
652+
#createHeaders(messageHeaders) {
660653
let headers;
661-
if (message.headers) {
654+
if (messageHeaders) {
662655
headers = {};
663-
for (const header of message.headers) {
656+
for (const header of messageHeaders) {
664657
for (const [key, value] of Object.entries(header)) {
665658
if (!Object.hasOwn(headers, key)) {
666659
headers[key] = value;
@@ -672,6 +665,22 @@ class Consumer {
672665
}
673666
}
674667
}
668+
return headers;
669+
}
670+
671+
/**
672+
* Converts a message returned by node-rdkafka into a message that can be used by the eachMessage callback.
673+
* @param {import("../..").Message} message
674+
* @returns {import("../../types/kafkajs").EachMessagePayload}
675+
*/
676+
#createPayload(message) {
677+
let key = message.key;
678+
if (typeof key === 'string') {
679+
key = Buffer.from(key);
680+
}
681+
682+
let timestamp = message.timestamp ? String(message.timestamp) : '';
683+
const headers = this.#createHeaders(message.headers);
675684

676685
return {
677686
topic: message.topic,
@@ -788,20 +797,7 @@ class Consumer {
788797
}
789798

790799
let timestamp = message.timestamp ? String(message.timestamp) : '';
791-
792-
let headers;
793-
if (message.headers) {
794-
headers = {};
795-
for (const [key, value] of Object.entries(message.headers)) {
796-
if (!Object.hasOwn(headers, key)) {
797-
headers[key] = value;
798-
} else if (headers[key].constructor === Array) {
799-
headers[key].push(value);
800-
} else {
801-
headers[key] = [headers[key], value];
802-
}
803-
}
804-
}
800+
const headers = this.#createHeaders(message.headers);
805801

806802
const messageConverted = {
807803
key,

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,54 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
134134
);
135135
});
136136

137+
it('consume batch of messages with headers', async () => {
138+
await consumer.connect();
139+
await producer.connect();
140+
await consumer.subscribe({ topic: topicName });
141+
142+
const messagesConsumed = [];
143+
consumer.run({
144+
partitionsConsumedConcurrently,
145+
eachBatch: async event => messagesConsumed.push(event)
146+
});
147+
148+
const messages = [{
149+
value: `value-${secureRandom}`,
150+
headers: {
151+
'header-1': 'value-1',
152+
'header-2': 'value-2',
153+
'header-3': ['value-3-1', 'value-3-2', Buffer.from([1, 0, 1, 0, 1])],
154+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
155+
},
156+
partition: 0,
157+
}];
158+
159+
await producer.send({ topic: topicName, messages });
160+
await waitForMessages(messagesConsumed, { number: messages.length });
161+
162+
expect(messagesConsumed[0]).toEqual(
163+
expect.objectContaining({
164+
batch: expect.objectContaining({
165+
topic: topicName,
166+
partition: 0,
167+
messages: [
168+
expect.objectContaining({
169+
value: Buffer.from(messages[0].value),
170+
offset: '0',
171+
headers: {
172+
// Headers are always returned as Buffers from the broker.
173+
'header-1': Buffer.from('value-1'),
174+
'header-2': Buffer.from('value-2'),
175+
'header-3': [Buffer.from('value-3-1'), Buffer.from('value-3-2'), Buffer.from([1, 0, 1, 0, 1])],
176+
'header-4': Buffer.from([1, 0, 1, 0, 1]),
177+
}
178+
}),
179+
]
180+
}),
181+
})
182+
);
183+
});
184+
137185
it.each([[true], [false]])('consumes messages using eachBatch - isAutoResolve: %s', async (isAutoResolve) => {
138186
await consumer.connect();
139187
await producer.connect();

0 commit comments

Comments
 (0)