Skip to content

Commit 6ef1c96

Browse files
l4mbymagne
andauthored
wip: single active consumer offset tracking (coders51#247)
* wip: single active consumer offset tracking * feat: add callback to customize single active consumer beahviour during activation * chore: update jsdoc on consumer interface * chore: fix example * chore: add error log and update readme * chore: update stream name in example --------- Co-authored-by: magne <[email protected]>
1 parent be82855 commit 6ef1c96

File tree

6 files changed

+203
-5
lines changed

6 files changed

+203
-5
lines changed

README.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,25 @@ const consumer = await client.declareConsumer(consumerOptions, (message: Message
245245
// ...
246246
```
247247

248+
Optionally a single active consumer can have a callback which returns an offset that will be used once the consumer becomes active
249+
250+
```typescript
251+
const consumerOptions = {
252+
stream: "stream-name",
253+
offset: Offset.next(),
254+
singleActive: true,
255+
consumerRef: "my-consumer-ref",
256+
consumerUpdateListener: async (consumerReference, streamName) => {
257+
const offset = await client.queryOffset({ reference: consumerReference, stream: streamName })
258+
return rabbit.Offset.offset(offset)
259+
},
260+
}
261+
262+
// ...
263+
```
264+
265+
Check `example/single_active_consumer_update_example.js` for a basic usage example.
266+
248267
### Custom Policy
249268

250269
By default the client uses the `creditsOnChunkCompleted(1, 1)` policy. This policy grants that messages will be processed in order, as a new chunk will only be requested once the current chunk has been processed. It is possible to override this policy by passing `creditPolicy` to the consumer options. Be aware that modifying this policy can lead to out-of-order message processing.

example/package-lock.json

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
const rabbit = require("rabbitmq-stream-js-client")
2+
const crypto = require("crypto")
3+
4+
const wait = (ms) => new Promise((r) => setTimeout(r, ms))
5+
6+
async function main() {
7+
const messagesFromFirstConsumer = []
8+
const messagesFromSecondConsumer = []
9+
10+
console.log("Connecting...")
11+
const client = await rabbit.connect({
12+
vhost: "/",
13+
port: 5552,
14+
hostname: "localhost",
15+
username: "rabbit",
16+
password: "rabbit",
17+
})
18+
19+
console.log("Making sure the stream exists...")
20+
const streamName = "active-consumer-switch-on-single-active-consumer"
21+
await client.createStream({ stream: streamName, arguments: {} })
22+
const consumerRef = `my-consumer-${crypto.randomUUID()}`
23+
24+
console.log("Creating the publisher and sending 100 messages...")
25+
const publisher = await client.declarePublisher({ stream: streamName })
26+
for (let i = 1; i <= 100; i++) {
27+
await publisher.send(Buffer.from(`${i}`))
28+
}
29+
30+
console.log("Creating the first consumer, when 50 messages are consumed it saves the offset on the server...")
31+
const consumer1 = await client.declareConsumer(
32+
{
33+
stream: streamName,
34+
offset: rabbit.Offset.first(),
35+
singleActive: true,
36+
consumerRef: consumerRef,
37+
},
38+
async (message) => {
39+
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
40+
if (messagesFromFirstConsumer.length === 50) {
41+
await consumer1.storeOffset(message.offset)
42+
}
43+
}
44+
)
45+
46+
await wait(500)
47+
48+
console.log("Creating the second consumer, when it becomes active it resumes from the stored offset on the server...")
49+
await client.declareConsumer(
50+
{
51+
stream: streamName,
52+
offset: rabbit.Offset.first(),
53+
singleActive: true,
54+
consumerRef: consumerRef,
55+
// This callback is executed when the consumer becomes active
56+
consumerUpdateListener: async (consumerReference, streamName) => {
57+
const offset = await client.queryOffset({ reference: consumerReference, stream: streamName })
58+
return rabbit.Offset.offset(offset)
59+
},
60+
},
61+
(message) => {
62+
messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
63+
}
64+
)
65+
66+
console.log("Closing the first consumer to trigger the activation of the second one...")
67+
await client.closeConsumer(consumer1.extendedId)
68+
69+
await wait(500)
70+
71+
console.log(`Messages consumed by the first consumer: ${messagesFromFirstConsumer.length}`)
72+
console.log(`Messages consumed by the second consumer: ${messagesFromSecondConsumer.length}`)
73+
}
74+
75+
main()
76+
.then(() => {
77+
console.log("done!")
78+
process.exit(0)
79+
})
80+
.catch((res) => {
81+
console.log("Error in publishing message!", res)
82+
process.exit(-1)
83+
})

src/client.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import { inspect } from "util"
44
import { Compression, CompressionType, GzipCompression, NoneCompression } from "./compression"
55
import { Connection, ConnectionInfo, ConnectionParams, errorMessageOf } from "./connection"
66
import { ConnectionPool, ConnectionPurpose } from "./connection_pool"
7-
import { Consumer, ConsumerFunc, StreamConsumer, computeExtendedConsumerId } from "./consumer"
7+
import { Consumer, ConsumerFunc, ConsumerUpdateListener, StreamConsumer, computeExtendedConsumerId } from "./consumer"
88
import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
99
import { Logger, NullLogger } from "./logger"
1010
import { FilterFunc, Message, Publisher, StreamPublisher } from "./publisher"
@@ -212,6 +212,8 @@ export class Client {
212212
consumerTag: params.consumerTag,
213213
offset: params.offset,
214214
creditPolicy: params.creditPolicy,
215+
singleActive: params.singleActive,
216+
consumerUpdateListener: params.consumerUpdateListener,
215217
},
216218
params.filter
217219
)
@@ -589,13 +591,31 @@ export class Client {
589591
this.logger.error(`On consumer_update_query no consumer found`)
590592
return
591593
}
594+
const offset = await this.getConsumerOrServerSavedOffset(consumer)
595+
consumer.updateConsumerOffset(offset)
592596
this.logger.debug(`on consumer_update_query -> ${consumer.consumerRef}`)
593597
await connection.send(
594-
new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset: consumer.offset })
598+
new ConsumerUpdateResponse({ correlationId: response.correlationId, responseCode: 1, offset })
595599
)
596600
}
597601
}
598602

603+
private async getConsumerOrServerSavedOffset(consumer: StreamConsumer) {
604+
if (consumer.isSingleActive && consumer.consumerRef && consumer.consumerUpdateListener) {
605+
try {
606+
const offset = await consumer.consumerUpdateListener(consumer.consumerRef, consumer.streamName)
607+
return offset
608+
} catch (error) {
609+
this.logger.error(
610+
`Error in consumerUpdateListener for consumerRef ${consumer.consumerRef}: ${(error as Error).message}`
611+
)
612+
return consumer.offset
613+
}
614+
}
615+
616+
return consumer.offset
617+
}
618+
599619
private getLocatorConnection() {
600620
const connectionParams = this.buildConnectionParams(false, "", this.params.listeners?.connection_closed)
601621
return Connection.create(connectionParams, this.logger)
@@ -790,6 +810,7 @@ export interface DeclareConsumerParams {
790810
consumerRef?: string
791811
offset: Offset
792812
connectionClosedListener?: ConnectionClosedListener
813+
consumerUpdateListener?: ConsumerUpdateListener
793814
singleActive?: boolean
794815
filter?: ConsumerFilter
795816
creditPolicy?: ConsumerCreditPolicy

src/consumer.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Message } from "./publisher"
66
import { Offset } from "./requests/subscribe_request"
77

88
export type ConsumerFunc = (message: Message) => Promise<void> | void
9+
export type ConsumerUpdateListener = (consumerRef: string, streamName: string) => Promise<Offset>
910
export const computeExtendedConsumerId = (consumerId: number, connectionId: string) => {
1011
return `${consumerId}@${connectionId}`
1112
}
@@ -40,6 +41,13 @@ export interface Consumer {
4041
*/
4142
getConnectionInfo(): ConnectionInfo
4243

44+
/**
45+
* Updates the offset of the consumer instance
46+
*
47+
* @param {Offset} offset - The new offset to set
48+
*/
49+
updateConsumerOffset(offset: Offset): void
50+
4351
consumerId: number
4452
consumerRef?: string
4553
readonly extendedId: string
@@ -52,10 +60,12 @@ export class StreamConsumer implements Consumer {
5260
public consumerRef?: string
5361
public consumerTag?: string
5462
public offset: Offset
63+
public consumerUpdateListener?: ConsumerUpdateListener
5564
private clientLocalOffset: Offset
5665
private creditsHandler: ConsumerCreditPolicy
5766
private consumerHandle: ConsumerFunc
5867
private closed: boolean
68+
private singleActive: boolean = false
5969

6070
constructor(
6171
handle: ConsumerFunc,
@@ -67,6 +77,8 @@ export class StreamConsumer implements Consumer {
6777
consumerTag?: string
6878
offset: Offset
6979
creditPolicy?: ConsumerCreditPolicy
80+
singleActive?: boolean
81+
consumerUpdateListener?: ConsumerUpdateListener
7082
},
7183
readonly filter?: ConsumerFilter
7284
) {
@@ -79,7 +91,9 @@ export class StreamConsumer implements Consumer {
7991
this.connection.incrRefCount()
8092
this.creditsHandler = params.creditPolicy || defaultCreditPolicy
8193
this.consumerHandle = handle
94+
this.consumerUpdateListener = params.consumerUpdateListener
8295
this.closed = false
96+
this.singleActive = params.singleActive ?? false
8397
}
8498

8599
async close(manuallyClose: boolean): Promise<void> {
@@ -127,6 +141,15 @@ export class StreamConsumer implements Consumer {
127141
return this.creditsHandler
128142
}
129143

144+
public get isSingleActive() {
145+
return this.singleActive
146+
}
147+
148+
public updateConsumerOffset(offset: Offset) {
149+
this.offset = offset.clone()
150+
this.clientLocalOffset = offset.clone()
151+
}
152+
130153
private maybeUpdateLocalOffset(message: Message) {
131154
if (message.offset !== undefined) this.clientLocalOffset = Offset.offset(message.offset)
132155
}

test/e2e/declare_consumer.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
getTestNodesFromEnv,
3131
password,
3232
username,
33+
wait,
3334
waitSleeping,
3435
} from "../support/util"
3536
import { Connection, Channel } from "amqplib"
@@ -216,6 +217,57 @@ describe("declare consumer", () => {
216217
await eventually(() => expect(messages).eql([Buffer.from("hello"), Buffer.from("hello")]))
217218
}).timeout(10000)
218219

220+
it("declaring two single active consumer on an existing stream - after closing one consumer the active one can resume the consuming from the last saved offset on the server", async () => {
221+
const messagesFromFirstConsumer: string[] = []
222+
const messagesFromSecondConsumer: string[] = []
223+
const consumerRef = createConsumerRef()
224+
for (let i = 1; i <= 100; i++) {
225+
await publisher.send(Buffer.from(`${i}`))
226+
}
227+
228+
const consumer1 = await client.declareConsumer(
229+
{
230+
stream: streamName,
231+
offset: Offset.first(),
232+
singleActive: true,
233+
consumerRef: consumerRef,
234+
consumerUpdateListener: async (cr: string, sn: string) => {
235+
const offset = await client.queryOffset({ reference: cr, stream: sn })
236+
return Offset.offset(offset)
237+
},
238+
},
239+
async (message: Message) => {
240+
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
241+
if (messagesFromFirstConsumer.length === 50) {
242+
await consumer1.storeOffset(message.offset!)
243+
}
244+
}
245+
)
246+
await wait(500)
247+
await client.declareConsumer(
248+
{
249+
stream: streamName,
250+
offset: Offset.first(),
251+
singleActive: true,
252+
consumerRef: consumerRef,
253+
consumerUpdateListener: async (cr: string, sn: string) => {
254+
const offset = await client.queryOffset({ reference: cr, stream: sn })
255+
return Offset.offset(offset)
256+
},
257+
},
258+
(message: Message) => {
259+
messagesFromSecondConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
260+
}
261+
)
262+
await client.closeConsumer(consumer1.extendedId)
263+
await wait(500)
264+
265+
await eventually(() => {
266+
expect(messagesFromSecondConsumer.find((m) => m === `Message 50 from ${consumerRef}`)).to.not.be.undefined
267+
expect(messagesFromSecondConsumer.find((m) => m === `Message 49 from ${consumerRef}`)).to.be.undefined
268+
}, 8000)
269+
}).timeout(20000)
270+
219271
it("declaring a single active consumer without reference on an existing stream - should throw an error", async () => {
220272
const messages: Buffer[] = []
221273

0 commit comments

Comments
 (0)