Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion client/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,20 @@ task downloadProtoFiles {

doLast {
def protoDir = file("${rootProject.projectDir}/internal/durabletask-protobuf/protos")
def protoFile = new File(protoDir, 'orchestrator_service.proto')

// Skip download if proto file already exists
if (protoFile.exists()) {
logger.info("Proto file already exists, skipping download")
return
}

protoDir.mkdirs()

// Download the proto file
new URL("https://raw.githubusercontent.com/microsoft/durabletask-protobuf/${ext.branch}/protos/orchestrator_service.proto")
.withInputStream { i ->
new File(protoDir, 'orchestrator_service.proto').withOutputStream { it << i }
protoFile.withOutputStream { it << i }
}

// Get and save the commit hash
Expand Down
88 changes: 79 additions & 9 deletions internal/durabletask-protobuf/protos/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ message TaskFailureDetails {
google.protobuf.StringValue stackTrace = 3;
TaskFailureDetails innerFailure = 4;
bool isNonRetriable = 5;
map<string, google.protobuf.Value> properties = 6;
}

enum OrchestrationStatus {
Expand Down Expand Up @@ -95,6 +96,7 @@ message TaskScheduledEvent {
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
TraceContext parentTraceContext = 4;
map<string, string> tags = 5;
}

message TaskCompletedEvent {
Expand Down Expand Up @@ -192,7 +194,7 @@ message EntityOperationCalledEvent {
}

message EntityLockRequestedEvent {
string criticalSectionId = 1;
string criticalSectionId = 1;
repeated string lockSet = 2;
int32 position = 3;
google.protobuf.StringValue parentInstanceId = 4; // used only within messages, null in histories
Expand All @@ -217,7 +219,14 @@ message EntityUnlockSentEvent {
message EntityLockGrantedEvent {
string criticalSectionId = 1;
}


message ExecutionRewoundEvent {
google.protobuf.StringValue reason = 1;
google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
}

message HistoryEvent {
int32 eventId = 1;
google.protobuf.Timestamp timestamp = 2;
Expand All @@ -244,25 +253,29 @@ message HistoryEvent {
ExecutionResumedEvent executionResumed = 22;
EntityOperationSignaledEvent entityOperationSignaled = 23;
EntityOperationCalledEvent entityOperationCalled = 24;
EntityOperationCompletedEvent entityOperationCompleted = 25;
EntityOperationFailedEvent entityOperationFailed = 26;
EntityOperationCompletedEvent entityOperationCompleted = 25;
EntityOperationFailedEvent entityOperationFailed = 26;
EntityLockRequestedEvent entityLockRequested = 27;
EntityLockGrantedEvent entityLockGranted = 28;
EntityUnlockSentEvent entityUnlockSent = 29;
ExecutionRewoundEvent executionRewound = 30;
}
}

message ScheduleTaskAction {
string name = 1;
google.protobuf.StringValue version = 2;
google.protobuf.StringValue input = 3;
map<string, string> tags = 4;
TraceContext parentTraceContext = 5;
}

message CreateSubOrchestrationAction {
string instanceId = 1;
string name = 2;
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
}

message CreateTimerAction {
Expand All @@ -282,6 +295,7 @@ message CompleteOrchestrationAction {
google.protobuf.StringValue newVersion = 4;
repeated HistoryEvent carryoverEvents = 5;
TaskFailureDetails failureDetails = 6;
map<string, string> tags = 7;
}

message TerminateOrchestrationAction {
Expand Down Expand Up @@ -312,6 +326,11 @@ message OrchestratorAction {
}
}

message OrchestrationTraceContext {
google.protobuf.StringValue spanID = 1;
google.protobuf.Timestamp spanStartTime = 2;
}

message OrchestratorRequest {
string instanceId = 1;
google.protobuf.StringValue executionId = 2;
Expand All @@ -320,6 +339,8 @@ message OrchestratorRequest {
OrchestratorEntityParameters entityParameters = 5;
bool requiresHistoryStreaming = 6;
map<string, google.protobuf.Value> properties = 7;

OrchestrationTraceContext orchestrationTraceContext = 8;
}

message OrchestratorResponse {
Expand All @@ -331,6 +352,10 @@ message OrchestratorResponse {
// The number of work item events that were processed by the orchestrator.
// This field is optional. If not set, the service should assume that the orchestrator processed all events.
google.protobuf.Int32Value numEventsProcessed = 5;
OrchestrationTraceContext orchestrationTraceContext = 6;

// Whether or not a history is required to complete the original OrchestratorRequest and none was provided.
bool requiresHistory = 7;
}

message CreateInstanceRequest {
Expand All @@ -343,6 +368,7 @@ message CreateInstanceRequest {
google.protobuf.StringValue executionId = 7;
map<string, string> tags = 8;
TraceContext parentTraceContext = 9;
google.protobuf.Timestamp requestTime = 10;
}

message OrchestrationIdReusePolicy {
Expand Down Expand Up @@ -453,6 +479,7 @@ message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
}
Expand All @@ -468,6 +495,15 @@ message PurgeInstancesResponse {
google.protobuf.BoolValue isComplete = 2;
}

message RestartInstanceRequest {
string instanceId = 1;
bool restartWithNewInstanceId = 2;
}

message RestartInstanceResponse {
string instanceId = 1;
}

message CreateTaskHubRequest {
bool recreateIfExists = 1;
}
Expand All @@ -490,10 +526,12 @@ message SignalEntityRequest {
google.protobuf.StringValue input = 3;
string requestId = 4;
google.protobuf.Timestamp scheduledTime = 5;
TraceContext parentTraceContext = 6;
google.protobuf.Timestamp requestTime = 7;
}

message SignalEntityResponse {
// no payload
// no payload
}

message GetEntityRequest {
Expand Down Expand Up @@ -575,6 +613,7 @@ message OperationRequest {
string operation = 1;
string requestId = 2;
google.protobuf.StringValue input = 3;
TraceContext traceContext = 4;
}

message OperationResult {
Expand All @@ -591,10 +630,14 @@ message OperationInfo {

message OperationResultSuccess {
google.protobuf.StringValue result = 1;
google.protobuf.Timestamp startTimeUtc = 2;
google.protobuf.Timestamp endTimeUtc = 3;
}

message OperationResultFailure {
TaskFailureDetails failureDetails = 1;
google.protobuf.Timestamp startTimeUtc = 2;
google.protobuf.Timestamp endTimeUtc = 3;
}

message OperationAction {
Expand All @@ -610,6 +653,8 @@ message SendSignalAction {
string name = 2;
google.protobuf.StringValue input = 3;
google.protobuf.Timestamp scheduledTime = 4;
google.protobuf.Timestamp requestTime = 5;
TraceContext parentTraceContext = 6;
}

message StartNewOrchestrationAction {
Expand All @@ -618,6 +663,8 @@ message StartNewOrchestrationAction {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
google.protobuf.Timestamp scheduledTime = 5;
google.protobuf.Timestamp requestTime = 6;
TraceContext parentTraceContext = 7;
}

message AbandonActivityTaskRequest {
Expand All @@ -644,6 +691,17 @@ message AbandonEntityTaskResponse {
// Empty.
}

message SkipGracefulOrchestrationTerminationsRequest {
InstanceBatch instanceBatch = 1;
google.protobuf.StringValue reason = 2;
}

message SkipGracefulOrchestrationTerminationsResponse {
// Those instances which could not be terminated because they had locked entities at the time of this termination call,
// are already in a terminal state (completed, failed, terminated, etc.), are not orchestrations, or do not exist (i.e. have been purged)
repeated string unterminatedInstanceIds = 1;
}

service TaskHubSidecarService {
// Sends a hello request to the sidecar service.
rpc Hello(google.protobuf.Empty) returns (google.protobuf.Empty);
Expand All @@ -657,18 +715,21 @@ service TaskHubSidecarService {
// Rewinds an orchestration instance to last known good state and replays from there.
rpc RewindInstance(RewindInstanceRequest) returns (RewindInstanceResponse);

// Restarts an orchestration instance.
rpc RestartInstance(RestartInstanceRequest) returns (RestartInstanceResponse);

// Waits for an orchestration instance to reach a running or completion state.
rpc WaitForInstanceStart(GetInstanceRequest) returns (GetInstanceResponse);

// Waits for an orchestration instance to reach a completion state (completed, failed, terminated, etc.).
rpc WaitForInstanceCompletion(GetInstanceRequest) returns (GetInstanceResponse);

// Raises an event to a running orchestration instance.
rpc RaiseEvent(RaiseEventRequest) returns (RaiseEventResponse);

// Terminates a running orchestration instance.
rpc TerminateInstance(TerminateRequest) returns (TerminateResponse);

// Suspends a running orchestration instance.
rpc SuspendInstance(SuspendRequest) returns (SuspendResponse);

Expand Down Expand Up @@ -714,6 +775,10 @@ service TaskHubSidecarService {

// Abandon an entity work item
rpc AbandonTaskEntityWorkItem(AbandonEntityTaskRequest) returns (AbandonEntityTaskResponse);

// "Skip" graceful termination of orchestrations by immediately changing their status in storage to "terminated".
// Note that a maximum of 500 orchestrations can be terminated at a time using this method.
rpc SkipGracefulOrchestrationTerminations(SkipGracefulOrchestrationTerminationsRequest) returns (SkipGracefulOrchestrationTerminationsResponse);
}

message GetWorkItemsRequest {
Expand Down Expand Up @@ -750,7 +815,7 @@ message CompleteTaskResponse {
}

message HealthPing {
// No payload
// No payload
}

message StreamInstanceHistoryRequest {
Expand All @@ -764,3 +829,8 @@ message StreamInstanceHistoryRequest {
message HistoryChunk {
repeated HistoryEvent events = 1;
}

message InstanceBatch {
// A maximum of 500 instance IDs can be provided in this list.
repeated string instanceIds = 1;
}