diff --git a/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs b/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs index 88665fa86..3fad8c98d 100644 --- a/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs +++ b/src/DurableTask.AzureStorage/Partitioning/TablePartitionManager.cs @@ -426,8 +426,13 @@ public async Task ReadAndWriteTableAsync(bool isShuttingDown, // Ensure worker is listening to the control queue iff either: // 1) worker just claimed the lease, - // 2) worker was already the owner in the partitions table and is not actively draining the queue. Note that during draining, we renew the lease but do not want to listen to new messages. Otherwise, we'll never finish draining our in-memory messages. - bool isRenewingToDrainQueue = renewedLease & response.IsDrainingPartition; + // 2) worker was already the owner in the partitions table and is not actively draining the queue. + // Note that during draining, we renew the lease but do not want to listen to new messages. + // Otherwise, we'll never finish draining our in-memory messages. + // When draining completes, and the worker may decide to release the lease. In that moment, + // IsDrainingPartition can still be true but renewedLease can be false — without checking + // !releasedLease, the worker could incorrectly resume listening just before releasing the lease. + bool isRenewingToDrainQueue = renewedLease && response.IsDrainingPartition && !releasedLease; if (claimedLease || !isRenewingToDrainQueue) { // Notify the orchestration session manager that we acquired a lease for one of the partitions. @@ -505,7 +510,8 @@ void RenewOrReleaseMyLease( partition, ref releasedLease, ref renewedLease, - ref drainedLease); + ref drainedLease, + CloseReason.LeaseLost); } } @@ -583,7 +589,8 @@ void TryDrainAndReleaseAllPartitions( partition, ref releasedLease, ref renewedLease, - ref drainedLease); + ref drainedLease, + CloseReason.Shutdown); if (releasedLease) { @@ -661,7 +668,7 @@ await this.partitionTable.ReplaceEntityAsync( partition, etag, forceShutdownToken); - + this.settings.Logger.LeaseStealingSucceeded( this.storageAccountName, this.settings.TaskHubName, @@ -815,7 +822,8 @@ void CheckDrainTask( TablePartitionLease partition, ref bool releasedLease, ref bool renewedLease, - ref bool drainedLease) + ref bool drainedLease, + CloseReason reason) { // Check if drain process has started. if (this.backgroundDrainTasks.TryGetValue(partition.RowKey!, out Task? drainTask)) @@ -844,7 +852,7 @@ void CheckDrainTask( } else// If drain task hasn't been started yet, start it and keep renewing the lease to prevent it from expiring. { - this.DrainPartition(partition, CloseReason.Shutdown); + this.DrainPartition(partition, reason); this.RenewLease(partition); renewedLease = true; drainedLease = true;