Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions src/InProcessTestHost/Sidecar/InMemoryOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -739,6 +739,12 @@ public void ReleaseLock(string instanceId)

public Task<OrchestrationState> WaitForInstanceAsync(string instanceId, CancellationToken cancellationToken)
{
// First, add the waiter before checking completion to avoid a race condition.
// This ensures we don't miss a completion notification that happens between
// checking the status and adding the waiter.
var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource<OrchestrationState>());

// Now check if already completed - if so, complete the waiter immediately
if (this.store.TryGetValue(instanceId, out SerializedInstanceState? state))
{
lock (state)
Expand All @@ -750,16 +756,18 @@ public Task<OrchestrationState> WaitForInstanceAsync(string instanceId, Cancella
statusRecord.OrchestrationStatus == OrchestrationStatus.Failed ||
statusRecord.OrchestrationStatus == OrchestrationStatus.Terminated)
{
// orchestration has already completed
return Task.FromResult(statusRecord);
// Orchestration has already completed - complete the waiter and clean it up
if (tcs.TrySetResult(statusRecord))
{
this.waiters.TryRemove(instanceId, out _);
}
}
}
}
}

// Caller will be notified when the instance completes.
// The ContinueWith is just to enable cancellation: https://stackoverflow.com/a/25652873/2069
var tcs = this.waiters.GetOrAdd(instanceId, _ => new TaskCompletionSource<OrchestrationState>());
return tcs.Task.ContinueWith(t => t.GetAwaiter().GetResult(), cancellationToken);
}

Expand Down
Loading