Skip to content

Commit 11aa5e4

Browse files
committed
Add producer tests and changes
1 parent daa572b commit 11aa5e4

File tree

5 files changed

+300
-2
lines changed

5 files changed

+300
-2
lines changed

MIGRATION.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,11 @@
220220
#### Semantic and Per-Method Changes
221221

222222

223-
* Changes to subscribe:
223+
* Changes to `subscribe`:
224224
* Regex flags (like 'i' or 'g') are ignored when a regex topic subscription is passed.
225225
* Subscribe must be called after `connect`.
226+
* An optional parameter, `replace`, is provided. If set to true, the current subscription is replaced with the new one. If set to false, the new subscription is added to the current one. The default value is false.
226228
* While passing a list of topics to `subscribe`, the `fromBeginning` property is not supported. Instead, the property `auto.offset.reset` needs to be used.
227229
Before:
228230
```javascript

lib/kafkajs/_consumer.js

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ class Consumer {
440440
}
441441

442442
/**
443-
* Starts consumer polling.
443+
* Starts consumer polling. This method returns immediately.
444444
* @param {import("../../types/kafkajs").ConsumerRunConfig} config
445445
*/
446446
async run(config) {
@@ -454,6 +454,12 @@ class Consumer {
454454
{ code: error.ErrorCodes.ERR__NOT_IMPLEMENTED });
455455
}
456456

457+
/* We deliberately don't await this. */
458+
this.#runInternal(config);
459+
}
460+
461+
/* Internal polling loop. It accepts the same config object that `run` accepts. */
462+
async #runInternal(config) {
457463
while (this.#state === ConsumerState.CONNECTED) {
458464
const m = await this.#consumeSingle();
459465

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
const {
2+
secureRandom,
3+
createProducer,
4+
createTopic,
5+
} = require('../testhelpers')
6+
7+
describe('Producer > Transactional producer', () => {
8+
let producer1, producer2, topicName, transactionalId, message;
9+
10+
const newProducer = () =>
11+
createProducer({
12+
idempotent: true,
13+
transactionalId,
14+
transactionTimeout: 1000,
15+
});
16+
17+
beforeEach(async () => {
18+
topicName = `test-topic-${secureRandom()}`;
19+
transactionalId = `transactional-id-${secureRandom()}`;
20+
message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` };
21+
22+
await createTopic({ topic: topicName });
23+
})
24+
25+
afterEach(async () => {
26+
producer1 && (await producer1.disconnect());
27+
producer2 && (await producer2.disconnect());
28+
})
29+
30+
describe('when there is an ongoing transaction on connect', () => {
31+
it('retries initProducerId to cancel the ongoing transaction',
32+
async () => {
33+
// Producer 1 will create a transaction and "crash", it will never commit or abort the connection
34+
producer1 = newProducer();
35+
await producer1.connect();
36+
const transaction1 = await producer1.transaction();
37+
await transaction1.send({ topic: topicName, messages: [message] });
38+
39+
// Producer 2 starts with the same transactional id to cause the concurrent transactions error
40+
producer2 = newProducer();
41+
await producer2.connect();
42+
let transaction2;
43+
await expect(producer2.transaction().then(t => (transaction2 = t))).resolves.toBeTruthy();
44+
await transaction2.send({ topic: topicName, messages: [message] });
45+
await transaction2.commit();
46+
}
47+
)
48+
})
49+
})
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
jest.setTimeout(10000)

const {
  secureRandom,
  createTopic,
  waitForMessages,
  createProducer,
  createConsumer,
} = require('../testhelpers');
const { KafkaJSError } = require('../../../lib').KafkaJS;

describe('Producer > Idempotent producer', () => {
  // NOTE(review): `cluster` is never assigned in this file; it is only referenced by the
  // skipped mock-broker tests below. Those tests also call `arrayUnique`, which is neither
  // defined nor imported here — they will fail if un-skipped. TODO: confirm before enabling.
  let producer, consumer, topicName, cluster, messages;

  beforeAll(async () => {
    // Four messages whose values are their indices, so ordering can be asserted.
    messages = Array(4)
      .fill()
      .map((_, i) => {
        const value = secureRandom()
        return { key: `key-${value}`, value: `${i}` }
      })
  })

  beforeEach(async () => {
    topicName = `test-topic-${secureRandom()}`;
    producer = createProducer({
      idempotent: true,
    })
    consumer = createConsumer({
      groupId: `consumer-group-id-${secureRandom()}`,
      maxWaitTimeInMs: 0,
      rdKafka: {
        topicConfig: {
          'auto.offset.reset': 'earliest',
        },
      }
    })
    // Single partition so global ordering is well-defined.
    await createTopic({ topic: topicName, partitions: 1 });
    await Promise.all([producer.connect(), consumer.connect()]);
    await consumer.subscribe({ topic: topicName });
  });

  afterEach(async () => {
    // Disconnect sequentially; wrapping already-awaited values in Promise.all added nothing.
    producer && (await producer.disconnect());
    consumer && (await consumer.disconnect());
  });

  it('sequential produce() calls > all messages are written to the partition once, in order', async () => {
    const messagesConsumed = [];

    for (const m of messages) {
      await producer.send({ topic: topicName, messages: [m] })
    }

    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) });
    await waitForMessages(messagesConsumed, { number: messages.length });

    messagesConsumed.forEach(({ message: { value } }, i) =>
      expect(value.toString()).toEqual(`${i}`)
    );
  });

  /* Skip as we don't have the mock broker available */
  it.skip('sequential produce() calls > where produce() throws a retriable error, all messages are written to the partition once, in order', async () => {
    for (const nodeId of [0, 1, 2]) {
      const broker = await cluster.findBroker({ nodeId })

      const brokerProduce = jest.spyOn(broker, 'produce')
      brokerProduce.mockImplementationOnce(() => {
        throw new KafkaJSError('retriable error')
      })
    }

    const messagesConsumed = []

    for (const m of messages) {
      await producer.send({ acks: -1, topic: topicName, messages: [m] })
    }

    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })

    messagesConsumed.forEach(({ message: { value } }, i) =>
      expect(value.toString()).toEqual(`${i}`)
    )
  });

  /* Skip as we don't have the mock broker available */
  it.skip('sequential produce() calls > where produce() throws a retriable error after the message is written to the log, all messages are written to the partition once, in order', async () => {
    for (const nodeId of [0, 1, 2]) {
      const broker = await cluster.findBroker({ nodeId })
      const originalCall = broker.produce.bind(broker)
      const brokerProduce = jest.spyOn(broker, 'produce')
      brokerProduce.mockImplementationOnce()
      brokerProduce.mockImplementationOnce()
      brokerProduce.mockImplementationOnce(async (...args) => {
        await originalCall(...args)
        throw new KafkaJSError('retriable error')
      })
    }

    const messagesConsumed = []

    for (const m of messages) {
      await producer.send({ acks: -1, topic: topicName, messages: [m] })
    }

    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })

    messagesConsumed.forEach(({ message: { value } }, i) =>
      expect(value.toString()).toEqual(`${i}`)
    )
  })

  it('concurrent produce() calls > all messages are written to the partition once', async () => {
    const messagesConsumed = []

    await Promise.all(
      messages.map(m => producer.send({ topic: topicName, messages: [m] }))
    )

    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })
    expect(messagesConsumed).toHaveLength(messages.length)
  });

  /* Skip as we don't have the mock broker available */
  it.skip('concurrent produce() calls > where produce() throws a retriable error on the first call, all messages are written to the partition once', async () => {
    for (const nodeId of [0, 1, 2]) {
      const broker = await cluster.findBroker({ nodeId })

      const brokerProduce = jest.spyOn(broker, 'produce')
      brokerProduce.mockImplementationOnce(async () => {
        throw new KafkaJSError('retriable error')
      })
    }

    await Promise.allSettled(
      messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
    )

    const messagesConsumed = []
    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })

    expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
      messages.length
    )
  })

  /* Skip as we don't have the mock broker available */
  it.skip('concurrent produce() calls > where produce() throws a retriable error on 2nd call, all messages are written to the partition once', async () => {
    for (const nodeId of [0, 1, 2]) {
      const broker = await cluster.findBroker({ nodeId })

      const brokerProduce = jest.spyOn(broker, 'produce')
      brokerProduce.mockImplementationOnce()
      brokerProduce.mockImplementationOnce(async () => {
        throw new KafkaJSError('retriable error')
      })
    }

    await Promise.allSettled(
      messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
    )

    const messagesConsumed = []
    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })

    expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
      messages.length
    )
  })

  /* Skip as we don't have the mock broker available */
  it.skip('concurrent produce() calls > where produce() throws a retriable error after the message is written to the log, all messages are written to the partition once', async () => {
    for (const nodeId of [0, 1, 2]) {
      const broker = await cluster.findBroker({ nodeId })
      const originalCall = broker.produce.bind(broker)
      const brokerProduce = jest.spyOn(broker, 'produce')
      brokerProduce.mockImplementationOnce(async (...args) => {
        await originalCall(...args)
        throw new KafkaJSError('retriable error')
      })
    }

    const messagesConsumed = []

    await Promise.all(
      messages.map(m => producer.send({ acks: -1, topic: topicName, messages: [m] }))
    )

    await consumer.run({ eachMessage: async message => messagesConsumed.push(message) })

    await waitForMessages(messagesConsumed, { number: messages.length })

    expect(arrayUnique(messagesConsumed.map(({ message: { value } }) => value))).toHaveLength(
      messages.length
    )
  })
})
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
const { createTopic, createProducer, secureRandom } = require('../testhelpers');
const { ErrorCodes } = require('../../../lib').KafkaJS;

describe('Producer > Producing to invalid topics', () => {
  let producer, topicName;

  beforeEach(async () => {
    topicName = `test-topic-${secureRandom()}`

    producer = createProducer({
    })
    await producer.connect();
    await createTopic({ topic: topicName });
  })

  afterEach(async () => {
    producer && (await producer.disconnect())
  })

  it('it rejects when producing to an invalid topic name, but is able to subsequently produce to a valid topic', async () => {
    const message = { key: `key-${secureRandom()}`, value: `value-${secureRandom()}` };
    // Topic names may not contain characters like ')('*&^% — this send must fail.
    const invalidTopicName = `${topicName}-abc)(*&^%`;
    await expect(producer.send({ topic: invalidTopicName, messages: [message] })).rejects.toHaveProperty(
      'code',
      ErrorCodes.ERR_TOPIC_EXCEPTION,
    );

    // The producer must remain usable after the failed send.
    await expect(producer.send({ topic: topicName, messages: [message] })).resolves.toBeTruthy();
  });
})

0 commit comments

Comments
 (0)