Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 102 additions & 6 deletions src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
() => this.OnRunOrchestratorAsync(
workItem.OrchestratorRequest,
workItem.CompletionToken,
cancellation));
cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
{
Expand All @@ -285,13 +286,15 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
() => this.OnRunActivityAsync(
workItem.ActivityRequest,
workItem.CompletionToken,
cancellation));
cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
{
this.RunBackgroundTask(
workItem,
() => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation));
() => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2)
{
Expand All @@ -305,7 +308,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
batchRequest,
cancellation,
workItem.CompletionToken,
operationInfos));
operationInfos),
cancellation);
}
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
{
Expand Down Expand Up @@ -333,10 +337,11 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
}
}

void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler, CancellationToken cancellation)
{
// TODO: is Task.Run appropriate here? Should we have finer control over the tasks and their threads?
_ = Task.Run(async () =>
_ = Task.Run(
async () =>
{
try
{
Expand All @@ -351,8 +356,99 @@ void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
string instanceId =
workItem?.OrchestratorRequest?.InstanceId ??
workItem?.ActivityRequest?.OrchestrationInstance?.InstanceId ??
workItem?.EntityRequest?.InstanceId ??
workItem?.EntityRequestV2?.InstanceId ??
string.Empty;
this.Logger.UnexpectedError(ex, instanceId);

if (workItem?.OrchestratorRequest != null)
{
try
{
this.Logger.AbandoningOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskOrchestratorWorkItemAsync(
new P.AbandonOrchestrationTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
this.Logger.UnexpectedError(abandonException, instanceId);
}
}
else if (workItem?.ActivityRequest != null)
{
try
{
this.Logger.AbandoningActivityWorkItem(
instanceId,
workItem.ActivityRequest.Name,
workItem.ActivityRequest.TaskId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskActivityWorkItemAsync(
new P.AbandonActivityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedActivityWorkItem(
instanceId,
workItem.ActivityRequest.Name,
workItem.ActivityRequest.TaskId,
workItem?.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
this.Logger.UnexpectedError(abandonException, instanceId);
}
}
else if (workItem?.EntityRequest != null)
{
try
{
this.Logger.AbandoningEntityWorkItem(
workItem.EntityRequest.InstanceId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.EntityRequest.InstanceId,
workItem?.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
this.Logger.UnexpectedError(abandonException, workItem.EntityRequest.InstanceId);
}
}
else if (workItem?.EntityRequestV2 != null)
{
try
{
this.Logger.AbandoningEntityWorkItem(
workItem.EntityRequestV2.InstanceId,
workItem?.CompletionToken ?? string.Empty);
await this.client.AbandonTaskEntityWorkItemAsync(
new P.AbandonEntityTaskRequest
{
CompletionToken = workItem?.CompletionToken,
},
cancellationToken: cancellation);
this.Logger.AbandonedEntityWorkItem(
workItem.EntityRequestV2.InstanceId,
workItem?.CompletionToken ?? string.Empty);
}
catch (Exception abandonException)
{
this.Logger.UnexpectedError(abandonException, workItem.EntityRequestV2.InstanceId);
}
}
}
});
}
Expand Down
19 changes: 19 additions & 0 deletions src/Worker/Grpc/Logs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,24 @@ static partial class Logs

[LoggerMessage(EventId = 59, Level = LogLevel.Information, Message = "Abandoning orchestration due to filtering. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
public static partial void AbandoningOrchestrationDueToOrchestrationFilter(this ILogger logger, string instanceId, string completionToken);

// Abandoning/Abandoned logs for background task error handling
[LoggerMessage(EventId = 60, Level = LogLevel.Information, Message = "{instanceId}: Abandoning orchestrator work item. Completion token = '{completionToken}'")]
public static partial void AbandoningOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);

[LoggerMessage(EventId = 61, Level = LogLevel.Information, Message = "{instanceId}: Abandoned orchestrator work item. Completion token = '{completionToken}'")]
public static partial void AbandonedOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);

[LoggerMessage(EventId = 62, Level = LogLevel.Information, Message = "{instanceId}: Abandoning activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
public static partial void AbandoningActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);

[LoggerMessage(EventId = 63, Level = LogLevel.Information, Message = "{instanceId}: Abandoned activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
public static partial void AbandonedActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);

[LoggerMessage(EventId = 64, Level = LogLevel.Information, Message = "{instanceId}: Abandoning entity work item. Completion token = '{completionToken}'")]
public static partial void AbandoningEntityWorkItem(this ILogger logger, string instanceId, string completionToken);

[LoggerMessage(EventId = 65, Level = LogLevel.Information, Message = "{instanceId}: Abandoned entity work item. Completion token = '{completionToken}'")]
public static partial void AbandonedEntityWorkItem(this ILogger logger, string instanceId, string completionToken);
}
}
Loading
Loading