Skip to content

Commit 6acaa32

Browse files
Updated PurgeInstances to throw TimeoutException (#90)
* Updated PurgeInstances to throw TimeoutException * Updated changelog * Updated timeout and exception message
1 parent 89514a0 commit 6acaa32

File tree

5 files changed

+106
-5
lines changed

5 files changed

+106
-5
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
* Updated package version to v1.0.0 - to be updated
1212
* update DataConverterException with detail error message ([#78](https://github.com/microsoft/durabletask-java/issues/78))
1313
* update OrchestratorBlockedEvent and TaskFailedException to be unchecked exceptions ([#88](https://github.com/microsoft/durabletask-java/issues/88))
14+
* updated PurgeInstances to take a timeout parameter and throw TimeoutException ([#37](https://github.com/microsoft/durabletask-java/issues/37))
1415

1516
### Breaking changes
1617

client/src/main/java/com/microsoft/durabletask/DurableTaskClient.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,6 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
262262
*/
263263
public abstract PurgeResult purgeInstance(String instanceId);
264264

265-
// TODO, https://github.com/microsoft/durabletask-java/issues/37, add a timeout parameter
266265
/**
267266
* Purges orchestration instance metadata from the durable store using a filter that determines which instances to
268267
* purge data for.
@@ -277,7 +276,9 @@ public abstract OrchestrationMetadata waitForInstanceCompletion(
277276
* into multiple method calls over a period of time and have them cover smaller time windows.
278277
*
279278
* @param purgeInstanceCriteria orchestration instance filter criteria used to determine which instances to purge
279+
* @throws TimeoutException when purging instances is not completed within the specified amount of time.
280+
* The default timeout for purging instances is 10 minutes
280281
* @return the result of the purge operation, including the number of purged orchestration instances (0 or 1)
281282
*/
282-
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria);
283+
public abstract PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException;
283284
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcClient.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -252,13 +252,32 @@ public PurgeResult purgeInstance(String instanceId) {
252252
}
253253

254254
@Override
255-
public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) {
255+
public PurgeResult purgeInstances(PurgeInstanceCriteria purgeInstanceCriteria) throws TimeoutException {
256256
PurgeInstanceFilter.Builder builder = PurgeInstanceFilter.newBuilder();
257257
builder.setCreatedTimeFrom(DataConverter.getTimestampFromInstant(purgeInstanceCriteria.getCreatedTimeFrom()));
258258
Optional.ofNullable(purgeInstanceCriteria.getCreatedTimeTo()).ifPresent(createdTimeTo -> builder.setCreatedTimeTo(DataConverter.getTimestampFromInstant(createdTimeTo)));
259259
purgeInstanceCriteria.getRuntimeStatusList().forEach(runtimeStatus -> Optional.ofNullable(runtimeStatus).ifPresent(status -> builder.addRuntimeStatus(OrchestrationRuntimeStatus.toProtobuf(status))));
260-
PurgeInstancesResponse response = this.sidecarClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build());
261-
return toPurgeResult(response);
260+
261+
Duration timeout = purgeInstanceCriteria.getTimeout();
262+
if (timeout == null || timeout.isNegative() || timeout.isZero()) {
263+
timeout = Duration.ofMinutes(4);
264+
}
265+
266+
TaskHubSidecarServiceBlockingStub grpcClient = this.sidecarClient.withDeadlineAfter(
267+
timeout.toMillis(),
268+
TimeUnit.MILLISECONDS);
269+
270+
PurgeInstancesResponse response;
271+
try {
272+
response = grpcClient.purgeInstances(PurgeInstancesRequest.newBuilder().setPurgeInstanceFilter(builder).build());
273+
return toPurgeResult(response);
274+
} catch (StatusRuntimeException e) {
275+
if (e.getStatus().getCode() == Status.Code.DEADLINE_EXCEEDED) {
276+
String timeOutException = String.format("Purge instances timeout duration of %s reached.", timeout);
277+
throw new TimeoutException(timeOutException);
278+
}
279+
throw e;
280+
}
262281
}
263282

264283
private PurgeResult toPurgeResult(PurgeInstancesResponse response){

client/src/main/java/com/microsoft/durabletask/PurgeInstanceCriteria.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.microsoft.durabletask;
44

55
import javax.annotation.Nullable;
6+
import java.time.Duration;
67
import java.time.Instant;
78
import java.util.ArrayList;
89
import java.util.List;
@@ -15,6 +16,7 @@ public final class PurgeInstanceCriteria {
1516
private Instant createdTimeFrom;
1617
private Instant createdTimeTo;
1718
private List<OrchestrationRuntimeStatus> runtimeStatusList = new ArrayList<>();
19+
private Duration timeout;
1820

1921
/**
2022
* Creates a new, default instance of the {@code PurgeInstanceCriteria} class.
@@ -58,6 +60,17 @@ public PurgeInstanceCriteria setRuntimeStatusList(List<OrchestrationRuntimeStatu
5860
return this;
5961
}
6062

63+
/**
64+
* Sets a timeout duration for the purge operation. Setting to {@code null} will reset the timeout to be the default value.
65+
*
66+
* @param timeout the amount of time to wait for the purge instance operation to complete
67+
* @return this criteria object
68+
*/
69+
public PurgeInstanceCriteria setTimeout(Duration timeout) {
70+
this.timeout = timeout;
71+
return this;
72+
}
73+
6174
/**
6275
* Gets the configured minimum orchestration creation time or {@code null} if none was configured.
6376
* @return the configured minimum orchestration creation time or {@code null} if none was configured
@@ -83,4 +96,14 @@ public Instant getCreatedTimeTo() {
8396
public List<OrchestrationRuntimeStatus> getRuntimeStatusList() {
8497
return this.runtimeStatusList;
8598
}
99+
100+
/**
101+
* Gets the configured timeout duration or {@code null} if none was configured.
102+
* @return the configured timeout
103+
*/
104+
@Nullable
105+
public Duration getTimeout() {
106+
return this.timeout;
107+
}
108+
86109
}

client/src/test/java/com/microsoft/durabletask/IntegrationTests.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -790,6 +790,63 @@ void purgeInstanceFilter() throws TimeoutException {
790790
}
791791
}
792792

793+
@Test
794+
void purgeInstanceFilterTimeout() throws TimeoutException {
795+
final String orchestratorName = "PurgeInstance";
796+
final String plusOne = "PlusOne";
797+
final String plusTwo = "PlusTwo";
798+
799+
DurableTaskGrpcWorker worker = this.createWorkerBuilder()
800+
.addOrchestrator(orchestratorName, ctx -> {
801+
int value = ctx.getInput(int.class);
802+
value = ctx.callActivity(plusOne, value, int.class).await();
803+
ctx.complete(value);
804+
})
805+
.addActivity(plusOne, ctx -> ctx.getInput(int.class) + 1)
806+
.addOrchestrator(plusOne, ctx -> {
807+
int value = ctx.getInput(int.class);
808+
value = ctx.callActivity(plusOne, value, int.class).await();
809+
ctx.complete(value);
810+
})
811+
.addOrchestrator(plusTwo, ctx -> {
812+
int value = ctx.getInput(int.class);
813+
value = ctx.callActivity(plusTwo, value, int.class).await();
814+
ctx.complete(value);
815+
})
816+
.addActivity(plusTwo, ctx -> ctx.getInput(int.class) + 2)
817+
.buildAndStart();
818+
819+
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
820+
try (worker; client) {
821+
client.createTaskHub(true);
822+
Instant startTime = Instant.now();
823+
824+
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
825+
OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
826+
assertNotNull(metadata);
827+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
828+
assertEquals(1, metadata.readOutputAs(int.class));
829+
830+
String instanceId1 = client.scheduleNewOrchestrationInstance(plusOne, 0);
831+
metadata = client.waitForInstanceCompletion(instanceId1, defaultTimeout, true);
832+
assertNotNull(metadata);
833+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
834+
assertEquals(1, metadata.readOutputAs(int.class));
835+
836+
String instanceId2 = client.scheduleNewOrchestrationInstance(plusTwo, 10);
837+
metadata = client.waitForInstanceCompletion(instanceId2, defaultTimeout, true);
838+
assertNotNull(metadata);
839+
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
840+
assertEquals(12, metadata.readOutputAs(int.class));
841+
842+
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria();
843+
criteria.setCreatedTimeFrom(startTime);
844+
criteria.setTimeout(Duration.ofNanos(1));
845+
846+
assertThrows(TimeoutException.class, () -> client.purgeInstances(criteria));
847+
}
848+
}
849+
793850
@Test()
794851
void waitForInstanceStartThrowsException() {
795852
final String orchestratorName = "orchestratorName";

0 commit comments

Comments
 (0)