Skip to content

Commit ea487f9

Browse files
committed
Add tests for cache and non-compatibility mode
1 parent 1b035c2 commit ea487f9

File tree

2 files changed

+492
-0
lines changed

2 files changed

+492
-0
lines changed
Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
jest.setTimeout(30000)
2+
3+
const {
4+
secureRandom,
5+
createTopic,
6+
waitFor,
7+
createProducer,
8+
createConsumer,
9+
waitForMessages,
10+
sleep,
11+
} = require('../testhelpers');
12+
13+
describe('Consumer message cache', () => {
14+
let topicName, groupId, producer, consumer;
15+
16+
beforeEach(async () => {
17+
topicName = `test-topic-${secureRandom()}`
18+
groupId = `consumer-group-id-${secureRandom()}`
19+
20+
await createTopic({ topic: topicName, partitions: 3 })
21+
22+
producer = createProducer({});
23+
24+
consumer = createConsumer({
25+
groupId,
26+
maxWaitTimeInMs: 100,
27+
fromBeginning: true,
28+
});
29+
});
30+
31+
afterEach(async () => {
32+
consumer && (await consumer.disconnect())
33+
producer && (await producer.disconnect())
34+
});
35+
36+
it('is cleared on pause', async () => {
37+
await consumer.connect();
38+
await producer.connect();
39+
await consumer.subscribe({ topic: topicName })
40+
41+
const messagesConsumed = [];
42+
consumer.run({
43+
eachMessage: async event => {
44+
messagesConsumed.push(event);
45+
if (event.partition === 0 && (+event.message.offset) === 1023) {
46+
consumer.pause([{ topic: topicName, partitions: [0] }]);
47+
}
48+
}
49+
});
50+
51+
/* Evenly distribute 1024*9 messages across 3 partitions */
52+
let i = 0;
53+
const messages = Array(1024 * 9)
54+
.fill()
55+
.map(() => {
56+
const value = secureRandom()
57+
return { value: `value-${value}`, partition: ((i++) % 3) }
58+
})
59+
60+
await producer.send({ topic: topicName, messages })
61+
62+
// Wait for the messages.
63+
// We consume 1024 messages from partition 0, and 1024*3 from partition 1 and 2.
64+
await waitForMessages(messagesConsumed, { number: 1024 * 7 });
65+
66+
// We should not consume even one more message than that.
67+
await sleep(1000);
68+
expect(messagesConsumed.length).toEqual(1024 * 7);
69+
70+
// check if all offsets are present
71+
// partition 0
72+
expect(messagesConsumed.filter(m => m.partition === 0).map(m => m.message.offset)).toEqual(Array(1024).fill().map((_, i) => `${i}`));
73+
// partition 1
74+
expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
75+
// partition 2
76+
expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
77+
});
78+
79+
it('is cleared on seek', async () => {
80+
await consumer.connect();
81+
await producer.connect();
82+
await consumer.subscribe({ topic: topicName })
83+
84+
const messagesConsumed = [];
85+
let hasBeenSeeked = false;
86+
consumer.run({
87+
eachMessage: async event => {
88+
messagesConsumed.push(event);
89+
if (event.partition === 0 && (+event.message.offset) === 1023 && !hasBeenSeeked) {
90+
consumer.seek({ topic: topicName, partition: 0, offset: 0 });
91+
hasBeenSeeked = true;
92+
}
93+
}
94+
});
95+
96+
/* Evenly distribute 1024*9 messages across 3 partitions */
97+
let i = 0;
98+
const messages = Array(1024 * 9)
99+
.fill()
100+
.map(() => {
101+
const value = secureRandom()
102+
return { value: `value-${value}`, partition: ((i++) % 3) }
103+
})
104+
105+
await producer.send({ topic: topicName, messages })
106+
107+
// Wait for the messages.
108+
// We consume 1024*4 messages from partition 0, and 1024*3 from partition 1 and 2.
109+
await waitForMessages(messagesConsumed, { number: 1024 * 10 });
110+
111+
// We should not consume even one more message than that.
112+
await sleep(1000);
113+
expect(messagesConsumed.length).toEqual(1024 * 10);
114+
115+
// check if all offsets are present
116+
// partition 0
117+
expect(messagesConsumed.filter(m => m.partition === 0).map(m => m.message.offset))
118+
.toEqual(Array(1024 * 4).fill().map((_, i) => i < 1024 ? `${i}` : `${i - 1024}`));
119+
// partition 1
120+
expect(messagesConsumed.filter(m => m.partition === 1).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
121+
// partition 2
122+
expect(messagesConsumed.filter(m => m.partition === 2).map(m => m.message.offset)).toEqual(Array(1024 * 3).fill().map((_, i) => `${i}`));
123+
});
124+
125+
it('is cleared before rebalance', async () => {
126+
const consumer2 = createConsumer({
127+
groupId,
128+
maxWaitTimeInMs: 100,
129+
fromBeginning: true,
130+
});
131+
132+
await consumer.connect();
133+
await producer.connect();
134+
await consumer.subscribe({ topic: topicName })
135+
136+
const messagesConsumed = [];
137+
const messagesConsumedConsumer1 = [];
138+
const messagesConsumedConsumer2 = [];
139+
let consumer2ConsumeRunning = false;
140+
141+
consumer.run({
142+
eachMessage: async event => {
143+
messagesConsumed.push(event);
144+
messagesConsumedConsumer1.push(event);
145+
146+
/* Until the second consumer joins, consume messages slowly so as to not consume them all
147+
* before the rebalance triggers. */
148+
if (messagesConsumed.length > 1024 && !consumer2ConsumeRunning) {
149+
await sleep(10);
150+
}
151+
}
152+
});
153+
154+
/* Evenly distribute 1024*9 messages across 3 partitions */
155+
let i = 0;
156+
const messages = Array(1024 * 10)
157+
.fill()
158+
.map(() => {
159+
const value = secureRandom()
160+
return { value: `value-${value}`, partition: (i++) % 3 }
161+
})
162+
163+
await producer.send({ topic: topicName, messages })
164+
165+
// Wait for the messages - some of them, before starting the
166+
// second consumer.
167+
await waitForMessages(messagesConsumed, { number: 1024 });
168+
169+
await consumer2.connect();
170+
await consumer2.subscribe({ topic: topicName });
171+
consumer2.run({
172+
eachMessage: async event => {
173+
messagesConsumed.push(event);
174+
messagesConsumedConsumer2.push(event);
175+
}
176+
});
177+
178+
await waitFor(() => consumer2.assignment().length > 0, () => null);
179+
consumer2ConsumeRunning = true;
180+
181+
/* Now that both consumers have joined, wait for all msgs to be consumed */
182+
await waitForMessages(messagesConsumed, { number: 1024 * 10 });
183+
184+
/* No extra messages should be consumed. */
185+
await sleep(1000);
186+
expect(messagesConsumed.length).toEqual(1024 * 10);
187+
188+
/* Check if all messages were consumed. */
189+
expect(messagesConsumed.map(event => (+event.message.offset)).sort((a, b) => a - b))
190+
.toEqual(Array(1024 * 10).fill().map((_, i) => Math.floor(i / 3)));
191+
192+
/* Consumer2 should have consumed at least one message. */
193+
expect(messagesConsumedConsumer2.length).toBeGreaterThan(0);
194+
195+
await consumer2.disconnect();
196+
});
197+
198+
it('does not hold up polling', async () => {
199+
/* This consumer has a low max.poll.interval.ms */
200+
const impatientConsumer = createConsumer({
201+
groupId,
202+
maxWaitTimeInMs: 100,
203+
fromBeginning: true,
204+
rebalanceTimeout: 10000,
205+
sessionTimeout: 10000,
206+
clientId: "impatientConsumer",
207+
});
208+
209+
await producer.connect();
210+
await impatientConsumer.connect();
211+
await impatientConsumer.subscribe({ topic: topicName })
212+
213+
const messagesConsumed = [];
214+
let impatientConsumerMessages = [];
215+
let consumer1Messages = [];
216+
let consumer1TryingToJoin = false;
217+
218+
impatientConsumer.run({
219+
eachMessage: async event => {
220+
messagesConsumed.push(event);
221+
impatientConsumerMessages.push(event);
222+
/* When the second consumer is joining, deliberately slow down message consumption.
223+
* We should still have a rebalance very soon, since we must expire the cache and
224+
* trigger a rebalance before max.poll.interval.ms.
225+
*/
226+
if (consumer1TryingToJoin) {
227+
await sleep(1000);
228+
}
229+
}
230+
});
231+
232+
/* Distribute 1024*10 messages across 3 partitions */
233+
let i = 0;
234+
const messages = Array(1024 * 10)
235+
.fill()
236+
.map(() => {
237+
const value = secureRandom()
238+
return { value: `value-${value}`, partition: (i++) % 3 }
239+
})
240+
241+
await producer.send({ topic: topicName, messages })
242+
243+
/* Wait for the messages - some of them, before starting the
244+
* second consumer. */
245+
await waitForMessages(messagesConsumed, { number: 1024 });
246+
247+
await consumer.connect();
248+
await consumer.subscribe({ topic: topicName });
249+
consumer.run({
250+
eachMessage: async event => {
251+
messagesConsumed.push(event);
252+
consumer1Messages.push(event);
253+
}
254+
});
255+
consumer1TryingToJoin = true;
256+
await waitFor(() => consumer.assignment().length > 0, () => null);
257+
consumer1TryingToJoin = false;
258+
259+
/* Now that both consumers have joined, wait for all msgs to be consumed */
260+
await waitForMessages(messagesConsumed, { number: 1024 * 10 });
261+
262+
// No extra messages should be consumed.
263+
await sleep(1000);
264+
expect(messagesConsumed.length).toEqual(1024 * 10);
265+
266+
/* Each consumer should have consumed at least one message. */
267+
expect(consumer1Messages.length).toBeGreaterThan(0);
268+
expect(impatientConsumerMessages.length).toBeGreaterThan(0);
269+
270+
await impatientConsumer.disconnect();
271+
});
272+
});

0 commit comments

Comments
 (0)