From 5bfb0ee2c9aed017b3e61913a4fc8a3efdebb8a5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 23 May 2025 05:50:21 +0000 Subject: [PATCH 1/3] Initial plan for issue From 4169347a2583f66b94afa0230b2ccaee5b9d3b3b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 23 May 2025 05:57:50 +0000 Subject: [PATCH 2/3] Add support for history streaming Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../durabletask/DurableTaskGrpcClient.java | 20 +++++ .../durabletask/DurableTaskGrpcWorker.java | 78 ++++++++++++++++++- 2 files changed, 94 insertions(+), 4 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 20fc3a05..0f5dfc26 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -359,4 +359,24 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } + + /** + * Streams the history events for an orchestration instance. + * + * @param instanceId The ID of the orchestration instance. + * @param executionId Optional execution ID of the orchestration instance. + * @param forWorkItemProcessing Whether the history is being streamed for work item processing. + * @return An iterator of HistoryChunk objects containing the orchestration history. + */ + public Iterator streamInstanceHistory(String instanceId, String executionId, boolean forWorkItemProcessing) { + StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder() + .setInstanceId(instanceId) + .setForWorkItemProcessing(forWorkItemProcessing); + + if (executionId != null && !executionId.isEmpty()) { + requestBuilder.setExecutionId(StringValue.of(executionId)); + } + + return this.sidecarClient.streamInstanceHistory(requestBuilder.build()); + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 2c4b472d..cdb622b5 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -6,6 +6,7 @@ import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkerCapability; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; @@ -122,7 +123,9 @@ public void startAndBlock() { // TODO: How do we interrupt manually? while (true) { try { - GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); + GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder() + .addCapabilities(WorkerCapability.WORKER_CAPABILITY_HISTORY_STREAMING) + .build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); @@ -132,9 +135,19 @@ public void startAndBlock() { // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); + TaskOrchestratorResult taskOrchestratorResult; + + if (orchestratorRequest.getRequiresHistoryStreaming()) { + // Stream the history events when requested by the orchestrator service + taskOrchestratorResult = processOrchestrationWithStreamingHistory( + taskOrchestrationExecutor, + orchestratorRequest); + } else { + // Standard non-streaming execution path + taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + } OrchestratorResponse response = OrchestratorResponse.newBuilder() .setInstanceId(orchestratorRequest.getInstanceId()) @@ -210,4 +223,61 @@ else if (requestType == RequestCase.HEALTHPING) public void stop() { this.close(); } + + /** + * Process an orchestration request using streaming history instead of receiving the full history in the work item. + * This is used when the history is too large to fit in a single gRPC message. + * + * @param taskOrchestrationExecutor the executor to use for processing the orchestration + * @param orchestratorRequest the request containing orchestration details + * @return the result of executing the orchestration + */ + private TaskOrchestratorResult processOrchestrationWithStreamingHistory( + TaskOrchestrationExecutor taskOrchestrationExecutor, + OrchestratorRequest orchestratorRequest) { + + logger.fine(() -> String.format( + "Streaming history for instance '%s' as it requires history streaming", + orchestratorRequest.getInstanceId())); + + // Create a request to stream the instance history + StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .setForWorkItemProcessing(true); + + // Include execution ID if present + if (orchestratorRequest.hasExecutionId()) { + requestBuilder.setExecutionId(orchestratorRequest.getExecutionId()); + } + + StreamInstanceHistoryRequest request = requestBuilder.build(); + + // Stream history from the service + List pastEvents = new ArrayList<>(); + List newEvents = new ArrayList<>(); + + try { + // Get a stream of history chunks + Iterator historyStream = this.sidecarClient.streamInstanceHistory(request); + + // Process each chunk of history events + while (historyStream.hasNext()) { + HistoryChunk chunk = historyStream.next(); + + // The first chunk is considered the "past events", and the rest are "new events" + if (pastEvents.isEmpty()) { + pastEvents.addAll(chunk.getEventsList()); + } else { + newEvents.addAll(chunk.getEventsList()); + } + } + + // Execute the orchestration with the collected history events + return taskOrchestrationExecutor.execute(pastEvents, newEvents); + } catch (StatusRuntimeException e) { + logger.log(Level.WARNING, "Error streaming history for instance " + + orchestratorRequest.getInstanceId(), e); + throw e; + } + } } \ No newline at end of file From b2cd20fdf1937533674bf41365b3a3b26b327e1b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 23 May 2025 06:00:38 +0000 Subject: [PATCH 3/3] Improve history streaming implementation with better error handling and convenience methods Co-authored-by: YunchuWang <12449837+YunchuWang@users.noreply.github.com> --- .../durabletask/DurableTaskGrpcClient.java | 13 +++++++++++++ .../durabletask/DurableTaskGrpcWorker.java | 16 ++++++++++++++-- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 0f5dfc26..330d009c 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -369,6 +369,8 @@ private PurgeResult toPurgeResult(PurgeInstancesResponse response){ * @return An iterator of HistoryChunk objects containing the orchestration history. */ public Iterator streamInstanceHistory(String instanceId, String executionId, boolean forWorkItemProcessing) { + Helpers.throwIfArgumentNull(instanceId, "instanceId"); + StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder() .setInstanceId(instanceId) .setForWorkItemProcessing(forWorkItemProcessing); @@ -379,4 +381,15 @@ public Iterator streamInstanceHistory(String instanceId, String ex return this.sidecarClient.streamInstanceHistory(requestBuilder.build()); } + + /** + * Streams the history events for an orchestration instance. + * + * @param instanceId The ID of the orchestration instance. + * @param forWorkItemProcessing Whether the history is being streamed for work item processing. + * @return An iterator of HistoryChunk objects containing the orchestration history. + */ + public Iterator streamInstanceHistory(String instanceId, boolean forWorkItemProcessing) { + return streamInstanceHistory(instanceId, null, forWorkItemProcessing); + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index cdb622b5..e5e2eae4 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -272,11 +272,23 @@ private TaskOrchestratorResult processOrchestrationWithStreamingHistory( } } + logger.fine(() -> String.format( + "Successfully streamed history for instance '%s': %d past events, %d new events", + orchestratorRequest.getInstanceId(), pastEvents.size(), newEvents.size())); + // Execute the orchestration with the collected history events return taskOrchestrationExecutor.execute(pastEvents, newEvents); } catch (StatusRuntimeException e) { - logger.log(Level.WARNING, "Error streaming history for instance " + - orchestratorRequest.getInstanceId(), e); + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar service is unavailable while streaming history for instance " + + orchestratorRequest.getInstanceId()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "History streaming was canceled for instance " + + orchestratorRequest.getInstanceId()); + } else { + logger.log(Level.WARNING, "Error streaming history for instance " + + orchestratorRequest.getInstanceId(), e); + } throw e; } }