Skip to content

Commit 938817b

Browse files
tunniclmMike Tunnicliffe
andauthored
217 fix super stream consumer property (#218)
* Set super-stream consumer property if needed When a stream consumer attaches to a stream that is part of a super stream it should set the "super-stream" property to the name of the super stream in order to ensure the partition index will be setup enabling the consumers to be balanced and rebalanced across the partitions. * Add test for partitioned super stream consumer * Fix typescript compiler and lint errors --------- Co-authored-by: Mike Tunnicliffe <[email protected]>
1 parent 2a4c663 commit 938817b

File tree

3 files changed

+168
-91
lines changed

3 files changed

+168
-91
lines changed

src/client.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,11 @@ export class Client {
184184
return res.ok
185185
}
186186

187-
public async declareConsumer(params: DeclareConsumerParams, handle: ConsumerFunc): Promise<Consumer> {
187+
public async declareConsumer(
188+
params: DeclareConsumerParams,
189+
handle: ConsumerFunc,
190+
superStreamConsumer?: SuperStreamConsumer
191+
): Promise<Consumer> {
188192
const connection = await this.getConnection(params.stream, "consumer", params.connectionClosedListener)
189193
const consumerId = connection.getNextConsumerId()
190194

@@ -211,7 +215,7 @@ export class Client {
211215
await this.closeConsumer(consumer.extendedId)
212216
})
213217
this.consumers.set(consumer.extendedId, { connection, consumer, params })
214-
await this.declareConsumerOnConnection(params, consumerId, connection)
218+
await this.declareConsumerOnConnection(params, consumerId, connection, superStreamConsumer?.superStream)
215219
this.logger.info(
216220
`New consumer created with stream name ${params.stream}, consumer id ${consumerId} and offset ${params.offset.type}`
217221
)
@@ -245,6 +249,7 @@ export class Client {
245249
): Promise<SuperStreamConsumer> {
246250
const partitions = await this.queryPartitions({ superStream })
247251
return SuperStreamConsumer.create(handle, {
252+
superStream,
248253
locator: this,
249254
consumerRef: consumerRef || `${superStream}-${randomUUID()}`,
250255
offset: offset || Offset.first(),
@@ -468,7 +473,12 @@ export class Client {
468473
}
469474
}
470475

471-
private async declareConsumerOnConnection(params: DeclareConsumerParams, consumerId: number, connection: Connection) {
476+
private async declareConsumerOnConnection(
477+
params: DeclareConsumerParams,
478+
consumerId: number,
479+
connection: Connection,
480+
superStream?: string
481+
) {
472482
const properties: Record<string, string> = {}
473483
if (params.singleActive && !params.consumerRef) {
474484
throw new Error("consumerRef is mandatory when declaring a single active consumer")
@@ -477,6 +487,9 @@ export class Client {
477487
properties["single-active-consumer"] = "true"
478488
properties["name"] = params.consumerRef!
479489
}
490+
if (superStream) {
491+
properties["super-stream"] = superStream
492+
}
480493
if (params.filter) {
481494
for (let i = 0; i < params.filter.values.length; i++) {
482495
properties[`filter.${i}`] = params.filter.values[i]

src/super_stream_consumer.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,22 @@ import { Offset } from "./requests/subscribe_request"
55
export class SuperStreamConsumer {
66
private consumers: Map<string, Consumer> = new Map<string, Consumer>()
77
public consumerRef: string
8+
readonly superStream: string
89
private locator: Client
910
private partitions: string[]
1011
private offset: Offset
1112

1213
private constructor(
1314
readonly handle: ConsumerFunc,
1415
params: {
16+
superStream: string
1517
locator: Client
1618
partitions: string[]
1719
consumerRef: string
1820
offset: Offset
1921
}
2022
) {
23+
this.superStream = params.superStream
2124
this.consumerRef = params.consumerRef
2225
this.locator = params.locator
2326
this.partitions = params.partitions
@@ -29,7 +32,8 @@ export class SuperStreamConsumer {
2932
this.partitions.map(async (p) => {
3033
const partitionConsumer = await this.locator.declareConsumer(
3134
{ stream: p, consumerRef: this.consumerRef, offset: this.offset, singleActive: true },
32-
this.handle
35+
this.handle,
36+
this
3337
)
3438
this.consumers.set(p, partitionConsumer)
3539
return
@@ -40,6 +44,7 @@ export class SuperStreamConsumer {
4044
static async create(
4145
handle: ConsumerFunc,
4246
params: {
47+
superStream: string
4348
locator: Client
4449
partitions: string[]
4550
consumerRef: string

test/e2e/superstream_consumer.test.ts

Lines changed: 146 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { expect } from "chai"
22
import { Client, Offset } from "../../src"
3-
import { Message } from "../../src/publisher"
3+
import { Message, MessageOptions } from "../../src/publisher"
44
import { range } from "../../src/util"
55
import { createClient, createStreamName } from "../support/fake_data"
66
import { Rabbit } from "../support/rabbit"
@@ -17,8 +17,6 @@ describe("super stream consumer", () => {
1717
beforeEach(async () => {
1818
client = await createClient(username, password)
1919
superStreamName = createStreamName()
20-
noOfPartitions = await rabbit.createSuperStream(superStreamName)
21-
sender = await messageSender(client, superStreamName)
2220
})
2321

2422
afterEach(async () => {
@@ -30,118 +28,161 @@ describe("super stream consumer", () => {
3028
} catch (e) {}
3129
})
3230

33-
it("querying partitions - return the same number of partitions", async () => {
34-
const partitions = await client.queryPartitions({ superStream: superStreamName })
35-
36-
expect(partitions.length).to.be.equal(noOfPartitions)
37-
})
38-
39-
it("querying partitions - return the name of the streams making up the superstream", async () => {
40-
const partitions = await client.queryPartitions({ superStream: superStreamName })
31+
describe("random partitioning", () => {
32+
beforeEach(async () => {
33+
noOfPartitions = await rabbit.createSuperStream(superStreamName)
34+
sender = await messageSender(client, superStreamName)
35+
})
4136

42-
expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
43-
})
37+
it("querying partitions - return the same number of partitions", async () => {
38+
const partitions = await client.queryPartitions({ superStream: superStreamName })
4439

45-
it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
46-
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
47-
return
40+
expect(partitions.length).to.be.equal(noOfPartitions)
4841
})
49-
})
5042

51-
it("declaring a super stream consumer on an existing super stream - read a message", async () => {
52-
await sender(1)
53-
const messages: Message[] = []
43+
it("querying partitions - return the name of the streams making up the superstream", async () => {
44+
const partitions = await client.queryPartitions({ superStream: superStreamName })
5445

55-
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
56-
messages.push(message)
46+
expect(range(noOfPartitions).map((i) => `${superStreamName}-${i}`)).to.deep.eq(partitions)
5747
})
5848

59-
await eventually(() => {
60-
expect(messages).to.have.length(1)
61-
const [message] = messages
62-
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
49+
it("declaring a super stream consumer on an existing super stream - no error is thrown", async () => {
50+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_message: Message) => {
51+
return
52+
})
6353
})
64-
})
6554

66-
it("for a consumer the number of connections should be equals to the partitions' number", async () => {
67-
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
68-
return
69-
})
55+
it("declaring a super stream consumer on an existing super stream - read a message", async () => {
56+
await sender(1)
57+
const messages: Message[] = []
7058

71-
await eventually(() => {
72-
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
59+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
60+
messages.push(message)
61+
})
62+
63+
await eventually(() => {
64+
expect(messages).to.have.length(1)
65+
const [message] = messages
66+
expect(message.content.toString()).to.be.eq(`${testMessageContent}-0`)
67+
})
7368
})
74-
})
7569

76-
it("reading multiple messages - each message should be read only once", async () => {
77-
const noOfMessages = 20
78-
await sender(noOfMessages)
79-
const messages: Message[] = []
70+
it("for a consumer the number of connections should be equals to the partitions' number", async () => {
71+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
72+
return
73+
})
8074

81-
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
82-
messages.push(message)
75+
await eventually(() => {
76+
expect(client.consumerCounts()).to.be.eql(noOfPartitions)
77+
})
8378
})
8479

85-
await eventually(() => {
86-
expect(messages).to.have.length(noOfMessages)
80+
it("reading multiple messages - each message should be read only once", async () => {
81+
const noOfMessages = 20
82+
await sender(noOfMessages)
83+
const messages: Message[] = []
84+
85+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (message: Message) => {
86+
messages.push(message)
87+
})
88+
89+
await eventually(() => {
90+
expect(messages).to.have.length(noOfMessages)
91+
})
8792
})
88-
})
8993

90-
it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
91-
const noOfMessages = 20
92-
const messages: Message[] = []
94+
it("multiple composite consumers with same consumerRef - each message should be read only once", async () => {
95+
const noOfMessages = 20
96+
const messages: Message[] = []
9397

94-
await client.declareSuperStreamConsumer(
95-
{ superStream: superStreamName, consumerRef: "counting-messages" },
96-
(message: Message) => messages.push(message)
97-
)
98-
await client.declareSuperStreamConsumer(
99-
{ superStream: superStreamName, consumerRef: "counting-messages" },
100-
(message: Message) => messages.push(message)
101-
)
98+
await client.declareSuperStreamConsumer(
99+
{ superStream: superStreamName, consumerRef: "counting-messages" },
100+
(message: Message) => messages.push(message)
101+
)
102+
await client.declareSuperStreamConsumer(
103+
{ superStream: superStreamName, consumerRef: "counting-messages" },
104+
(message: Message) => messages.push(message)
105+
)
102106

103-
await sender(noOfMessages)
107+
await sender(noOfMessages)
104108

105-
await eventually(() => {
106-
expect(messages).to.have.length(noOfMessages)
109+
await eventually(() => {
110+
expect(messages).to.have.length(noOfMessages)
111+
})
107112
})
113+
114+
it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
115+
const noOfMessages = 20
116+
await sender(5)
117+
const sleepingTime = 5000
118+
await sleep(sleepingTime)
119+
await sender(noOfMessages)
120+
const messages: Message[] = []
121+
122+
await client.declareSuperStreamConsumer(
123+
{
124+
superStream: superStreamName,
125+
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
126+
},
127+
(message: Message) => {
128+
messages.push(message)
129+
}
130+
)
131+
132+
await eventually(() => {
133+
expect(messages).to.have.length(noOfMessages)
134+
})
135+
}).timeout(10000)
136+
137+
it("closing the locator closes all connections", async () => {
138+
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
139+
return
140+
})
141+
142+
await client.close()
143+
144+
await eventually(async () => {
145+
const connections = await rabbit.getConnections()
146+
expect(connections).to.have.length(0)
147+
}, 5000)
148+
}).timeout(5000)
108149
})
109150

110-
it("reading multiple messages - get messages only at a specific consuming point timestamp", async () => {
111-
const noOfMessages = 20
112-
await sender(5)
113-
const sleepingTime = 5000
114-
await sleep(sleepingTime)
115-
await sender(noOfMessages)
116-
const messages: Message[] = []
117-
118-
await client.declareSuperStreamConsumer(
119-
{
120-
superStream: superStreamName,
121-
offset: Offset.timestamp(new Date(Date.now() - (sleepingTime - 1000))),
122-
},
123-
(message: Message) => {
124-
messages.push(message)
151+
describe("deterministic partitioning", () => {
152+
beforeEach(async () => {
153+
noOfPartitions = await rabbit.createSuperStream(superStreamName, 2)
154+
sender = await roundRobinSender(client, superStreamName, 2)
155+
})
156+
157+
it("multiple composite consumers with same consumerRef and deterministic partition key - each consumer should read only messages for its partition", async () => {
158+
const noOfMessagesPerPartition = 10
159+
const messages: Message[][] = [[], []]
160+
161+
const allPartitionKeysAreTheSame = (messagesToCheck: Message[]) => {
162+
const partitionKeys = messagesToCheck.map((m) => m.applicationProperties?.["partition-key"])
163+
return partitionKeys.every((k) => k === partitionKeys[0])
125164
}
126-
)
127165

128-
await eventually(() => {
129-
expect(messages).to.have.length(noOfMessages)
130-
})
131-
}).timeout(10000)
166+
await client.declareSuperStreamConsumer(
167+
{ superStream: superStreamName, consumerRef: "message-partitioning" },
168+
(message: Message) => messages[0].push(message)
169+
)
170+
await client.declareSuperStreamConsumer(
171+
{ superStream: superStreamName, consumerRef: "message-partitioning" },
172+
(message: Message) => messages[1].push(message)
173+
)
132174

133-
it("closing the locator closes all connections", async () => {
134-
await client.declareSuperStreamConsumer({ superStream: superStreamName }, (_) => {
135-
return
136-
})
175+
await sender(noOfMessagesPerPartition)
137176

138-
await client.close()
177+
await eventually(() => {
178+
expect(messages[0]).to.have.length(noOfMessagesPerPartition)
179+
expect(allPartitionKeysAreTheSame(messages[0]))
139180

140-
await eventually(async () => {
141-
const connections = await rabbit.getConnections()
142-
expect(connections).to.have.length(0)
143-
}, 5000)
144-
}).timeout(5000)
181+
expect(messages[1]).to.have.length(noOfMessagesPerPartition)
182+
expect(allPartitionKeysAreTheSame(messages[1]))
183+
})
184+
})
185+
})
145186
})
146187

147188
const testMessageContent = "test message"
@@ -158,6 +199,24 @@ const messageSender = async (client: Client, superStreamName: string) => {
158199
return sendMessages
159200
}
160201

202+
const roundRobinSender = async (client: Client, superStreamName: string, partitions: number) => {
203+
const routingKeyExtractor = (_content: string, msgOptions: MessageOptions) =>
204+
msgOptions.applicationProperties?.["partition-key"]?.toString()
205+
const publisher = await client.declareSuperStreamPublisher({ superStream: superStreamName }, routingKeyExtractor)
206+
207+
const sendMessages = async (noOfMessagesPerPartition: number) => {
208+
for (let i = 0; i < noOfMessagesPerPartition; i++) {
209+
for (let p = 0; p < partitions; p++) {
210+
await publisher.send(Buffer.from(`${testMessageContent}-${i * partitions + p}`), {
211+
applicationProperties: { "partition-key": `${p}` },
212+
})
213+
}
214+
}
215+
}
216+
217+
return sendMessages
218+
}
219+
161220
const sleep = (ms: number) => {
162221
return new Promise((res) => {
163222
setTimeout(() => {

0 commit comments

Comments
 (0)