Skip to content

Commit 269edf3

Browse files
committed
Add highwatermark offsets to eachBatch callback
1 parent f23e8f2 commit 269edf3

File tree

5 files changed

+89
-3
lines changed

5 files changed

+89
-3
lines changed

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
# confluent-kafka-javascript v1.4.0
2+
3+
v1.4.0 is a feature release. It is supported for all usage.
4+
5+
## Enhancements
6+
7+
1. Adds support for `highWatermark`, `offsetLag()`, and `offsetLagLow()` in `eachBatch` callback (#).
8+
9+
110
# confluent-kafka-javascript v1.3.0
211

312
v1.3.0 is a feature release. It is supported for all usage.

examples/typescript/kafkajs.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ async function runConsumer() {
4444
});
4545

4646
const consumer = kafka.consumer({
47+
'auto.offset.reset': 'earliest',
4748
kafkaJS: {
4849
groupId: 'test-group' + Math.random(),
4950
fromBeginning: true,

lib/kafkajs/_consumer.js

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -863,6 +863,18 @@ class Consumer {
863863
#createBatchPayload(messages) {
864864
const topic = messages[0].topic;
865865
const partition = messages[0].partition;
866+
const watermarkOffsets = this.#internalClient.getWatermarkOffsets(topic, partition);
867+
let highWatermark = '-1001';
868+
let offsetLag_ = -1;
869+
let offsetLagLow_ = -1;
870+
871+
if (Number.isInteger(watermarkOffsets.highOffset)) {
872+
highWatermark = watermarkOffsets.highOffset.toString();
873+
/* While calculating lag, we subtract 1 from the high offset
874+
* for compability reasons with KafkaJS's API */
875+
offsetLag_ = (watermarkOffsets.highOffset - 1) - messages[messages.length - 1].offset;
876+
offsetLagLow_ = (watermarkOffsets.highOffset - 1) - messages[0].offset;
877+
}
866878

867879
const messagesConverted = [];
868880
for (let i = 0; i < messages.length; i++) {
@@ -892,13 +904,13 @@ class Consumer {
892904
const batch = {
893905
topic,
894906
partition,
895-
highWatermark: '-1001', /* We don't fetch it yet. We can call committed() to fetch it but that might incur network calls. */
907+
highWatermark,
896908
messages: messagesConverted,
897909
isEmpty: () => false,
898910
firstOffset: () => (messagesConverted[0].offset).toString(),
899911
lastOffset: () => (messagesConverted[messagesConverted.length - 1].offset).toString(),
900-
offsetLag: () => notImplemented(),
901-
offsetLagLow: () => notImplemented(),
912+
offsetLag: () => offsetLag_.toString(),
913+
offsetLagLow: () => offsetLagLow_.toString(),
902914
};
903915

904916
const returnPayload = {

test/promisified/consumer/consumeMessages.spec.js

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -858,4 +858,66 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
858858
await waitFor(() => (messagesConsumed === messages.length || batchesCountExceeds1), () => { }, 100);
859859
expect(batchesCountExceeds1).toBe(false);
860860
});
861+
862+
it('shows the correct high watermark and lag for partition', async () => {
863+
await producer.connect();
864+
865+
let messages0 = Array(10).fill().map(() => {
866+
const value = secureRandom();
867+
return { value: `value-${value}`, partition: 0 };
868+
});
869+
let partition0ProducedMessages = messages0.length;
870+
871+
const messages1 = Array(5).fill().map(() => {
872+
const value = secureRandom();
873+
return { value: `value-${value}`, partition: 1 };
874+
});
875+
876+
const messages2 = Array(2).fill().map(() => {
877+
const value = secureRandom();
878+
return { value: `value-${value}`, partition: 2 };
879+
});
880+
881+
for (const messages of [messages0, messages1, messages2]) {
882+
await producer.send({
883+
topic: topicName,
884+
messages: messages,
885+
});
886+
}
887+
888+
await consumer.connect();
889+
await consumer.subscribe({ topic: topicName });
890+
891+
let messagesConsumed = 0;
892+
consumer.run({
893+
partitionsConsumedConcurrently,
894+
eachBatch: async ({ batch }) => {
895+
if (batch.partition === 0) {
896+
expect(batch.highWatermark).toEqual(String(partition0ProducedMessages));
897+
} else if (batch.partition === 1) {
898+
expect(batch.highWatermark).toEqual(String(messages1.length));
899+
} else if (batch.partition === 2) {
900+
expect(batch.highWatermark).toEqual(String(messages2.length));
901+
}
902+
expect(batch.offsetLag()).toEqual(String(+batch.highWatermark - 1 - +batch.lastOffset()));
903+
expect(batch.offsetLagLow()).toEqual(String(+batch.highWatermark - 1 - +batch.firstOffset()));
904+
messagesConsumed += batch.messages.length;
905+
}
906+
});
907+
await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
908+
909+
/* Add some more messages to partition 0 to make sure high watermark is updated. */
910+
messages0 = Array(15).fill().map(() => {
911+
const value = secureRandom();
912+
return { value: `value-${value}`, partition: 0 };
913+
});
914+
partition0ProducedMessages += messages0.length;
915+
await producer.send({
916+
topic: topicName,
917+
messages: messages0,
918+
});
919+
920+
await waitFor(() => (messagesConsumed === (partition0ProducedMessages + messages1.length + messages2.length)), () => { }, 100);
921+
});
922+
861923
});

types/kafkajs.d.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ export type Batch = {
269269
isEmpty(): boolean
270270
firstOffset(): string | null
271271
lastOffset(): string
272+
offsetLag(): string
273+
offsetLagLow(): string
272274
}
273275

274276
export type KafkaMessage = MessageSetEntry | RecordBatchEntry

0 commit comments

Comments
 (0)