Skip to content
19 changes: 17 additions & 2 deletions src/Abstractions/TaskOrchestrationContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -222,13 +222,28 @@
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
public Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
{
return this.WaitForExternalEvent<T>(eventName, timeout, CancellationToken.None);
}

/// <param name="eventName">
/// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any
/// number of times; they are not required to be unique.
/// </param>
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
/// <param name="cancellationToken">A <c>CancellationToken</c> to use to abort waiting for the event.</param>
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout, CancellationToken cancellationToken)
{
// Timeouts are implemented using durable timers.
using CancellationTokenSource timerCts = new();
Task timeoutTask = this.CreateTimer(timeout, timerCts.Token);

using CancellationTokenSource eventCts = new();
// Create a linked cancellation token source from the external cancellation token.
// This allows us to cancel the event wait either when the external token is cancelled
// or when the timeout fires (by calling eventCts.Cancel()).
using CancellationTokenSource eventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task<T> externalEventTask = this.WaitForExternalEvent<T>(eventName, eventCts.Token);

// Wait for either task to complete and then cancel the one that didn't.
Expand Down Expand Up @@ -441,7 +456,7 @@
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
}

public IDisposable BeginScope<TState>(TState state) => this.logger.BeginScope(state);

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Possible null reference return.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

The type 'TState' cannot be used as type parameter 'TState' in the generic type or method 'ILogger.BeginScope<TState>(TState)'. Nullability of type argument 'TState' doesn't match 'notnull' constraint.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / Analyze (csharp)

Nullability in constraints for type parameter 'TState' of method 'TaskOrchestrationContext.ReplaySafeLogger.BeginScope<TState>(TState)' doesn't match the constraints for type parameter 'TState' of interface method 'ILogger.BeginScope<TState>(TState)'. Consider using an explicit interface implementation instead.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

Possible null reference return.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

The type 'TState' cannot be used as type parameter 'TState' in the generic type or method 'ILogger.BeginScope<TState>(TState)'. Nullability of type argument 'TState' doesn't match 'notnull' constraint.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / smoke-tests

Nullability in constraints for type parameter 'TState' of method 'TaskOrchestrationContext.ReplaySafeLogger.BeginScope<TState>(TState)' doesn't match the constraints for type parameter 'TState' of interface method 'ILogger.BeginScope<TState>(TState)'. Consider using an explicit interface implementation instead.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference return.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / build

The type 'TState' cannot be used as type parameter 'TState' in the generic type or method 'ILogger.BeginScope<TState>(TState)'. Nullability of type argument 'TState' doesn't match 'notnull' constraint.

Check warning on line 459 in src/Abstractions/TaskOrchestrationContext.cs

View workflow job for this annotation

GitHub Actions / build

Nullability in constraints for type parameter 'TState' of method 'TaskOrchestrationContext.ReplaySafeLogger.BeginScope<TState>(TState)' doesn't match the constraints for type parameter 'TState' of interface method 'ILogger.BeginScope<TState>(TState)'. Consider using an explicit interface implementation instead.

public bool IsEnabled(LogLevel logLevel) => this.logger.IsEnabled(logLevel);

Expand Down
168 changes: 168 additions & 0 deletions test/Grpc.IntegrationTests/OrchestrationPatterns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1338,4 +1338,172 @@ public async Task CatchingActivityExceptionsByType()
Assert.Equal("Success", results[2]);
Assert.Equal("Caught base Exception", results[3]);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins()
{
const string EventName = "TestEvent";
const string EventPayload = "test-payload";
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromDays(7), cts.Token);
string result = await eventTask;
return result;
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);

// Send event - should complete the wait
await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal(EventPayload, result);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins()
{
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();

// Create two event waiters with cancellation tokens
Task<string> event1Task = ctx.WaitForExternalEvent<string>("Event1", TimeSpan.FromDays(7), cts.Token);

using CancellationTokenSource cts2 = new();
Task<string> event2Task = ctx.WaitForExternalEvent<string>("Event2", TimeSpan.FromDays(7), cts2.Token);

// Wait for any to complete
Task winner = await Task.WhenAny(event1Task, event2Task);

// Cancel the other one
if (winner == event1Task)
{
cts2.Cancel();
return $"Event1: {await event1Task}";
}
else
{
cts.Cancel();
return $"Event2: {await event2Task}";
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);

// Send Event1 - should complete and cancel Event2
await server.Client.RaiseEventAsync(instanceId, "Event1", "first-event");

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal("Event1: first-event", result);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins()
{
const string EventName = "TestEvent";
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromMilliseconds(500), cts.Token);

try
{
string result = await eventTask;
return $"Event: {result}";
}
catch (OperationCanceledException)
{
return "Timeout occurred";
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal("Timeout occurred", result);
}

[Fact]
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_ExternalCancellationWins()
{
const string EventName = "TestEvent";
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_ExternalCancellationWins);

await using HostTestLifetime server = await this.StartWorkerAsync(b =>
{
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
{
using CancellationTokenSource cts = new();

// Create a timer that will fire and trigger cancellation
Task cancelTrigger = ctx.CreateTimer(TimeSpan.FromMilliseconds(100), CancellationToken.None);

// Wait for external event with a long timeout
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromDays(7), cts.Token);

// Wait for either the cancel trigger or the event
Task winner = await Task.WhenAny(cancelTrigger, eventTask);

if (winner == cancelTrigger)
{
// Cancel the external cancellation token
cts.Cancel();
}

try
{
string result = await eventTask;
return $"Event: {result}";
}
catch (OperationCanceledException)
{
return "External cancellation occurred";
}
}));
});

string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);

OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
Assert.NotNull(metadata);
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);

string? result = metadata.ReadOutputAs<string>();
Assert.Equal("External cancellation occurred", result);
}
}
Loading