@@ -26,29 +26,27 @@ async function main() {
2626 await publisher . send ( Buffer . from ( messageBody ) )
2727 }
2828
29- let initialOffset = rabbit . Offset . offset ( 0n )
30- let firstOffset = initialOffset . value
31- let lastOffset = initialOffset . value
29+ const startFrom = rabbit . Offset . offset ( 0n )
30+ let firstOffset = startFrom . value
31+ let lastOffset = startFrom . value
3232 let messageCount = 0
3333 const consumerRef = "offset-tracking-consumer"
34- const consumer = await client . declareConsumer (
35- { stream : streamName , offset : initialOffset , consumerRef } ,
36- ( message ) => {
37- messageCount ++
38- if ( message . offset === initialOffset . value ) {
39- console . log ( "First message received" )
40- }
41- if ( messageCount % 10 === 0 ) {
42- console . log ( "Storing offset" )
43- client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
44- }
45- if ( message . content . toString ( ) === "marker" ) {
46- console . log ( "Marker found" )
47- client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
48- lastOffset = message . offset
49- }
34+ const consumer = await client . declareConsumer ( { stream : streamName , offset : startFrom , consumerRef } , ( message ) => {
35+ messageCount ++
36+ if ( message . offset === startFrom . value ) {
37+ console . log ( "First message received" )
38+ firstOffset = message . offset
5039 }
51- )
40+ if ( messageCount % 10 === 0 ) {
41+ console . log ( "Storing offset" )
42+ client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
43+ }
44+ if ( message . content . toString ( ) === "marker" ) {
45+ console . log ( "Marker found" )
46+ client . storeOffset ( { stream : streamName , reference : consumerRef , offsetValue : message . offset } )
47+ lastOffset = message . offset
48+ }
49+ } )
5250
5351 console . log ( `Start consuming...` )
5452 await sleep ( 2000 )
0 commit comments