Skip to content

Commit 435871c

Browse files
Some improvement suggestions on Offset manipulation for consumers (coders51#270)
* base offset management * adds test of the proposed solution
1 parent 2d7185f commit 435871c

File tree

5 files changed

+116
-12
lines changed

5 files changed

+116
-12
lines changed

src/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ export class Client {
407407
await connection.restart()
408408
}
409409
uniqueConnectionIds.add(connection.connectionId)
410-
const consumerParams = { ...params, offset: consumer.localOffset }
410+
const consumerParams = { ...params, offset: Offset.offset(consumer.getOffset()) }
411411
await this.declareConsumerOnConnection(consumerParams, consumer.consumerId, connection)
412412
}
413413

src/consumer.ts

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ export interface Consumer {
2323
/**
2424
* Store the stream offset on the server
2525
*
26-
* @param {bigint} offsetValue - The value of the offset to save
26+
* @param {bigint} offsetValue - The value of the offset to save, if not specified the local offset is used
2727
*/
28-
storeOffset(offsetValue: bigint): Promise<void>
28+
storeOffset(offsetValue?: bigint): Promise<void>
2929

3030
/**
3131
* Get the saved offset on the server
@@ -34,6 +34,11 @@ export interface Consumer {
3434
*/
3535
queryOffset(): Promise<bigint>
3636

37+
/**
38+
* Get the stream local offset
39+
*/
40+
getOffset(): bigint
41+
3742
/**
3843
* Gets the infos of the publisher's connection
3944
*
@@ -102,25 +107,26 @@ export class StreamConsumer implements Consumer {
102107
await this.pool.releaseConnection(this.connection)
103108
}
104109

105-
public storeOffset(offsetValue: bigint): Promise<void> {
110+
public storeOffset(offsetValue?: bigint): Promise<void> {
106111
if (!this.consumerRef) throw new Error("ConsumerReference must be defined in order to use this!")
107-
return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue })
112+
const offset = offsetValue ? offsetValue : this.clientLocalOffset.value ?? 0n
113+
return this.connection.storeOffset({ stream: this.stream, reference: this.consumerRef, offsetValue: offset })
108114
}
109115

110116
public queryOffset(): Promise<bigint> {
111117
if (!this.consumerRef) throw new Error("ConsumerReference must be defined in order to use this!")
112118
return this.connection.queryOffset({ stream: this.stream, reference: this.consumerRef })
113119
}
114120

121+
getOffset(): bigint {
122+
return this.clientLocalOffset.value ?? 0n
123+
}
124+
115125
public getConnectionInfo(): ConnectionInfo {
116126
const { host, port, id, readable, localPort, ready, vhost } = this.connection.getConnectionInfo()
117127
return { host, port, id, readable, localPort, ready, vhost }
118128
}
119129

120-
public get localOffset() {
121-
return this.clientLocalOffset.clone()
122-
}
123-
124130
public async handle(message: Message) {
125131
if (this.closed || this.isMessageOffsetLessThanConsumers(message)) return
126132
await this.consumerHandle(message)

test/e2e/consumer_offset.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
import { expect } from "chai"
2+
import { Offset } from "../../src"
3+
import { Message } from "../../src/publisher"
4+
import { createClient, createStreamName } from "../support/fake_data"
5+
import { eventually, password, username } from "../support/util"
6+
7+
describe("Consumer Offset", () => {
8+
it("test start and stop", async () => {
9+
const client = await createClient(username, password)
10+
11+
const streamName = createStreamName()
12+
await client.createStream({ stream: streamName, arguments: {} })
13+
14+
let onIncomingMessageCalls = 0
15+
const onIncomingMessage = async (msg: Message) => {
16+
console.log(msg.content.toString("utf-8"))
17+
console.log(msg.offset)
18+
onIncomingMessageCalls++
19+
return
20+
}
21+
const referenceName = "ref"
22+
const consumer = await client.declareConsumer(
23+
{
24+
stream: streamName,
25+
offset: Offset.offset(0n),
26+
consumerRef: referenceName,
27+
},
28+
onIncomingMessage
29+
)
30+
const publisher = await client.declarePublisher({ stream: streamName })
31+
await publisher.send(Buffer.from("Hello1"))
32+
await eventually(async () => {
33+
expect(onIncomingMessageCalls).to.eql(1)
34+
})
35+
36+
const localOffset = consumer.getOffset()
37+
if (localOffset === undefined) {
38+
throw new Error("localOffset is undefined")
39+
}
40+
41+
// Perhaps there may be an option to upload the offset to the server directly from the consumer's internal store? Instead of having to fetch for it and then retrieve it
42+
await consumer.storeOffset(localOffset)
43+
await consumer.close(false)
44+
45+
await publisher.send(Buffer.from("Hello2"))
46+
await publisher.send(Buffer.from("Hello3"))
47+
48+
const lastMessageOffset = await client.queryOffset({
49+
stream: streamName,
50+
reference: referenceName,
51+
})
52+
expect(lastMessageOffset).to.eql(0n)
53+
54+
let resumedOnIncomingMessageCalls = 0
55+
let offset: bigint | undefined = 0n
56+
const resumedOnIncomingMessage = async (msg: Message) => {
57+
console.log("Resumed ", msg.content.toString("utf-8"))
58+
offset = msg.offset
59+
resumedOnIncomingMessageCalls++
60+
return
61+
}
62+
63+
const resumedConsumer = await client.declareConsumer(
64+
{
65+
stream: streamName,
66+
offset: Offset.offset(lastMessageOffset + 1n),
67+
consumerRef: referenceName,
68+
},
69+
resumedOnIncomingMessage
70+
)
71+
await eventually(async () => {
72+
expect(resumedOnIncomingMessageCalls).to.eql(2)
73+
})
74+
expect(resumedConsumer.getOffset()).to.eql(offset)
75+
76+
await publisher.close(false)
77+
})
78+
})

test/e2e/declare_consumer.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,7 @@ describe("declare consumer", () => {
239239
async (message: Message) => {
240240
messagesFromFirstConsumer.push(`Message ${message.content.toString("utf-8")} from ${consumerRef}`)
241241
if (messagesFromFirstConsumer.length === 50) {
242-
await consumer1.storeOffset(message.offset!)
242+
await consumer1.storeOffset(message.offset)
243243
}
244244
}
245245
)

test/e2e/offset.test.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ describe("offset", () => {
179179
const consumer = await client.declareConsumer(
180180
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() },
181181
async (message: Message) => {
182-
await consumer.storeOffset(message.offset!)
182+
await consumer.storeOffset(message.offset)
183183
offset = message.offset!
184184
}
185185
)
@@ -194,6 +194,26 @@ describe("offset", () => {
194194
})
195195
}).timeout(10000)
196196

197+
it("saving the offset of a stream correctly without specifying it ", async () => {
198+
let offset: bigint = 0n
199+
const consumer = await client.declareConsumer(
200+
{ stream: testStreamName, consumerRef: "my consumer", offset: Offset.next() },
201+
async (message: Message) => {
202+
offset = message.offset!
203+
}
204+
)
205+
const publisher = await client.declarePublisher({ stream: testStreamName })
206+
207+
await publisher.send(Buffer.from("hello"))
208+
await publisher.send(Buffer.from("world"))
209+
210+
await eventually(async () => {
211+
await consumer.storeOffset()
212+
const result = await consumer.queryOffset()
213+
expect(result).eql(offset)
214+
})
215+
}).timeout(10000)
216+
197217
it("declaring a consumer without consumerRef and saving the store offset should rise an error", async () => {
198218
const consumer = await client.declareConsumer(
199219
{ stream: testStreamName, offset: Offset.first() },
@@ -224,7 +244,7 @@ describe("offset", () => {
224244
async (message: Message) => {
225245
consumerOneMessages.push(message)
226246
if (message.content.toString() === "marker") {
227-
await consumer.storeOffset(message.offset!)
247+
await consumer.storeOffset(message.offset)
228248
}
229249
}
230250
)

0 commit comments

Comments
 (0)