Skip to content

Commit e0ee326

Browse files
CopilotYunchuWang
andauthored
Add WaitForExternalEvent overload with timeout and cancellation token (#555)
* Add WaitForExternalEvent overload with TimeSpan timeout and CancellationToken Co-authored-by: YunchuWang <[email protected]> * Refactor to reduce code duplication in WaitForExternalEvent overloads Co-authored-by: YunchuWang <[email protected]> * Clarify comment about linked cancellation token in WaitForExternalEvent Co-authored-by: YunchuWang <[email protected]> * Add test for external cancellation token firing in WaitForExternalEvent Co-authored-by: YunchuWang <[email protected]> --------- Co-authored-by: copilot-swe-agent[bot] <[email protected]> Co-authored-by: YunchuWang <[email protected]> Co-authored-by: wangbill <[email protected]>
1 parent e3b230e commit e0ee326

File tree

2 files changed

+185
-2
lines changed

2 files changed

+185
-2
lines changed

src/Abstractions/TaskOrchestrationContext.cs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -222,13 +222,28 @@ public virtual Task CreateTimer(TimeSpan delay, CancellationToken cancellationTo
222222
/// </param>
223223
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
224224
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
225-
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
225+
public Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout)
226+
{
227+
return this.WaitForExternalEvent<T>(eventName, timeout, CancellationToken.None);
228+
}
229+
230+
/// <param name="eventName">
231+
/// The name of the event to wait for. Event names are case-insensitive. External event names can be reused any
232+
/// number of times; they are not required to be unique.
233+
/// </param>
234+
/// <param name="timeout">The amount of time to wait before cancelling the external event task.</param>
235+
/// <param name="cancellationToken">A <c>CancellationToken</c> to use to abort waiting for the event.</param>
236+
/// <inheritdoc cref="WaitForExternalEvent(string, CancellationToken)"/>
237+
public async Task<T> WaitForExternalEvent<T>(string eventName, TimeSpan timeout, CancellationToken cancellationToken)
226238
{
227239
// Timeouts are implemented using durable timers.
228240
using CancellationTokenSource timerCts = new();
229241
Task timeoutTask = this.CreateTimer(timeout, timerCts.Token);
230242

231-
using CancellationTokenSource eventCts = new();
243+
// Create a linked cancellation token source from the external cancellation token.
244+
// This allows us to cancel the event wait either when the external token is cancelled
245+
// or when the timeout fires (by calling eventCts.Cancel()).
246+
using CancellationTokenSource eventCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
232247
Task<T> externalEventTask = this.WaitForExternalEvent<T>(eventName, eventCts.Token);
233248

234249
// Wait for either task to complete and then cancel the one that didn't.

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1338,4 +1338,172 @@ public async Task CatchingActivityExceptionsByType()
13381338
Assert.Equal("Success", results[2]);
13391339
Assert.Equal("Caught base Exception", results[3]);
13401340
}
1341+
1342+
[Fact]
1343+
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins()
1344+
{
1345+
const string EventName = "TestEvent";
1346+
const string EventPayload = "test-payload";
1347+
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_EventWins);
1348+
1349+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1350+
{
1351+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
1352+
{
1353+
using CancellationTokenSource cts = new();
1354+
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromDays(7), cts.Token);
1355+
string result = await eventTask;
1356+
return result;
1357+
}));
1358+
});
1359+
1360+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1361+
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);
1362+
1363+
// Send event - should complete the wait
1364+
await server.Client.RaiseEventAsync(instanceId, EventName, EventPayload);
1365+
1366+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1367+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
1368+
Assert.NotNull(metadata);
1369+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1370+
1371+
string? result = metadata.ReadOutputAs<string>();
1372+
Assert.Equal(EventPayload, result);
1373+
}
1374+
1375+
[Fact]
1376+
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins()
1377+
{
1378+
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_CancellationWins);
1379+
1380+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1381+
{
1382+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
1383+
{
1384+
using CancellationTokenSource cts = new();
1385+
1386+
// Create two event waiters with cancellation tokens
1387+
Task<string> event1Task = ctx.WaitForExternalEvent<string>("Event1", TimeSpan.FromDays(7), cts.Token);
1388+
1389+
using CancellationTokenSource cts2 = new();
1390+
Task<string> event2Task = ctx.WaitForExternalEvent<string>("Event2", TimeSpan.FromDays(7), cts2.Token);
1391+
1392+
// Wait for any to complete
1393+
Task winner = await Task.WhenAny(event1Task, event2Task);
1394+
1395+
// Cancel the other one
1396+
if (winner == event1Task)
1397+
{
1398+
cts2.Cancel();
1399+
return $"Event1: {await event1Task}";
1400+
}
1401+
else
1402+
{
1403+
cts.Cancel();
1404+
return $"Event2: {await event2Task}";
1405+
}
1406+
}));
1407+
});
1408+
1409+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1410+
await server.Client.WaitForInstanceStartAsync(instanceId, this.TimeoutToken);
1411+
1412+
// Send Event1 - should complete and cancel Event2
1413+
await server.Client.RaiseEventAsync(instanceId, "Event1", "first-event");
1414+
1415+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1416+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
1417+
Assert.NotNull(metadata);
1418+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1419+
1420+
string? result = metadata.ReadOutputAs<string>();
1421+
Assert.Equal("Event1: first-event", result);
1422+
}
1423+
1424+
[Fact]
1425+
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins()
1426+
{
1427+
const string EventName = "TestEvent";
1428+
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_TimeoutWins);
1429+
1430+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1431+
{
1432+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
1433+
{
1434+
using CancellationTokenSource cts = new();
1435+
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromMilliseconds(500), cts.Token);
1436+
1437+
try
1438+
{
1439+
string result = await eventTask;
1440+
return $"Event: {result}";
1441+
}
1442+
catch (OperationCanceledException)
1443+
{
1444+
return "Timeout occurred";
1445+
}
1446+
}));
1447+
});
1448+
1449+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1450+
1451+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1452+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
1453+
Assert.NotNull(metadata);
1454+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1455+
1456+
string? result = metadata.ReadOutputAs<string>();
1457+
Assert.Equal("Timeout occurred", result);
1458+
}
1459+
1460+
[Fact]
1461+
public async Task WaitForExternalEvent_WithTimeoutAndCancellationToken_ExternalCancellationWins()
1462+
{
1463+
const string EventName = "TestEvent";
1464+
TaskName orchestratorName = nameof(WaitForExternalEvent_WithTimeoutAndCancellationToken_ExternalCancellationWins);
1465+
1466+
await using HostTestLifetime server = await this.StartWorkerAsync(b =>
1467+
{
1468+
b.AddTasks(tasks => tasks.AddOrchestratorFunc(orchestratorName, async ctx =>
1469+
{
1470+
using CancellationTokenSource cts = new();
1471+
1472+
// Create a timer that will fire and trigger cancellation
1473+
Task cancelTrigger = ctx.CreateTimer(TimeSpan.FromMilliseconds(100), CancellationToken.None);
1474+
1475+
// Wait for external event with a long timeout
1476+
Task<string> eventTask = ctx.WaitForExternalEvent<string>(EventName, TimeSpan.FromDays(7), cts.Token);
1477+
1478+
// Wait for either the cancel trigger or the event
1479+
Task winner = await Task.WhenAny(cancelTrigger, eventTask);
1480+
1481+
if (winner == cancelTrigger)
1482+
{
1483+
// Cancel the external cancellation token
1484+
cts.Cancel();
1485+
}
1486+
1487+
try
1488+
{
1489+
string result = await eventTask;
1490+
return $"Event: {result}";
1491+
}
1492+
catch (OperationCanceledException)
1493+
{
1494+
return "External cancellation occurred";
1495+
}
1496+
}));
1497+
});
1498+
1499+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(orchestratorName);
1500+
1501+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1502+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
1503+
Assert.NotNull(metadata);
1504+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1505+
1506+
string? result = metadata.ReadOutputAs<string>();
1507+
Assert.Equal("External cancellation occurred", result);
1508+
}
13411509
}

0 commit comments

Comments
 (0)