Skip to content

Commit 948b351

Browse files
magnedc1992
authored andcommitted
chore: fix offset tracking example
1 parent 46b2d2d commit 948b351

File tree

1 file changed

+33
-28
lines changed

1 file changed

+33
-28
lines changed

example/src/offset_tracking_example.js

Lines changed: 33 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,43 +17,48 @@ async function main() {
1717
})
1818
await client.createStream({ stream: streamName, arguments: {} })
1919
const publisher = await client.declarePublisher({ stream: streamName })
20-
const messageCount = 100
20+
const toSend = 100
2121

22-
console.log(`Publishing ${messageCount} messages`)
23-
for (let i = 0; i < messageCount; i++) {
24-
const body = i === messageCount - 1 ? "marker" : `hello ${i}`
22+
console.log(`Publishing ${toSend} messages`)
23+
for (let i = 0; i < toSend; i++) {
24+
const body = i === toSend - 1 ? "marker" : `hello ${i}`
2525
await publisher.send(Buffer.from(body))
2626
}
2727

28-
const startFrom = rabbit.Offset.offset(0n)
29-
let firstOffset = startFrom.value
30-
let lastOffset = startFrom.value
31-
let messageReceivedCount = 0
3228
const consumerRef = "offset-tracking-tutorial"
33-
const consumer = await client.declareConsumer({ stream: streamName, offset: startFrom, consumerRef }, (message) => {
34-
messageReceivedCount++
35-
if (message.offset === startFrom.value) {
36-
console.log("First message received")
37-
firstOffset = message.offset
38-
}
39-
if (messageReceivedCount % 10 === 0) {
40-
console.log("Storing offset")
41-
client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset })
42-
}
43-
if (message.content.toString() === "marker") {
44-
console.log("Marker found")
45-
client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset })
46-
lastOffset = message.offset
29+
let firstOffset = undefined
30+
let offsetSpecification = rabbit.Offset.first()
31+
try {
32+
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName })
33+
offsetSpecification = rabbit.Offset.offset(offset + 1n)
34+
} catch (e) {}
35+
36+
let lastOffset = offsetSpecification.value
37+
let messageCount = 0
38+
const consumer = await client.declareConsumer(
39+
{ stream: streamName, offset: offsetSpecification, consumerRef },
40+
async (message) => {
41+
messageCount++
42+
if (!firstOffset && messageCount === 1) {
43+
firstOffset = message.offset
44+
console.log("First message received")
45+
}
46+
if (messageCount % 10 === 0) {
47+
await consumer.storeOffset(message.offset)
48+
}
49+
if (message.content.toString() === "marker") {
50+
console.log("Marker found")
51+
lastOffset = message.offset
52+
await consumer.storeOffset(message.offset)
53+
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
54+
await consumer.close(true)
55+
process.exit(0)
56+
}
4757
}
48-
})
58+
)
4959

5060
console.log(`Start consuming...`)
5161
await sleep(2000)
52-
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
53-
const lastStoredOffset = await consumer.queryOffset()
54-
console.log(`Last stored offset was ${lastStoredOffset}`)
55-
56-
await client.close()
5762
}
5863

5964
main()

0 commit comments

Comments
 (0)