Skip to content

Commit 2a4c663

Browse files
l4mbymagne
andauthored
chore: add an offset tracking example (#205)
* chore: add an offset tracking example * chore: pr fixes * chore: changing variables names * chore: fix offset tracking example --------- Co-authored-by: magne <[email protected]>
1 parent aa4b2a4 commit 2a4c663

File tree

1 file changed

+70
-0
lines changed

1 file changed

+70
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
const rabbit = require("rabbitmq-stream-js-client")
2+
3+
const rabbitUser = process.env.RABBITMQ_USER || "rabbit"
4+
const rabbitPassword = process.env.RABBITMQ_PASSWORD || "rabbit"
5+
6+
async function main() {
7+
const streamName = `stream-offset-tracking-javascript`
8+
console.log(`Creating stream ${streamName}`)
9+
10+
const client = await rabbit.connect({
11+
hostname: "localhost",
12+
port: 5552,
13+
username: rabbitUser,
14+
password: rabbitPassword,
15+
vhost: "/",
16+
heartbeat: 0,
17+
})
18+
await client.createStream({ stream: streamName, arguments: {} })
19+
const publisher = await client.declarePublisher({ stream: streamName })
20+
const toSend = 100
21+
22+
console.log(`Publishing ${toSend} messages`)
23+
for (let i = 0; i < toSend; i++) {
24+
const body = i === toSend - 1 ? "marker" : `hello ${i}`
25+
await publisher.send(Buffer.from(body))
26+
}
27+
28+
const consumerRef = "offset-tracking-tutorial"
29+
let firstOffset = undefined
30+
let offsetSpecification = rabbit.Offset.first()
31+
try {
32+
const offset = await client.queryOffset({ reference: consumerRef, stream: streamName })
33+
offsetSpecification = rabbit.Offset.offset(offset + 1n)
34+
} catch (e) {}
35+
36+
let lastOffset = offsetSpecification.value
37+
let messageCount = 0
38+
const consumer = await client.declareConsumer(
39+
{ stream: streamName, offset: offsetSpecification, consumerRef },
40+
async (message) => {
41+
messageCount++
42+
if (!firstOffset && messageCount === 1) {
43+
firstOffset = message.offset
44+
console.log("First message received")
45+
}
46+
if (messageCount % 10 === 0) {
47+
await consumer.storeOffset(message.offset)
48+
}
49+
if (message.content.toString() === "marker") {
50+
console.log("Marker found")
51+
lastOffset = message.offset
52+
await consumer.storeOffset(message.offset)
53+
console.log(`Done consuming, first offset was ${firstOffset}, last offset was ${lastOffset}`)
54+
await consumer.close(true)
55+
process.exit(0)
56+
}
57+
}
58+
)
59+
60+
console.log(`Start consuming...`)
61+
await sleep(2000)
62+
}
63+
64+
main()
65+
.then(() => console.log("done!"))
66+
.catch((res) => {
67+
console.log("ERROR ", res)
68+
process.exit(-1)
69+
})
70+
const sleep = (ms) => new Promise((r) => setTimeout(r, ms))

0 commit comments

Comments
 (0)