diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java index 20fc3a05..330d009c 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java @@ -359,4 +359,37 @@ public String restartInstance(String instanceId, boolean restartWithNewInstanceI private PurgeResult toPurgeResult(PurgeInstancesResponse response){ return new PurgeResult(response.getDeletedInstanceCount()); } + + /** + * Streams the history events for an orchestration instance. + * + * @param instanceId The ID of the orchestration instance. + * @param executionId Optional execution ID of the orchestration instance. + * @param forWorkItemProcessing Whether the history is being streamed for work item processing. + * @return An iterator of HistoryChunk objects containing the orchestration history. + */ + public Iterator streamInstanceHistory(String instanceId, String executionId, boolean forWorkItemProcessing) { + Helpers.throwIfArgumentNull(instanceId, "instanceId"); + + StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder() + .setInstanceId(instanceId) + .setForWorkItemProcessing(forWorkItemProcessing); + + if (executionId != null && !executionId.isEmpty()) { + requestBuilder.setExecutionId(StringValue.of(executionId)); + } + + return this.sidecarClient.streamInstanceHistory(requestBuilder.build()); + } + + /** + * Streams the history events for an orchestration instance. + * + * @param instanceId The ID of the orchestration instance. + * @param forWorkItemProcessing Whether the history is being streamed for work item processing. + * @return An iterator of HistoryChunk objects containing the orchestration history. + */ + public Iterator streamInstanceHistory(String instanceId, boolean forWorkItemProcessing) { + return streamInstanceHistory(instanceId, null, forWorkItemProcessing); + } } diff --git a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java index 2c4b472d..e5e2eae4 100644 --- a/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java @@ -6,6 +6,7 @@ import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.*; +import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkerCapability; import com.microsoft.durabletask.implementation.protobuf.OrchestratorService.WorkItem.RequestCase; import com.microsoft.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc.*; @@ -122,7 +123,9 @@ public void startAndBlock() { // TODO: How do we interrupt manually? while (true) { try { - GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder().build(); + GetWorkItemsRequest getWorkItemsRequest = GetWorkItemsRequest.newBuilder() + .addCapabilities(WorkerCapability.WORKER_CAPABILITY_HISTORY_STREAMING) + .build(); Iterator workItemStream = this.sidecarClient.getWorkItems(getWorkItemsRequest); while (workItemStream.hasNext()) { WorkItem workItem = workItemStream.next(); @@ -132,9 +135,19 @@ public void startAndBlock() { // TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava // TODO: Error handling - TaskOrchestratorResult taskOrchestratorResult = taskOrchestrationExecutor.execute( - orchestratorRequest.getPastEventsList(), - orchestratorRequest.getNewEventsList()); + TaskOrchestratorResult taskOrchestratorResult; + + if (orchestratorRequest.getRequiresHistoryStreaming()) { + // Stream the history events when requested by the orchestrator service + taskOrchestratorResult = processOrchestrationWithStreamingHistory( + taskOrchestrationExecutor, + orchestratorRequest); + } else { + // Standard non-streaming execution path + taskOrchestratorResult = taskOrchestrationExecutor.execute( + orchestratorRequest.getPastEventsList(), + orchestratorRequest.getNewEventsList()); + } OrchestratorResponse response = OrchestratorResponse.newBuilder() .setInstanceId(orchestratorRequest.getInstanceId()) @@ -210,4 +223,73 @@ else if (requestType == RequestCase.HEALTHPING) public void stop() { this.close(); } + + /** + * Process an orchestration request using streaming history instead of receiving the full history in the work item. + * This is used when the history is too large to fit in a single gRPC message. + * + * @param taskOrchestrationExecutor the executor to use for processing the orchestration + * @param orchestratorRequest the request containing orchestration details + * @return the result of executing the orchestration + */ + private TaskOrchestratorResult processOrchestrationWithStreamingHistory( + TaskOrchestrationExecutor taskOrchestrationExecutor, + OrchestratorRequest orchestratorRequest) { + + logger.fine(() -> String.format( + "Streaming history for instance '%s' as it requires history streaming", + orchestratorRequest.getInstanceId())); + + // Create a request to stream the instance history + StreamInstanceHistoryRequest.Builder requestBuilder = StreamInstanceHistoryRequest.newBuilder() + .setInstanceId(orchestratorRequest.getInstanceId()) + .setForWorkItemProcessing(true); + + // Include execution ID if present + if (orchestratorRequest.hasExecutionId()) { + requestBuilder.setExecutionId(orchestratorRequest.getExecutionId()); + } + + StreamInstanceHistoryRequest request = requestBuilder.build(); + + // Stream history from the service + List pastEvents = new ArrayList<>(); + List newEvents = new ArrayList<>(); + + try { + // Get a stream of history chunks + Iterator historyStream = this.sidecarClient.streamInstanceHistory(request); + + // Process each chunk of history events + while (historyStream.hasNext()) { + HistoryChunk chunk = historyStream.next(); + + // The first chunk is considered the "past events", and the rest are "new events" + if (pastEvents.isEmpty()) { + pastEvents.addAll(chunk.getEventsList()); + } else { + newEvents.addAll(chunk.getEventsList()); + } + } + + logger.fine(() -> String.format( + "Successfully streamed history for instance '%s': %d past events, %d new events", + orchestratorRequest.getInstanceId(), pastEvents.size(), newEvents.size())); + + // Execute the orchestration with the collected history events + return taskOrchestrationExecutor.execute(pastEvents, newEvents); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + logger.log(Level.WARNING, "The sidecar service is unavailable while streaming history for instance " + + orchestratorRequest.getInstanceId()); + } else if (e.getStatus().getCode() == Status.Code.CANCELLED) { + logger.log(Level.WARNING, "History streaming was canceled for instance " + + orchestratorRequest.getInstanceId()); + } else { + logger.log(Level.WARNING, "Error streaming history for instance " + + orchestratorRequest.getInstanceId(), e); + } + throw e; + } + } } \ No newline at end of file