Skip to content

Commit fd4d24a

Browse files
authored
Changed default value of maxWaitTimeInMs (#363)
* Changed default value of maxWaitTimeInMs * Added test * Add link
1 parent 3b90792 commit fd4d24a

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
# confluent-kafka-javascript 1.5.0
2+
3+
v1.5.0 is a feature release. It is supported for all usage.
4+
5+
## Fixes
6+
7+
1. Fix issue of delay of up to 5s in receiving messages after pause and resume, or seek (#285, #363).
8+
19
# confluent-kafka-javascript v1.4.1
210

311
v1.4.1 is a maintenance release. It is supported for all usage.

MIGRATION.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -222,6 +222,7 @@ producerRun().then(consumerRun).catch(console.error);
222222
| allowAutoTopicCreation | true | Determines if a topic should be created if it doesn't exist while consuming. |
223223
| **maxBytesPerPartition** | 1048576 (1MB) | determines how many bytes can be fetched in one request from a single partition. There is a change in semantics, this size grows dynamically if a single message larger than this is encountered, and the client does not get stuck. |
224224
| minBytes | 1 | Minimum number of bytes the broker responds with (or wait until `maxWaitTimeInMs`) |
225+
| **maxWaitTimeInMs** | 500 | Maximum time the broker may wait to fill the Fetch response with minBytes of messages. Its default value has been changed to match librdkafka's configuration.
225226
| maxBytes | 10485760 (10MB) | Maximum number of bytes the broker responds with. |
226227
| **retry** | object | Identical to `retry` in the common configuration. This takes precedence over the common config retry. |
227228
| readUncommitted | false | If true, consumer will read transactional messages which have not been committed. |

lib/kafkajs/_consumer.js

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -563,8 +563,6 @@ class Consumer {
563563

564564
if (Object.hasOwn(kjsConfig, 'maxWaitTimeInMs')) {
565565
rdKafkaConfig['fetch.wait.max.ms'] = kjsConfig.maxWaitTimeInMs;
566-
} else {
567-
rdKafkaConfig['fetch.wait.max.ms'] = 5000;
568566
}
569567

570568
if (Object.hasOwn(kjsConfig, 'minBytes')) {
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/* eslint-disable no-unused-vars */
2+
jest.setTimeout(30000);
3+
4+
const process = require('process');
5+
const {
6+
secureRandom,
7+
createTopic,
8+
waitFor,
9+
createProducer,
10+
createConsumer,
11+
sleep,
12+
} = require('../testhelpers');
13+
14+
// See https://github.com/confluentinc/confluent-kafka-javascript/issues/285
15+
describe('Consumer commit', () => {
16+
let topicName, groupId, producer, consumer;
17+
18+
beforeEach(async () => {
19+
topicName = `test-topic-${secureRandom()}`;
20+
groupId = `consumer-group-id-${secureRandom()}`;
21+
await createTopic({ topic: topicName, partitions: 1 });
22+
producer = createProducer({});
23+
consumer = createConsumer({
24+
groupId,
25+
fromBeginning: true,
26+
autoCommit: false,
27+
});
28+
});
29+
30+
afterEach(async () => {
31+
if (consumer) await consumer.disconnect();
32+
if (producer) await producer.disconnect();
33+
});
34+
35+
it('should not have significant latency after pause-seek-resume', async () => {
36+
const initialMessages = Array(30).fill().map(() => ({ value: `value-${secureRandom()}` }));
37+
await producer.connect();
38+
await producer.send({ topic: topicName, messages: initialMessages });
39+
await producer.flush();
40+
41+
let msgCount = 0;
42+
let maxLatencyMs = 0;
43+
let startTime = process.hrtime.bigint();
44+
45+
await consumer.connect();
46+
await consumer.subscribe({ topic: topicName });
47+
48+
await consumer.run({
49+
eachMessage: async ({ topic, partition, message }) => {
50+
const endTime = process.hrtime.bigint();
51+
const latencyMs = Number((endTime - startTime) / BigInt(1e6));
52+
if (msgCount > 0) maxLatencyMs = Math.max(maxLatencyMs, latencyMs);
53+
54+
msgCount++;
55+
await sleep(10);
56+
startTime = process.hrtime.bigint();
57+
}
58+
});
59+
60+
// Wait for 5 messages to be processed
61+
await waitFor(() => msgCount >= 5, () => null, { delay: 100, timeout: 10000 });
62+
63+
await consumer.pause([{ topic: topicName, partition: 0 }]);
64+
await consumer.seek({ topic: topicName, partition: 0, offset: '0' });
65+
await consumer.resume([{ topic: topicName, partition: 0 }]);
66+
startTime = process.hrtime.bigint(); // Reset timer after resume
67+
68+
// Wait for all messages to be processed (30 original + 5 after seek)
69+
await waitFor(() => msgCount >= 35, () => null, { delay: 100, timeout: 10000 });
70+
71+
// Assert latency is reasonable (e.g., < 4000ms)
72+
expect(maxLatencyMs).toBeLessThan(4000);
73+
74+
// Assert all messages processed
75+
expect(msgCount).toBeGreaterThanOrEqual(35);
76+
});
77+
});

0 commit comments

Comments
 (0)