Skip to content

Commit c303a12

Browse files
authored
Implement suspend and resume client APIs (#104)
* fix comments to align with official doc * implement suspend and resume client apis * add integration test * add integration tests * update integratio tests * update CHANGELOG.md * fix potential NPE of terminate method - update unit tests for suspend logics * update release notes - minior refactor unit test * update sidecar image for testing
1 parent 6d6dff9 commit c303a12

File tree

8 files changed

+226
-62
lines changed

8 files changed

+226
-62
lines changed

.github/workflows/build-validation.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ jobs:
4242
arguments: build
4343
# TODO: Move the sidecar into a central image repository
4444
- name: Initialize Durable Task Sidecar
45-
run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d cgillum/durabletask-sidecar:latest start --backend Emulator
45+
run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d kaibocai/durabletask-sidecar:latest start --backend Emulator
4646
- name: Integration Tests with Gradle
4747
uses: gradle/gradle-build-action@bc3340afc5e3cc44f2321809ac090d731c13c514
4848
with:

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,10 @@
1+
## v1.1.0
2+
3+
### Updates
4+
* Support Suspend and Resume Client APIs ([#104](https://github.com/microsoft/durabletask-java/issues/104))
5+
* Fix the potential NPE issue of `DurableTaskClient terminate` method ([#104](https://github.com/microsoft/durabletask-java/issues/104))
6+
7+
18
## v1.0.0
29

310
### New

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,4 +281,34 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
281281
* @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
282282
*/
283283
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;
284+
285+
/**
286+
* Suspends a running orchestration instance.
287+
* @param instanceId the ID of the orchestration instance to suspend
288+
*/
289+
public void suspendInstance (String instanceId) {
290+
this.suspendInstance(instanceId, null);
291+
}
292+
293+
/**
294+
* Resumes a running orchestration instance.
295+
* @param instanceId the ID of the orchestration instance to resume
296+
*/
297+
public void resumeInstance(String instanceId) {
298+
this.resumeInstance(instanceId, null);
299+
}
300+
301+
/**
302+
* Suspends a running orchestration instance.
303+
* @param instanceId the ID of the orchestration instance to suspend
304+
* @param reason the reason for suspending the orchestration instance
305+
*/
306+
public abstract void suspendInstance(String instanceId, @Nullable String reason);
307+
308+
/**
309+
* Resumes a running orchestration instance.
310+
* @param instanceId the ID of the orchestration instance to resume
311+
* @param reason the reason for resuming the orchestration instance
312+
*/
313+
public abstract void resumeInstance(String instanceId, @Nullable String reason);
284314
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,10 @@ public void terminate(String instanceId, @Nullable Object output) {
204204
"Terminating instance %s and setting output to: %s",
205205
instanceId,
206206
serializeOutput != null ? serializeOutput : "(null)"));
207-
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId).setOutput(StringValue.of(serializeOutput));
207+
TerminateRequest.Builder builder = TerminateRequest.newBuilder().setInstanceId(instanceId);
208+
if (serializeOutput != null){
209+
builder.setOutput(StringValue.of(serializeOutput));
210+
}
208211
this.sidecarClient.terminateInstance(builder.build());
209212
}
210213

@@ -280,6 +283,26 @@ public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) t
280283
}
281284
}
282285

286+
@Override
287+
public void suspendInstance(String instanceId, @Nullable String reason) {
288+
SuspendRequest.Builder suspendRequestBuilder = SuspendRequest.newBuilder();
289+
suspendRequestBuilder.setInstanceId(instanceId);
290+
if (reason != null) {
291+
suspendRequestBuilder.setReason(StringValue.of(reason));
292+
}
293+
this.sidecarClient.suspendInstance(suspendRequestBuilder.build());
294+
}
295+
296+
@Override
297+
public void resumeInstance(String instanceId, @Nullable String reason) {
298+
ResumeRequest.Builder resumeRequestBuilder = ResumeRequest.newBuilder();
299+
resumeRequestBuilder.setInstanceId(instanceId);
300+
if (reason != null) {
301+
resumeRequestBuilder.setReason(StringValue.of(reason));
302+
}
303+
this.sidecarClient.resumeInstance(resumeRequestBuilder.build());
304+
}
305+
283306
private PurgeResult toPurgeResult(PurgeInstancesResponse response){
284307
return new PurgeResult(response.getDeletedInstanceCount());
285308
}

client/src/main/java/com/microsoft/durabletask/OrchestrationRuntimeStatus.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@ public enum OrchestrationRuntimeStatus {
4646
/**
4747
* The orchestration was scheduled but hasn't started running.
4848
*/
49-
PENDING;
49+
PENDING,
50+
51+
/**
52+
* The orchestration is in a suspended state.
53+
*/
54+
SUSPENDED;
5055

5156
static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
5257
switch (status) {
@@ -64,6 +69,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestrationStatus status) {
6469
return TERMINATED;
6570
case ORCHESTRATION_STATUS_PENDING:
6671
return PENDING;
72+
case ORCHESTRATION_STATUS_SUSPENDED:
73+
return SUSPENDED;
6774
default:
6875
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
6976
}
@@ -85,6 +92,8 @@ static OrchestrationStatus toProtobuf(OrchestrationRuntimeStatus status){
8592
return ORCHESTRATION_STATUS_TERMINATED;
8693
case PENDING:
8794
return ORCHESTRATION_STATUS_PENDING;
95+
case SUSPENDED:
96+
return ORCHESTRATION_STATUS_SUSPENDED;
8897
default:
8998
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
9099
}

client/src/main/java/com/microsoft/durabletask/TaskOrchestrationExecutor.java

Lines changed: 87 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -66,21 +66,22 @@ private class ContextImplTask implements TaskOrchestrationContext {
6666
private String instanceId;
6767
private Instant currentInstant;
6868
private boolean isComplete;
69+
private boolean isSuspended;
6970
private boolean isReplaying = true;
7071

7172
// LinkedHashMap to maintain insertion order when returning the list of pending actions
7273
private final LinkedHashMap<Integer, OrchestratorAction> pendingActions = new LinkedHashMap<>();
7374
private final HashMap<Integer, TaskRecord<?>> openTasks = new HashMap<>();
7475
private final LinkedHashMap<String, Queue<TaskRecord<?>>> outstandingEvents = new LinkedHashMap<>();
7576
private final LinkedList<HistoryEvent> unprocessedEvents = new LinkedList<>();
77+
private final Queue<HistoryEvent> eventsWhileSuspended = new ArrayDeque<>();
7678
private final DataConverter dataConverter = TaskOrchestrationExecutor.this.dataConverter;
7779
private final Logger logger = TaskOrchestrationExecutor.this.logger;
7880
private final OrchestrationHistoryIterator historyEventPlayer;
7981
private int sequenceNumber;
8082
private boolean continuedAsNew;
8183
private Object continuedAsNewInput;
8284
private boolean preserveUnprocessedEvents;
83-
8485
private Object customStatus;
8586

8687
public ContextImplTask(List<HistoryEvent> pastEvents, List<HistoryEvent> newEvents) {
@@ -524,6 +525,23 @@ private void handleEventRaised(HistoryEvent e) {
524525
task.complete(result);
525526
}
526527

528+
private void handleEventWhileSuspended (HistoryEvent historyEvent){
529+
if (historyEvent.getEventTypeCase() != HistoryEvent.EventTypeCase.EXECUTIONSUSPENDED) {
530+
eventsWhileSuspended.offer(historyEvent);
531+
}
532+
}
533+
534+
private void handleExecutionSuspended(HistoryEvent historyEvent) {
535+
this.isSuspended = true;
536+
}
537+
538+
private void handleExecutionResumed(HistoryEvent historyEvent) {
539+
this.isSuspended = false;
540+
while (!eventsWhileSuspended.isEmpty()) {
541+
this.processEvent(eventsWhileSuspended.poll());
542+
}
543+
}
544+
527545
public Task<Void> createTimer(Duration duration) {
528546
Helpers.throwIfOrchestratorComplete(this.isComplete);
529547
Helpers.throwIfArgumentNull(duration, "duration");
@@ -717,75 +735,86 @@ private boolean processNextEvent() {
717735
}
718736

719737
private void processEvent(HistoryEvent e) {
720-
switch (e.getEventTypeCase()) {
721-
case ORCHESTRATORSTARTED:
722-
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
723-
this.setCurrentInstant(instant);
724-
break;
725-
case ORCHESTRATORCOMPLETED:
726-
// No action
727-
break;
728-
case EXECUTIONSTARTED:
729-
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
730-
String name = startedEvent.getName();
731-
this.setName(name);
732-
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
733-
this.setInstanceId(instanceId);
734-
String input = startedEvent.getInput().getValue();
735-
this.setInput(input);
736-
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
737-
if (factory == null) {
738-
// Try getting the default orchestrator
739-
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
740-
}
741-
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
742-
TaskOrchestration orchestrator = factory.create();
743-
orchestrator.run(this);
744-
break;
738+
boolean overrideSuspension = e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONRESUMED || e.getEventTypeCase() == HistoryEvent.EventTypeCase.EXECUTIONTERMINATED;
739+
if (this.isSuspended && !overrideSuspension) {
740+
this.handleEventWhileSuspended(e);
741+
} else {
742+
switch (e.getEventTypeCase()) {
743+
case ORCHESTRATORSTARTED:
744+
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
745+
this.setCurrentInstant(instant);
746+
break;
747+
case ORCHESTRATORCOMPLETED:
748+
// No action
749+
break;
750+
case EXECUTIONSTARTED:
751+
ExecutionStartedEvent startedEvent = e.getExecutionStarted();
752+
String name = startedEvent.getName();
753+
this.setName(name);
754+
String instanceId = startedEvent.getOrchestrationInstance().getInstanceId();
755+
this.setInstanceId(instanceId);
756+
String input = startedEvent.getInput().getValue();
757+
this.setInput(input);
758+
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.get(name);
759+
if (factory == null) {
760+
// Try getting the default orchestrator
761+
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
762+
}
763+
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
764+
TaskOrchestration orchestrator = factory.create();
765+
orchestrator.run(this);
766+
break;
745767
// case EXECUTIONCOMPLETED:
746768
// break;
747769
// case EXECUTIONFAILED:
748770
// break;
749-
case EXECUTIONTERMINATED:
750-
this.handleExecutionTerminated(e);
751-
break;
752-
case TASKSCHEDULED:
753-
this.handleTaskScheduled(e);
754-
break;
755-
case TASKCOMPLETED:
756-
this.handleTaskCompleted(e);
757-
break;
758-
case TASKFAILED:
759-
this.handleTaskFailed(e);
760-
break;
761-
case TIMERCREATED:
762-
this.handleTimerCreated(e);
763-
break;
764-
case TIMERFIRED:
765-
this.handleTimerFired(e);
766-
break;
767-
case SUBORCHESTRATIONINSTANCECREATED:
768-
this.handleSubOrchestrationCreated(e);
769-
break;
770-
case SUBORCHESTRATIONINSTANCECOMPLETED:
771-
this.handleSubOrchestrationCompleted(e);
772-
break;
773-
case SUBORCHESTRATIONINSTANCEFAILED:
774-
this.handleSubOrchestrationFailed(e);
775-
break;
771+
case EXECUTIONTERMINATED:
772+
this.handleExecutionTerminated(e);
773+
break;
774+
case TASKSCHEDULED:
775+
this.handleTaskScheduled(e);
776+
break;
777+
case TASKCOMPLETED:
778+
this.handleTaskCompleted(e);
779+
break;
780+
case TASKFAILED:
781+
this.handleTaskFailed(e);
782+
break;
783+
case TIMERCREATED:
784+
this.handleTimerCreated(e);
785+
break;
786+
case TIMERFIRED:
787+
this.handleTimerFired(e);
788+
break;
789+
case SUBORCHESTRATIONINSTANCECREATED:
790+
this.handleSubOrchestrationCreated(e);
791+
break;
792+
case SUBORCHESTRATIONINSTANCECOMPLETED:
793+
this.handleSubOrchestrationCompleted(e);
794+
break;
795+
case SUBORCHESTRATIONINSTANCEFAILED:
796+
this.handleSubOrchestrationFailed(e);
797+
break;
776798
// case EVENTSENT:
777799
// break;
778-
case EVENTRAISED:
779-
this.handleEventRaised(e);
780-
break;
800+
case EVENTRAISED:
801+
this.handleEventRaised(e);
802+
break;
781803
// case GENERICEVENT:
782804
// break;
783805
// case HISTORYSTATE:
784806
// break;
785807
// case EVENTTYPE_NOT_SET:
786808
// break;
787-
default:
788-
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
809+
case EXECUTIONSUSPENDED:
810+
this.handleExecutionSuspended(e);
811+
break;
812+
case EXECUTIONRESUMED:
813+
this.handleExecutionResumed(e);
814+
break;
815+
default:
816+
throw new IllegalStateException("Don't know how to handle history type " + e.getEventTypeCase());
817+
}
789818
}
790819
}
791820

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,6 +319,72 @@ void termination() throws TimeoutException {
319319
}
320320
}
321321

322+
@Test
323+
void suspendResumeOrchestration() throws TimeoutException, InterruptedException {
324+
final String orchestratorName = "suspend";
325+
final String eventName = "MyEvent";
326+
final String eventPayload = "testPayload";
327+
final Duration suspendTimeout = Duration.ofSeconds(5);
328+
329+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
330+
.addOrchestrator(orchestratorName, ctx -> {
331+
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
332+
ctx.complete(payload);
333+
})
334+
.buildAndStart();
335+
336+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
337+
try (worker; client) {
338+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
339+
client.suspendInstance(instanceId);
340+
OrchestrationMetadata instance = client.waitForInstanceStart(instanceId, defaultTimeout);
341+
assertNotNull(instance);
342+
assertEquals(OrchestrationRuntimeStatus.SUSPENDED, instance.getRuntimeStatus());
343+
344+
client.raiseEvent(instanceId, eventName, eventPayload);
345+
346+
assertThrows(
347+
TimeoutException.class,
348+
() -> client.waitForInstanceCompletion(instanceId, suspendTimeout, false),
349+
"Expected to throw TimeoutException, but it didn't"
350+
);
351+
352+
String resumeReason = "Resume for testing.";
353+
client.resumeInstance(instanceId, resumeReason);
354+
instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
355+
assertNotNull(instance);
356+
assertEquals(instanceId, instance.getInstanceId());
357+
assertEquals(eventPayload, instance.readOutputAs(String.class));
358+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, instance.getRuntimeStatus());
359+
}
360+
}
361+
362+
@Test
363+
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
364+
final String orchestratorName = "suspendResume";
365+
final String eventName = "MyEvent";
366+
final String eventPayload = "testPayload";
367+
368+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
369+
.addOrchestrator(orchestratorName, ctx -> {
370+
String payload = ctx.waitForExternalEvent(eventName, String.class).await();
371+
ctx.complete(payload);
372+
})
373+
.buildAndStart();
374+
375+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
376+
try (worker; client) {
377+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
378+
String suspendReason = "Suspend for testing.";
379+
client.suspendInstance(instanceId, suspendReason);
380+
client.terminate(instanceId, null);
381+
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, defaultTimeout, false);
382+
assertNotNull(instance);
383+
assertEquals(instanceId, instance.getInstanceId());
384+
assertEquals(OrchestrationRuntimeStatus.TERMINATED, instance.getRuntimeStatus());
385+
}
386+
}
387+
322388
@Test
323389
void activityFanOut() throws IOException, TimeoutException {
324390
final String orchestratorName = "ActivityFanOut";

0 commit comments

Comments
 (0)