Skip to content

Commit c1e25a4

Browse files
cgillumSophia Tevosyan
andauthored
Initial attempt to fix carryover events issue on continue-as-new (#496)
* Initial attempt to fix #387 * fixing the test dispatcher * trying to remove the large diff for the ProtoUtils file --------- Co-authored-by: Sophia Tevosyan <[email protected]>
1 parent 829aba4 commit c1e25a4

File tree

3 files changed

+94
-37
lines changed

3 files changed

+94
-37
lines changed

src/InProcessTestHost/Sidecar/Dispatcher/TaskOrchestrationDispatcher.cs

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,12 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor
111111
{
112112
throw new ArgumentException($"Could not find an orchestration instance ID in the work item's runtime state.", nameof(workItem));
113113
}
114-
115-
// We loop for as long as the orchestrator does a ContinueAsNew
114+
115+
var activityMessages = new List<TaskMessage>();
116+
var timerMessages = new List<TaskMessage>();
117+
var orchestratorMessages = new List<TaskMessage>();
118+
119+
// We loop for as long as the orchestrator does a ContinueAsNew
116120
while (true)
117121
{
118122
if (this.log.IsEnabled(LogLevel.Debug))
@@ -138,9 +142,9 @@ protected override async Task ExecuteWorkItemAsync(TaskOrchestrationWorkItem wor
138142
this.ApplyOrchestratorActions(
139143
result,
140144
ref workItem.OrchestrationRuntimeState,
141-
out List<TaskMessage> activityMessages,
142-
out List<TaskMessage> orchestratorMessages,
143-
out List<TaskMessage> timerMessages,
145+
activityMessages,
146+
orchestratorMessages,
147+
timerMessages,
144148
out OrchestrationState? updatedStatus,
145149
out bool continueAsNew);
146150
if (continueAsNew)
@@ -247,9 +251,9 @@ static string GetShortHistoryEventDescription(HistoryEvent e)
247251
void ApplyOrchestratorActions(
248252
OrchestratorExecutionResult result,
249253
ref OrchestrationRuntimeState runtimeState,
250-
out List<TaskMessage> activityMessages, // CA1859: Use concrete types for better performance
251-
out List<TaskMessage> orchestratorMessages, // CA1859: Use concrete types for better performance
252-
out List<TaskMessage> timerMessages, // CA1859: Use concrete types for better performance
254+
List<TaskMessage> activityMessages, // CA1859: Use concrete types for better performance
255+
List<TaskMessage> orchestratorMessages, // CA1859: Use concrete types for better performance
256+
List<TaskMessage> timerMessages, // CA1859: Use concrete types for better performance
253257
out OrchestrationState? updatedStatus,
254258
out bool continueAsNew)
255259
{
@@ -258,9 +262,6 @@ void ApplyOrchestratorActions(
258262
throw new ArgumentException($"The provided {nameof(OrchestrationRuntimeState)} doesn't contain an instance ID!", nameof(runtimeState));
259263
}
260264

261-
List<TaskMessage>? newActivityMessages = null; // CA1859: Use concrete types for better performance
262-
List<TaskMessage>? newTimerMessages = null; // CA1859: Use concrete types for better performance
263-
List<TaskMessage>? newOrchestratorMessages = null; // CA1859: Use concrete types for better performance
264265
FailureDetails? failureDetails = null;
265266
continueAsNew = false;
266267

@@ -288,8 +289,7 @@ void ApplyOrchestratorActions(
288289
scheduledEvent.ParentTraceContext ??= new(grpcAction.ParentTraceContext.TraceParent, grpcAction.ParentTraceContext.TraceState);
289290
}
290291

291-
newActivityMessages ??= new List<TaskMessage>();
292-
newActivityMessages.Add(new TaskMessage
292+
activityMessages.Add(new TaskMessage
293293
{
294294
Event = scheduledEvent,
295295
OrchestrationInstance = runtimeState.OrchestrationInstance,
@@ -301,8 +301,7 @@ void ApplyOrchestratorActions(
301301
{
302302
TimerCreatedEvent timerEvent = new(timerAction.Id, timerAction.FireAt);
303303

304-
newTimerMessages ??= new List<TaskMessage>();
305-
newTimerMessages.Add(new TaskMessage
304+
timerMessages.Add(new TaskMessage
306305
{
307306
Event = new TimerFiredEvent(-1, timerAction.FireAt)
308307
{
@@ -346,8 +345,7 @@ void ApplyOrchestratorActions(
346345
Tags = subOrchestrationAction.Tags,
347346
};
348347

349-
newOrchestratorMessages ??= new List<TaskMessage>();
350-
newOrchestratorMessages.Add(new TaskMessage
348+
orchestratorMessages.Add(new TaskMessage
351349
{
352350
Event = startedEvent,
353351
OrchestrationInstance = startedEvent.OrchestrationInstance,
@@ -367,13 +365,17 @@ void ApplyOrchestratorActions(
367365
Input = sendEventAction.EventData,
368366
};
369367

370-
runtimeState.AddEvent(sendEvent);
368+
runtimeState.AddEvent(sendEvent);
369+
370+
EventRaisedEvent eventRaisedEvent = new(-1, sendEventAction.EventData)
371+
{
372+
Name = sendEventAction.EventName
373+
};
371374

372-
newOrchestratorMessages ??= new List<TaskMessage>();
373-
newOrchestratorMessages.Add(new TaskMessage
375+
orchestratorMessages.Add(new TaskMessage
374376
{
375-
Event = sendEvent,
376-
OrchestrationInstance = runtimeState.OrchestrationInstance,
377+
Event = eventRaisedEvent,
378+
OrchestrationInstance = sendEventAction.Instance,
377379
});
378380
}
379381
else if (action is OrchestrationCompleteOrchestratorAction completeAction)
@@ -408,9 +410,6 @@ void ApplyOrchestratorActions(
408410
}
409411

410412
runtimeState = newRuntimeState;
411-
activityMessages = new List<TaskMessage>();
412-
orchestratorMessages = new List<TaskMessage>();
413-
timerMessages = new List<TaskMessage>();
414413
continueAsNew = true;
415414
updatedStatus = null;
416415
return;
@@ -457,8 +456,7 @@ void ApplyOrchestratorActions(
457456
completeAction.FailureDetails);
458457
}
459458

460-
newOrchestratorMessages ??= new List<TaskMessage>();
461-
newOrchestratorMessages.Add(new TaskMessage
459+
orchestratorMessages.Add(new TaskMessage
462460
{
463461
Event = subOrchestratorCompletedEvent,
464462
OrchestrationInstance = runtimeState.ParentInstance.OrchestrationInstance,
@@ -475,10 +473,6 @@ void ApplyOrchestratorActions(
475473

476474
runtimeState.AddEvent(new OrchestratorCompletedEvent(-1));
477475

478-
activityMessages = newActivityMessages ?? new List<TaskMessage>();
479-
timerMessages = newTimerMessages ?? new List<TaskMessage>();
480-
orchestratorMessages = newOrchestratorMessages ?? new List<TaskMessage>();
481-
482476
updatedStatus = new OrchestrationState
483477
{
484478
OrchestrationInstance = runtimeState.OrchestrationInstance,

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -461,11 +461,8 @@ internal static P.OrchestratorResponse ConstructOrchestratorResponse(
461461

462462
var completeAction = (OrchestrationCompleteOrchestratorAction)action;
463463
protoAction.CompleteOrchestration = new P.CompleteOrchestrationAction
464-
{
465-
CarryoverEvents =
466-
{
467-
// TODO
468-
},
464+
{
465+
CarryoverEvents = { completeAction.CarryoverEvents.Select(ToProtobuf) },
469466
Details = completeAction.Details,
470467
NewVersion = completeAction.NewVersion,
471468
OrchestrationStatus = completeAction.OrchestrationStatus.ToProtobuf(),
@@ -1170,6 +1167,28 @@ static P.OrchestrationInstance ToProtobuf(this OrchestrationInstance instance)
11701167
InstanceId = instance.InstanceId,
11711168
ExecutionId = instance.ExecutionId,
11721169
};
1170+
}
1171+
1172+
static P.HistoryEvent ToProtobuf(HistoryEvent e)
1173+
{
1174+
var payload = new P.HistoryEvent()
1175+
{
1176+
EventId = e.EventId,
1177+
Timestamp = Timestamp.FromDateTime(e.Timestamp),
1178+
};
1179+
1180+
if (e.EventType == EventType.EventRaised)
1181+
{
1182+
var eventRaised = (EventRaisedEvent)e;
1183+
payload.EventRaised = new P.EventRaisedEvent
1184+
{
1185+
Name = eventRaised.Name,
1186+
Input = eventRaised.Input,
1187+
};
1188+
return payload;
1189+
}
1190+
1191+
throw new ArgumentException("Unsupported event type");
11731192
}
11741193

11751194
/// <summary>

test/Grpc.IntegrationTests/OrchestrationPatterns.cs

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1145,7 +1145,51 @@ public ValueTask<bool> IsOrchestrationValidAsync(OrchestrationFilterParameters i
11451145
}
11461146
}
11471147

1148+
[Theory]
1149+
[InlineData(true)]
1150+
[InlineData(false)]
1151+
public async Task ContinueAsNewEventsArePreserved(bool injectTimers)
1152+
{
1153+
const int EventCount = 10;
1154+
async Task<int> OrchestratorFunc(TaskOrchestrationContext ctx, int counter)
1155+
{
1156+
await ctx.WaitForExternalEvent<string>("Event");
1157+
counter++;
1158+
1159+
if (injectTimers)
1160+
{
1161+
await ctx.CreateTimer(TimeSpan.FromMilliseconds(1), CancellationToken.None);
1162+
}
1163+
1164+
if (counter < EventCount)
1165+
{
1166+
ctx.ContinueAsNew(counter, preserveUnprocessedEvents: true);
1167+
}
1168+
1169+
return counter;
1170+
}
1171+
1172+
await using HostTestLifetime server = await this.StartWorkerAsync(
1173+
b => b.AddTasks(
1174+
tasks => tasks.AddOrchestratorFunc<int, int>(nameof(OrchestratorFunc), OrchestratorFunc)));
1175+
1176+
string instanceId = await server.Client.ScheduleNewOrchestrationInstanceAsync(
1177+
nameof(OrchestratorFunc),
1178+
input: 0);
1179+
1180+
for (int i = 0; i < EventCount; i++)
1181+
{
1182+
await server.Client.RaiseEventAsync(instanceId, eventName: "Event");
1183+
await Task.Delay(TimeSpan.FromMilliseconds(1), this.TimeoutToken);
1184+
}
1185+
1186+
OrchestrationMetadata metadata = await server.Client.WaitForInstanceCompletionAsync(
1187+
instanceId, getInputsAndOutputs: true, this.TimeoutToken);
1188+
Assert.NotNull(metadata);
1189+
Assert.Equal(OrchestrationRuntimeStatus.Completed, metadata.RuntimeStatus);
1190+
Assert.Equal(EventCount, metadata.ReadOutputAs<int>());
1191+
}
1192+
11481193
// TODO: Test for multiple external events with the same name
1149-
// TODO: Test for ContinueAsNew with external events that carry over
11501194
// TODO: Test for catching activity exceptions of specific types
11511195
}

0 commit comments

Comments
 (0)