Skip to content

Commit 9d40f6e

Browse files
committed
Merge branch 'wangbill/largepayloadv2' of https://github.com/microsoft/durabletask-dotnet into wangbill/largepayloadv2
2 parents fbecfd2 + e30dfda commit 9d40f6e

File tree

3 files changed

+617
-6
lines changed

3 files changed

+617
-6
lines changed

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 102 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
276276
() => this.OnRunOrchestratorAsync(
277277
workItem.OrchestratorRequest,
278278
workItem.CompletionToken,
279-
cancellation));
279+
cancellation),
280+
cancellation);
280281
}
281282
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.ActivityRequest)
282283
{
@@ -285,13 +286,15 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
285286
() => this.OnRunActivityAsync(
286287
workItem.ActivityRequest,
287288
workItem.CompletionToken,
288-
cancellation));
289+
cancellation),
290+
cancellation);
289291
}
290292
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequest)
291293
{
292294
this.RunBackgroundTask(
293295
workItem,
294-
() => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation));
296+
() => this.OnRunEntityBatchAsync(workItem.EntityRequest.ToEntityBatchRequest(), cancellation),
297+
cancellation);
295298
}
296299
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2)
297300
{
@@ -305,7 +308,8 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
305308
batchRequest,
306309
cancellation,
307310
workItem.CompletionToken,
308-
operationInfos));
311+
operationInfos),
312+
cancellation);
309313
}
310314
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
311315
{
@@ -333,10 +337,11 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
333337
}
334338
}
335339

336-
void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
340+
void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler, CancellationToken cancellation)
337341
{
338342
// TODO: is Task.Run appropriate here? Should we have finer control over the tasks and their threads?
339-
_ = Task.Run(async () =>
343+
_ = Task.Run(
344+
async () =>
340345
{
341346
try
342347
{
@@ -351,8 +356,99 @@ void RunBackgroundTask(P.WorkItem? workItem, Func<Task> handler)
351356
string instanceId =
352357
workItem?.OrchestratorRequest?.InstanceId ??
353358
workItem?.ActivityRequest?.OrchestrationInstance?.InstanceId ??
359+
workItem?.EntityRequest?.InstanceId ??
360+
workItem?.EntityRequestV2?.InstanceId ??
354361
string.Empty;
355362
this.Logger.UnexpectedError(ex, instanceId);
363+
364+
if (workItem?.OrchestratorRequest != null)
365+
{
366+
try
367+
{
368+
this.Logger.AbandoningOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
369+
await this.client.AbandonTaskOrchestratorWorkItemAsync(
370+
new P.AbandonOrchestrationTaskRequest
371+
{
372+
CompletionToken = workItem?.CompletionToken,
373+
},
374+
cancellationToken: cancellation);
375+
this.Logger.AbandonedOrchestratorWorkItem(instanceId, workItem?.CompletionToken ?? string.Empty);
376+
}
377+
catch (Exception abandonException)
378+
{
379+
this.Logger.UnexpectedError(abandonException, instanceId);
380+
}
381+
}
382+
else if (workItem?.ActivityRequest != null)
383+
{
384+
try
385+
{
386+
this.Logger.AbandoningActivityWorkItem(
387+
instanceId,
388+
workItem.ActivityRequest.Name,
389+
workItem.ActivityRequest.TaskId,
390+
workItem?.CompletionToken ?? string.Empty);
391+
await this.client.AbandonTaskActivityWorkItemAsync(
392+
new P.AbandonActivityTaskRequest
393+
{
394+
CompletionToken = workItem?.CompletionToken,
395+
},
396+
cancellationToken: cancellation);
397+
this.Logger.AbandonedActivityWorkItem(
398+
instanceId,
399+
workItem.ActivityRequest.Name,
400+
workItem.ActivityRequest.TaskId,
401+
workItem?.CompletionToken ?? string.Empty);
402+
}
403+
catch (Exception abandonException)
404+
{
405+
this.Logger.UnexpectedError(abandonException, instanceId);
406+
}
407+
}
408+
else if (workItem?.EntityRequest != null)
409+
{
410+
try
411+
{
412+
this.Logger.AbandoningEntityWorkItem(
413+
workItem.EntityRequest.InstanceId,
414+
workItem?.CompletionToken ?? string.Empty);
415+
await this.client.AbandonTaskEntityWorkItemAsync(
416+
new P.AbandonEntityTaskRequest
417+
{
418+
CompletionToken = workItem?.CompletionToken,
419+
},
420+
cancellationToken: cancellation);
421+
this.Logger.AbandonedEntityWorkItem(
422+
workItem.EntityRequest.InstanceId,
423+
workItem?.CompletionToken ?? string.Empty);
424+
}
425+
catch (Exception abandonException)
426+
{
427+
this.Logger.UnexpectedError(abandonException, workItem.EntityRequest.InstanceId);
428+
}
429+
}
430+
else if (workItem?.EntityRequestV2 != null)
431+
{
432+
try
433+
{
434+
this.Logger.AbandoningEntityWorkItem(
435+
workItem.EntityRequestV2.InstanceId,
436+
workItem?.CompletionToken ?? string.Empty);
437+
await this.client.AbandonTaskEntityWorkItemAsync(
438+
new P.AbandonEntityTaskRequest
439+
{
440+
CompletionToken = workItem?.CompletionToken,
441+
},
442+
cancellationToken: cancellation);
443+
this.Logger.AbandonedEntityWorkItem(
444+
workItem.EntityRequestV2.InstanceId,
445+
workItem?.CompletionToken ?? string.Empty);
446+
}
447+
catch (Exception abandonException)
448+
{
449+
this.Logger.UnexpectedError(abandonException, workItem.EntityRequestV2.InstanceId);
450+
}
451+
}
356452
}
357453
});
358454
}

src/Worker/Grpc/Logs.cs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,5 +60,24 @@ static partial class Logs
6060

6161
[LoggerMessage(EventId = 59, Level = LogLevel.Information, Message = "Abandoning orchestration due to filtering. InstanceId = '{instanceId}'. Completion token = '{completionToken}'")]
6262
public static partial void AbandoningOrchestrationDueToOrchestrationFilter(this ILogger logger, string instanceId, string completionToken);
63+
64+
// Abandoning/Abandoned logs for background task error handling
65+
[LoggerMessage(EventId = 60, Level = LogLevel.Information, Message = "{instanceId}: Abandoning orchestrator work item. Completion token = '{completionToken}'")]
66+
public static partial void AbandoningOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);
67+
68+
[LoggerMessage(EventId = 61, Level = LogLevel.Information, Message = "{instanceId}: Abandoned orchestrator work item. Completion token = '{completionToken}'")]
69+
public static partial void AbandonedOrchestratorWorkItem(this ILogger logger, string instanceId, string completionToken);
70+
71+
[LoggerMessage(EventId = 62, Level = LogLevel.Information, Message = "{instanceId}: Abandoning activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
72+
public static partial void AbandoningActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);
73+
74+
[LoggerMessage(EventId = 63, Level = LogLevel.Information, Message = "{instanceId}: Abandoned activity work item '{name}#{taskId}'. Completion token = '{completionToken}'")]
75+
public static partial void AbandonedActivityWorkItem(this ILogger logger, string instanceId, string name, int taskId, string completionToken);
76+
77+
[LoggerMessage(EventId = 64, Level = LogLevel.Information, Message = "{instanceId}: Abandoning entity work item. Completion token = '{completionToken}'")]
78+
public static partial void AbandoningEntityWorkItem(this ILogger logger, string instanceId, string completionToken);
79+
80+
[LoggerMessage(EventId = 65, Level = LogLevel.Information, Message = "{instanceId}: Abandoned entity work item. Completion token = '{completionToken}'")]
81+
public static partial void AbandonedEntityWorkItem(this ILogger logger, string instanceId, string completionToken);
6382
}
6483
}

0 commit comments

Comments
 (0)