Skip to content

Commit adfa755

Browse files
feat(PubSub): Nack pending messages during shutdown.
1 parent ff75e09 commit adfa755

File tree

1 file changed

+28
-6
lines changed

1 file changed

+28
-6
lines changed

apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -496,12 +496,28 @@ Task ProcessSuccessfulMessages(List<ReceivedMessage> messages, HashSet<string> m
496496
}
497497
}
498498

499-
private async Task ProcessPullMessagesAsync(List<ReceivedMessage> msgs, HashSet<string> msgIds)
499+
private async Task ProcessPullMessagesAsync(List<ReceivedMessage> msgs, HashSet<string> leaseTracking)
500500
{
501501
// Running async. Common data needs locking
502502
for (int msgIndex = 0; msgIndex < msgs.Count; msgIndex++)
503503
{
504-
_softStopCts.Token.ThrowIfCancellationRequested();
504+
if (_softStopCts.IsCancellationRequested)
505+
{
506+
var remainingAckIds = msgs.Skip(msgIndex).Select(x => x.AckId).ToList();
507+
lock (_lock)
508+
{
509+
_nackQueue.Enqueue(remainingAckIds);
510+
}
511+
lock (leaseTracking)
512+
{
513+
leaseTracking.ExceptWith(remainingAckIds);
514+
}
515+
// Ids have been added to ack/nack-queue, so trigger a push.
516+
_eventPush.Set();
517+
_softStopCts.Token.ThrowIfCancellationRequested();
518+
}
519+
520+
505521
var msg = msgs[msgIndex];
506522
msgs[msgIndex] = null;
507523
// Prepare to call user message handler, _flow.Process(...) enforces the user-handler concurrency constraints.
@@ -510,19 +526,25 @@ await _taskHelper.ConfigureAwait(_flow.Process(msg.CalculateSize(), _messageOrde
510526
// Running async. Common data needs locking
511527
lock (_lock)
512528
{
513-
_softStopCts.Token.ThrowIfCancellationRequested();
514529
_userHandlerInFlight += 1;
515530
}
516531
if (msg.DeliveryAttempt > 0)
517532
{
518533
msg.Message.Attributes[DeliveryAttemptAttrKey] = msg.DeliveryAttempt.ToString(CultureInfo.InvariantCulture);
519534
}
520535
// Call user message handler
521-
var reply = await _taskHelper.ConfigureAwaitHideErrors(() => _handler.HandleMessage(msg.Message, _hardStopCts.Token), Reply.Nack);
536+
var reply = await _taskHelper.ConfigureAwaitHideErrors(() =>
537+
{
538+
// If the subscriber shut down while waiting for flow control, skip the handler.
539+
// Throwing here triggers a Nack, releasing the message for redelivery.
540+
_softStopCts.Token.ThrowIfCancellationRequested();
541+
return _handler.HandleMessage(msg.Message, _hardStopCts.Token);
542+
}, Reply.Nack);
543+
522544
// Lock msgsIds, this is accessed concurrently here and in HandleExtendLease().
523-
lock (msgIds)
545+
lock (leaseTracking)
524546
{
525-
msgIds.Remove(msg.AckId);
547+
leaseTracking.Remove(msg.AckId);
526548
}
527549
// Lock ack/nack-queues, this is accessed concurrently here and in "master" thread.
528550
lock (_lock)

0 commit comments

Comments
 (0)