Skip to content

Commit 236268a

Browse files
author
magne
committed
chore: add an offset tracking example
1 parent 94bc290 commit 236268a

File tree

1 file changed

+68
-0
lines changed

1 file changed

+68
-0
lines changed
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
const rabbit = require("rabbitmq-stream-js-client")
2+
const { randomUUID } = require("crypto")
3+
4+
const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
5+
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
6+
7+
async function main() {
8+
const streamName = `example-${randomUUID()}`
9+
console.log(`Creating stream ${streamName}`)
10+
11+
const client = await rabbit.connect({
12+
hostname: "localhost",
13+
port: 5552,
14+
username: rabbitUser,
15+
password: rabbitPassword,
16+
vhost: "/",
17+
heartbeat: 0,
18+
})
19+
await client.createStream({ stream: streamName, arguments: {} })
20+
const publisher = await client.declarePublisher({ stream: streamName })
21+
const totalMessages = 100
22+
23+
console.log(`Publishing ${totalMessages} messages`)
24+
for (let i = 0; i < totalMessages; i++) {
25+
const messageBody = i === totalMessages - 1 ? "marker" : `hello ${i}`
26+
await publisher.send(Buffer.from(messageBody))
27+
}
28+
29+
let initialOffset = rabbit.Offset.offset(0n)
30+
let firstOffset = initialOffset.value
31+
let lastOffset = initialOffset.value
32+
let messageCount = 0
33+
const consumerRef = "offset-tracking-consumer"
34+
const consumer = await client.declareConsumer(
35+
{ stream: streamName, offset: initialOffset, consumerRef },
36+
(message) => {
37+
messageCount++
38+
if (message.offset === initialOffset.value) {
39+
console.log("First message received")
40+
}
41+
if (messageCount % 10 === 0) {
42+
console.log("Storing offset")
43+
client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset })
44+
}
45+
if (message.content.toString() === "marker") {
46+
console.log("Marker found")
47+
client.storeOffset({ stream: streamName, reference: consumerRef, offsetValue: message.offset })
48+
lastOffset = message.offset
49+
}
50+
}
51+
)
52+
53+
console.log(`Start consuming...`)
54+
await sleep(2000)
55+
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
56+
const lastStoredOffset = await consumer.queryOffset()
57+
console.log(`Last stored offset was ${lastStoredOffset}`)
58+
59+
await client.close()
60+
}
61+
62+
main()
63+
.then(() => console.log("done!"))
64+
.catch((res) => {
65+
console.log("ERROR ", res)
66+
process.exit(-1)
67+
})
68+
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))

0 commit comments

Comments
 (0)