Skip to content

Commit 204ff20

Browse files
authored
Merge branch 'main' into wangbill/largepayloadv2
2 parents 8da17bd + b217eb3 commit 204ff20

File tree

4 files changed

+45
-22
lines changed

4 files changed

+45
-22
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
# Changelog
2+
3+
## v1.15.1 (Unreleased)
4+
- Add version check to activities by @halspang in ([#472](https://github.com/microsoft/durabletask-dotnet/pull/472))
5+
26
## v1.15.0
37
- Abandon workitem if processing workitem failed by @YunchuWang in ([#467](https://github.com/microsoft/durabletask-dotnet/pull/467))
48
- Extended Sessions for Isolated (Orchestrations) by @sophiatev in ([#449](https://github.com/microsoft/durabletask-dotnet/pull/449))

eng/targets/Release.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
</PropertyGroup>
1818

1919
<PropertyGroup>
20-
<VersionPrefix>1.15.0</VersionPrefix>
20+
<VersionPrefix>1.15.1</VersionPrefix>
2121
<VersionSuffix></VersionSuffix>
2222
</PropertyGroup>
2323

src/Worker/Core/Shims/TaskOrchestrationContextWrapper.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ public override async Task<T> CallActivityAsync<T>(
154154
{
155155
return await this.innerContext.ScheduleTask<T>(
156156
name.Name,
157-
name.Version,
157+
this.innerContext.Version,
158158
options: ScheduleTaskOptions.CreateBuilder()
159159
.WithRetryOptions(policy.ToDurableTaskCoreRetryOptions())
160160
.WithTags(tags)
@@ -166,7 +166,7 @@ public override async Task<T> CallActivityAsync<T>(
166166
return await this.InvokeWithCustomRetryHandler(
167167
() => this.innerContext.ScheduleTask<T>(
168168
name.Name,
169-
name.Version,
169+
this.innerContext.Version,
170170
options: ScheduleTaskOptions.CreateBuilder()
171171
.WithTags(tags)
172172
.Build(),
@@ -179,7 +179,7 @@ public override async Task<T> CallActivityAsync<T>(
179179
{
180180
return await this.innerContext.ScheduleTask<T>(
181181
name.Name,
182-
name.Version,
182+
this.innerContext.Version,
183183
options: ScheduleTaskOptions.CreateBuilder()
184184
.WithTags(tags)
185185
.Build(),

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 37 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -759,37 +759,56 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken,
759759

760760
OrchestrationInstance instance = request.OrchestrationInstance.ToCore();
761761
string rawInput = request.Input;
762-
763762
int inputSize = rawInput != null ? Encoding.UTF8.GetByteCount(rawInput) : 0;
764763
this.Logger.ReceivedActivityRequest(request.Name, request.TaskId, instance.InstanceId, inputSize);
765764

765+
P.TaskFailureDetails? failureDetails = null;
766766
TaskContext innerContext = new(instance);
767767
TaskName name = new(request.Name);
768768
string? output = null;
769-
P.TaskFailureDetails? failureDetails = null;
770-
try
769+
770+
failureDetails = EvaluateOrchestrationVersioning(this.worker.workerOptions.Versioning, request.Version, out bool versioningFailed);
771+
if (!versioningFailed)
771772
{
772-
await using AsyncServiceScope scope = this.worker.services.CreateAsyncScope();
773-
if (this.worker.Factory.TryCreateActivity(name, scope.ServiceProvider, out ITaskActivity? activity))
773+
try
774774
{
775-
// Both the factory invocation and the RunAsync could involve user code and need to be handled as
776-
// part of try/catch.
777-
TaskActivity shim = this.shimFactory.CreateActivity(name, activity);
778-
output = await shim.RunAsync(innerContext, request.Input);
775+
await using AsyncServiceScope scope = this.worker.services.CreateAsyncScope();
776+
if (this.worker.Factory.TryCreateActivity(name, scope.ServiceProvider, out ITaskActivity? activity))
777+
{
778+
// Both the factory invocation and the RunAsync could involve user code and need to be handled as
779+
// part of try/catch.
780+
TaskActivity shim = this.shimFactory.CreateActivity(name, activity);
781+
output = await shim.RunAsync(innerContext, request.Input);
782+
}
783+
else
784+
{
785+
failureDetails = new P.TaskFailureDetails
786+
{
787+
ErrorType = "ActivityTaskNotFound",
788+
ErrorMessage = $"No activity task named '{name}' was found.",
789+
IsNonRetriable = true,
790+
};
791+
}
779792
}
780-
else
793+
catch (Exception applicationException)
781794
{
782-
failureDetails = new P.TaskFailureDetails
783-
{
784-
ErrorType = "ActivityTaskNotFound",
785-
ErrorMessage = $"No activity task named '{name}' was found.",
786-
IsNonRetriable = true,
787-
};
795+
failureDetails = applicationException.ToTaskFailureDetails();
788796
}
789797
}
790-
catch (Exception applicationException)
798+
else
791799
{
792-
failureDetails = applicationException.ToTaskFailureDetails();
800+
if (this.worker.workerOptions.Versioning?.FailureStrategy == DurableTaskWorkerOptions.VersionFailureStrategy.Reject)
801+
{
802+
this.Logger.AbandoningActivityWorkItem(instance.InstanceId, request.Name, request.TaskId, completionToken);
803+
await this.client.AbandonTaskActivityWorkItemAsync(
804+
new P.AbandonActivityTaskRequest
805+
{
806+
CompletionToken = completionToken,
807+
},
808+
cancellationToken: cancellation);
809+
}
810+
811+
return;
793812
}
794813

795814
int outputSizeInBytes = 0;

0 commit comments

Comments
 (0)