@@ -17,43 +17,48 @@ async function main() {
1717 } )
1818 await client . createStream ( { stream : streamName , arguments : { } } )
1919 const publisher = await client . declarePublisher ( { stream : streamName } )
20- const messageCount = 100
20+ const toSend = 100
2121
22- console . log ( `Publishing ${ messageCount } messages` )
23- for ( let i = 0 ; i < messageCount ; i ++ ) {
24- const body = i === messageCount - 1 ? "marker" : `hello ${ i } `
22+ console . log ( `Publishing ${ toSend } messages` )
23+ for ( let i = 0 ; i < toSend ; i ++ ) {
24+ const body = i === toSend - 1 ? "marker" : `hello ${ i } `
2525 await publisher . send ( Buffer . from ( body ) )
2626 }
2727
28- const startFrom = rabbit . Offset . offset ( 0n )
29- let firstOffset = startFrom . value
30- let lastOffset = startFrom . value
31- let messageReceivedCount = 0
3228 const consumerRef = "offset-tracking-tutorial"
33- const consumer = await client . declareConsumer ( { stream : streamName , offset : startFrom , consumerRef } , ( message ) => {
34- messageReceivedCount ++
35- if ( message . offset === startFrom . value ) {
36- console . log ( "First message received" )
37- firstOffset = message . offset
38- }
39- if ( messageReceivedCount % 10 === 0 ) {
40- console . log ( "Storing offset" )
41- client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
42- }
43- if ( message . content . toString ( ) === "marker" ) {
44- console . log ( "Marker found" )
45- client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
46- lastOffset = message . offset
29+ let firstOffset = undefined
30+ let offsetSpecification = rabbit . Offset . first ( )
31+ try {
32+ const offset = await client . queryOffset ( { reference : consumerRef , stream : streamName } )
33+ offsetSpecification = rabbit . Offset . offset ( offset + 1n )
34+ } catch ( e ) { }
35+
36+ let lastOffset = offsetSpecification . value
37+ let messageCount = 0
38+ const consumer = await client . declareConsumer (
39+ { stream : streamName , offset : offsetSpecification , consumerRef } ,
40+ async ( message ) => {
41+ messageCount ++
42+ if ( ! firstOffset && messageCount === 1 ) {
43+ firstOffset = message . offset
44+ console . log ( "First message received" )
45+ }
46+ if ( messageCount % 10 === 0 ) {
47+ await consumer . storeOffset ( message . offset )
48+ }
49+ if ( message . content . toString ( ) === "marker" ) {
50+ console . log ( "Marker found" )
51+ lastOffset = message . offset
52+ await consumer . storeOffset ( message . offset )
53+ console . log ( `Done consuming, first offset was ${ firstOffset } , last offset was ${ lastOffset } ` )
54+ await consumer . close ( true )
55+ process . exit ( 0 )
56+ }
4757 }
48- } )
58+ )
4959
5060 console . log ( `Start consuming...` )
5161 await sleep ( 2000 )
52- console . log ( `Done consuming, first offset was ${ firstOffset } , last offset was ${ lastOffset } ` )
53- const lastStoredOffset = await consumer . queryOffset ( )
54- console . log ( `Last stored offset was ${ lastStoredOffset } ` )
55-
56- await client . close ( )
5762}
5863
5964main ( )
0 commit comments