Skip to content

Commit fb6ad3a

Browse files
committed
Fix Start -> Init Workflow
With temp workaround for job ordering
1 parent 62e0e53 commit fb6ad3a

File tree

4 files changed

+46
-29
lines changed

4 files changed

+46
-29
lines changed

src/Temporalio/Worker/WorkflowCodecHelper.cs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,8 @@ await codec.DecodeFailureAsync(
102102
await DecodeAsync(codec, job.SignalWorkflow.Input).ConfigureAwait(false);
103103
await DecodeAsync(codec, job.SignalWorkflow.Headers).ConfigureAwait(false);
104104
break;
105-
case WorkflowActivationJob.VariantOneofCase.StartWorkflow:
106-
await DecodeAsync(codec, job.StartWorkflow).ConfigureAwait(false);
105+
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
106+
await DecodeAsync(codec, job.InitializeWorkflow).ConfigureAwait(false);
107107
break;
108108
}
109109
}
@@ -278,21 +278,21 @@ private static async Task DecodeAsync(IPayloadCodec codec, ChildWorkflowResult r
278278
}
279279
}
280280

281-
private static async Task DecodeAsync(IPayloadCodec codec, StartWorkflow start)
281+
private static async Task DecodeAsync(IPayloadCodec codec, InitializeWorkflow init)
282282
{
283-
await DecodeAsync(codec, start.Arguments).ConfigureAwait(false);
284-
if (start.ContinuedFailure != null)
283+
await DecodeAsync(codec, init.Arguments).ConfigureAwait(false);
284+
if (init.ContinuedFailure != null)
285285
{
286-
await codec.DecodeFailureAsync(start.ContinuedFailure).ConfigureAwait(false);
286+
await codec.DecodeFailureAsync(init.ContinuedFailure).ConfigureAwait(false);
287287
}
288-
if (start.Memo != null)
288+
if (init.Memo != null)
289289
{
290-
await DecodeAsync(codec, start.Memo.Fields).ConfigureAwait(false);
290+
await DecodeAsync(codec, init.Memo.Fields).ConfigureAwait(false);
291291
}
292-
await DecodeAsync(codec, start.Headers).ConfigureAwait(false);
293-
if (start.LastCompletionResult != null)
292+
await DecodeAsync(codec, init.Headers).ConfigureAwait(false);
293+
if (init.LastCompletionResult != null)
294294
{
295-
await DecodeAsync(codec, start.LastCompletionResult.Payloads_).ConfigureAwait(false);
295+
await DecodeAsync(codec, init.LastCompletionResult.Payloads_).ConfigureAwait(false);
296296
}
297297
}
298298

@@ -330,4 +330,4 @@ private static async Task DecodeAsync(IPayloadCodec codec, Payload payload)
330330
payload.MergeFrom(decoded.Single());
331331
}
332332
}
333-
}
333+
}

src/Temporalio/Worker/WorkflowInstance.cs

Lines changed: 26 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,21 +121,21 @@ public WorkflowInstance(WorkflowInstanceDetails details)
121121
mutableQueries = new(() => new(Definition.Queries, OnQueryDefinitionAdded), false);
122122
mutableSignals = new(() => new(Definition.Signals, OnSignalDefinitionAdded), false);
123123
mutableUpdates = new(() => new(Definition.Updates, OnUpdateDefinitionAdded), false);
124-
var initialMemo = details.Start.Memo;
124+
var initialMemo = details.Init.Memo;
125125
memo = new(
126126
() => initialMemo == null ? new Dictionary<string, IRawValue>(0) :
127127
initialMemo.Fields.ToDictionary(
128128
kvp => kvp.Key,
129129
kvp => (IRawValue)new RawValue(kvp.Value)),
130130
false);
131-
var initialSearchAttributes = details.Start.SearchAttributes;
131+
var initialSearchAttributes = details.Init.SearchAttributes;
132132
typedSearchAttributes = new(
133133
() => initialSearchAttributes == null ? new(new()) :
134134
SearchAttributeCollection.FromProto(initialSearchAttributes),
135135
false);
136136
var act = details.InitialActivation;
137137
CurrentBuildId = act.BuildIdForCurrentTask;
138-
var start = details.Start;
138+
var start = details.Init;
139139
startArgs = new(
140140
() => DecodeArgs(
141141
method: Definition.RunMethod,
@@ -151,7 +151,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
151151
{ "task_queue", details.TaskQueue },
152152
{ "workflow_type", start.WorkflowType },
153153
})));
154-
initialSearchAttributes = details.Start.SearchAttributes;
154+
initialSearchAttributes = details.Init.SearchAttributes;
155155
WorkflowInfo.ParentInfo? parent = null;
156156
if (start.ParentWorkflowInfo != null)
157157
{
@@ -188,7 +188,7 @@ public WorkflowInstance(WorkflowInstanceDetails details)
188188
replaySafeLogger = new(logger);
189189
onTaskStarting = details.OnTaskStarting;
190190
onTaskCompleted = details.OnTaskCompleted;
191-
Random = new(details.Start.RandomnessSeed);
191+
Random = new(details.Init.RandomnessSeed);
192192
TracingEventsEnabled = !details.DisableTracingEvents;
193193
workerLevelFailureExceptionTypes = details.WorkerLevelFailureExceptionTypes;
194194
disableEagerActivityExecution = details.DisableEagerActivityExecution;
@@ -538,8 +538,25 @@ public WorkflowActivationCompletion Activate(WorkflowActivation act)
538538
{
539539
// We must set the sync context to null so work isn't posted there
540540
SynchronizationContext.SetSynchronizationContext(null);
541+
// TODO: Temporary workaround in lieu of https://github.com/temporalio/sdk-dotnet/issues/375
542+
var sortedJobs = act.Jobs.OrderBy(j =>
543+
{
544+
switch (j.VariantCase)
545+
{
546+
case WorkflowActivationJob.VariantOneofCase.NotifyHasPatch:
547+
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
548+
return 1;
549+
case WorkflowActivationJob.VariantOneofCase.SignalWorkflow:
550+
case WorkflowActivationJob.VariantOneofCase.DoUpdate:
551+
return 2;
552+
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
553+
return 3;
554+
default:
555+
return 4;
556+
}
557+
}).ToList();
541558
// We can trust jobs are deterministically ordered by core
542-
foreach (var job in act.Jobs)
559+
foreach (var job in sortedJobs)
543560
{
544561
Apply(job);
545562
// Run scheduler once. Do not check conditions when patching or querying.
@@ -880,8 +897,8 @@ private void Apply(WorkflowActivationJob job)
880897
case WorkflowActivationJob.VariantOneofCase.SignalWorkflow:
881898
ApplySignalWorkflow(job.SignalWorkflow);
882899
break;
883-
case WorkflowActivationJob.VariantOneofCase.StartWorkflow:
884-
ApplyStartWorkflow(job.StartWorkflow);
900+
case WorkflowActivationJob.VariantOneofCase.InitializeWorkflow:
901+
ApplyStartWorkflow(job.InitializeWorkflow);
885902
break;
886903
case WorkflowActivationJob.VariantOneofCase.UpdateRandomSeed:
887904
ApplyUpdateRandomSeed(job.UpdateRandomSeed);
@@ -1292,7 +1309,7 @@ await inbound.Value.HandleSignalAsync(new(
12921309
}));
12931310
}
12941311

1295-
private void ApplyStartWorkflow(StartWorkflow start)
1312+
private void ApplyStartWorkflow(InitializeWorkflow init)
12961313
{
12971314
_ = QueueNewTaskAsync(() => RunTopLevelAsync(async () =>
12981315
{

src/Temporalio/Worker/WorkflowInstanceDetails.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ namespace Temporalio.Worker
1515
/// <param name="TaskQueue">Workflow task queue.</param>
1616
/// <param name="Definition">Workflow definition.</param>
1717
/// <param name="InitialActivation">Initial activation for the workflow.</param>
18-
/// <param name="Start">Start attributes for the workflow.</param>
18+
/// <param name="Init">Start attributes for the workflow.</param>
1919
/// <param name="Interceptors">Interceptors.</param>
2020
/// <param name="PayloadConverter">Payload converter.</param>
2121
/// <param name="FailureConverter">Failure converter.</param>
@@ -32,7 +32,7 @@ internal record WorkflowInstanceDetails(
3232
string TaskQueue,
3333
WorkflowDefinition Definition,
3434
WorkflowActivation InitialActivation,
35-
StartWorkflow Start,
35+
InitializeWorkflow Init,
3636
IReadOnlyCollection<Interceptors.IWorkerInterceptor> Interceptors,
3737
IPayloadConverter PayloadConverter,
3838
IFailureConverter FailureConverter,
@@ -44,4 +44,4 @@ internal record WorkflowInstanceDetails(
4444
Lazy<MetricMeter> RuntimeMetricMeter,
4545
IReadOnlyCollection<Type>? WorkerLevelFailureExceptionTypes,
4646
bool DisableEagerActivityExecution);
47-
}
47+
}

src/Temporalio/Worker/WorkflowWorker.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -254,16 +254,16 @@ private async Task HandleActivationAsync(IPayloadCodec? codec, WorkflowActivatio
254254

255255
private IWorkflowInstance CreateInstance(WorkflowActivation act)
256256
{
257-
var start = act.Jobs.Select(j => j.StartWorkflow).FirstOrDefault(s => s != null) ??
257+
var init = act.Jobs.Select(j => j.InitializeWorkflow).FirstOrDefault(s => s != null) ??
258258
throw new InvalidOperationException("Missing workflow start (unexpectedly evicted?)");
259-
if (!workflows.TryGetValue(start.WorkflowType, out var defn))
259+
if (!workflows.TryGetValue(init.WorkflowType, out var defn))
260260
{
261261
defn = dynamicWorkflow;
262262
if (defn == null)
263263
{
264264
var names = string.Join(", ", workflows.Keys.OrderBy(s => s));
265265
throw new ApplicationFailureException(
266-
$"Workflow type {start.WorkflowType} is not registered on this worker, available workflows: {names}",
266+
$"Workflow type {init.WorkflowType} is not registered on this worker, available workflows: {names}",
267267
"NotFoundError");
268268
}
269269
}
@@ -273,7 +273,7 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
273273
TaskQueue: options.TaskQueue,
274274
Definition: defn,
275275
InitialActivation: act,
276-
Start: start,
276+
Init: init,
277277
Interceptors: options.Interceptors,
278278
PayloadConverter: options.DataConverter.PayloadConverter,
279279
FailureConverter: options.DataConverter.FailureConverter,
@@ -287,4 +287,4 @@ private IWorkflowInstance CreateInstance(WorkflowActivation act)
287287
DisableEagerActivityExecution: options.DisableEagerActivityExecution));
288288
}
289289
}
290-
}
290+
}

0 commit comments

Comments
 (0)