Skip to content

Commit d2400cd

Browse files
add additional data to entity work item result, when using with DTS
1 parent e2ce7ee commit d2400cd

File tree

4 files changed

+107
-8
lines changed

4 files changed

+107
-8
lines changed

eng/proto

src/Shared/Grpc/ProtoUtils.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,67 @@ internal static OrchestrationStatus ToCore(this P.OrchestrationStatus status)
513513
};
514514
}
515515

516+
/// <summary>
517+
/// Converts a <see cref="P.EntityRequest" /> to a <see cref="EntityBatchRequest" />.
518+
/// </summary>
519+
/// <param name="entityRequest">The entity request to convert.</param>
520+
/// <param name="responseTargetInstanceIds">A list containing the response target instance ids for calls.</param>
521+
/// <param name="requestIds">A list containing th request ids of the operations.</param>
522+
/// <returns>The converted entity batch request.</returns>
523+
[return: NotNullIfNotNull(nameof(entityRequest))]
524+
internal static EntityBatchRequest? ToEntityBatchRequest(
525+
this P.EntityRequest? entityRequest,
526+
out List<P.OrchestrationInstance?> responseTargetInstanceIds,
527+
out List<string> requestIds)
528+
{
529+
if (entityRequest == null)
530+
{
531+
responseTargetInstanceIds = [];
532+
requestIds = [];
533+
return null;
534+
}
535+
536+
var batchRequest = new EntityBatchRequest()
537+
{
538+
EntityState = entityRequest.EntityState,
539+
InstanceId = entityRequest.InstanceId,
540+
};
541+
responseTargetInstanceIds = new(entityRequest.OperationRequests.Count);
542+
requestIds = new(entityRequest.OperationRequests.Count);
543+
544+
foreach (P.HistoryEvent? op in entityRequest.OperationRequests)
545+
{
546+
if (op.EntityOperationSignaled is not null)
547+
{
548+
batchRequest.Operations!.Add(new OperationRequest
549+
{
550+
Id = Guid.Parse(op.EntityOperationSignaled.RequestId),
551+
Operation = op.EntityOperationSignaled.Operation,
552+
Input = op.EntityOperationSignaled.Input,
553+
});
554+
responseTargetInstanceIds.Add(null);
555+
requestIds.Add(op.EntityOperationSignaled.RequestId);
556+
}
557+
else if (op.EntityOperationCalled is not null)
558+
{
559+
batchRequest.Operations!.Add(new OperationRequest
560+
{
561+
Id = Guid.Parse(op.EntityOperationCalled.RequestId),
562+
Operation = op.EntityOperationCalled.Operation,
563+
Input = op.EntityOperationCalled.Input,
564+
});
565+
responseTargetInstanceIds.Add(new P.OrchestrationInstance
566+
{
567+
InstanceId = op.EntityOperationCalled.ParentInstanceId,
568+
ExecutionId = op.EntityOperationCalled.ParentExecutionId,
569+
});
570+
requestIds.Add(op.EntityOperationCalled.RequestId);
571+
}
572+
}
573+
574+
return batchRequest;
575+
}
576+
516577
/// <summary>
517578
/// Converts a <see cref="P.OperationRequest" /> to a <see cref="OperationRequest" />.
518579
/// </summary>

src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs

Lines changed: 43 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,25 @@ async Task ProcessWorkItemsAsync(AsyncServerStreamingCall<P.WorkItem> stream, Ca
183183
{
184184
this.RunBackgroundTask(
185185
workItem,
186-
() => this.OnRunEntityBatchAsync(workItem.EntityRequest, workItem.CompletionToken));
186+
() => this.OnRunEntityBatchAsync(
187+
workItem.EntityRequest.ToEntityBatchRequest(),
188+
workItem.CompletionToken,
189+
responseDestinations: null,
190+
requestIds: null));
191+
}
192+
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.EntityRequestV2)
193+
{
194+
EntityBatchRequest batchRequest = workItem.EntityRequestV2.ToEntityBatchRequest(
195+
out List<P.OrchestrationInstance?> responseDestinations,
196+
out List<string> requestIds);
197+
198+
this.RunBackgroundTask(
199+
workItem,
200+
() => this.OnRunEntityBatchAsync(
201+
workItem.EntityRequest.ToEntityBatchRequest(),
202+
workItem.CompletionToken,
203+
responseDestinations,
204+
requestIds));
187205
}
188206
else if (workItem.RequestCase == P.WorkItem.RequestOneofCase.HealthPing)
189207
{
@@ -383,14 +401,17 @@ async Task OnRunActivityAsync(P.ActivityRequest request, string completionToken)
383401
await this.sidecar.CompleteActivityTaskAsync(response);
384402
}
385403

386-
async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, string completionToken)
404+
async Task OnRunEntityBatchAsync(
405+
EntityBatchRequest batchRequest,
406+
string completionToken,
407+
List<P.OrchestrationInstance?>? responseDestinations,
408+
IReadOnlyList<string>? requestIds)
387409
{
388-
var coreEntityId = DTCore.Entities.EntityId.FromString(request.InstanceId);
410+
var coreEntityId = DTCore.Entities.EntityId.FromString(batchRequest.InstanceId!);
389411
EntityId entityId = new(coreEntityId.Name, coreEntityId.Key);
390412

391413
TaskName name = new(entityId.Name);
392414

393-
EntityBatchRequest batchRequest = request.ToEntityBatchRequest();
394415
EntityBatchResult? batchResult;
395416

396417
try
@@ -441,9 +462,26 @@ async Task OnRunEntityBatchAsync(P.EntityBatchRequest request, string completion
441462
};
442463
}
443464

444-
// convert the result to protobuf format and send it back
445465
P.EntityBatchResult response = batchResult.ToEntityBatchResult();
466+
446467
response.CompletionToken = completionToken;
468+
469+
if (responseDestinations != null)
470+
{
471+
for (int i = 0; i < response.Results.Count; i++)
472+
{
473+
response.ResponseDestinations.Add(responseDestinations[i]);
474+
}
475+
}
476+
477+
if (requestIds != null)
478+
{
479+
foreach (string requestId in requestIds)
480+
{
481+
response.RequestIds.Add(requestId);
482+
}
483+
}
484+
447485
await this.sidecar.CompleteEntityTaskAsync(response);
448486
}
449487
}

src/Worker/Grpc/GrpcDurableTaskWorkerOptions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ public sealed class GrpcDurableTaskWorkerOptions : DurableTaskWorkerOptions
2424
public CallInvoker? CallInvoker { get; set; }
2525

2626
/// <summary>
27-
/// Gets or sets a flag that indicates whether entity-related events appearing in orchestration histories should be
27+
/// Gets or sets a value indicating whether entity-related events appearing in orchestration histories should be
2828
/// automatically converted back and forth between the old DT Core representation (JSON-encoded external events)
2929
/// and the new protobuf representation (explicit history events), which is used by the DTS scheduler backend.
3030
/// </summary>
3131
public bool ConvertOrchestrationEntityEvents { get; set; }
3232

3333
/// <summary>
34-
/// Gets or sets a flag that indicates whether to automatically add entity
34+
/// Gets or sets a value indicating whether to automatically add entity
3535
/// unlock events into the history when an orchestration terminates while holding an entity lock.
3636
/// </summary>
3737
public bool InsertEntityUnlocksOnCompletion { get; set; }

0 commit comments

Comments
 (0)