Skip to content

Commit 7ea9728

Browse files
jonnepmyraFalk Jonas - HK
andauthored
Last offset fix (#407)
* Currently, the last consumed offset is stored before the message handler is called. This change does it after the messagehandler is called in order to avoid potential messageloss on the event of connection-issues/reconnecting-consumer * Aligned the fix for consumers with superstream consumers --------- Co-authored-by: Falk Jonas - HK <[email protected]>
1 parent 830e540 commit 7ea9728

File tree

1 file changed

+6
-4
lines changed

1 file changed

+6
-4
lines changed

RabbitMQ.Stream.Client/Reliable/ConsumerFactory.cs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,14 @@ await OnEntityClosed(_consumerConfig.StreamSystem, _consumerConfig.Stream,
7575
},
7676
MessageHandler = async (consumer, ctx, message) =>
7777
{
78-
_consumedFirstTime = true;
79-
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
8078
if (_consumerConfig.MessageHandler != null)
8179
{
8280
await _consumerConfig.MessageHandler(_consumerConfig.Stream, consumer, ctx, message)
8381
.ConfigureAwait(false);
8482
}
83+
84+
_consumedFirstTime = true;
85+
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
8586
},
8687
}, BaseLogger).ConfigureAwait(false);
8788
}
@@ -146,13 +147,14 @@ await OnEntityClosed(_consumerConfig.StreamSystem, update.Stream, r,
146147
},
147148
MessageHandler = async (partitionStream, consumer, ctx, message) =>
148149
{
149-
_consumedFirstTime = true;
150-
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
151150
if (_consumerConfig.MessageHandler != null)
152151
{
153152
await _consumerConfig.MessageHandler(partitionStream, consumer, ctx,
154153
message).ConfigureAwait(false);
155154
}
155+
156+
_consumedFirstTime = true;
157+
_lastOffsetConsumed[_consumerConfig.Stream] = ctx.Offset;
156158
},
157159
}, BaseLogger).ConfigureAwait(false);
158160
}

0 commit comments

Comments
 (0)