Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/build-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,16 @@ jobs:
if: env.UNIT_TEST_FAILED == 'true'
run: exit 1

- name: Checkout Durable Task Sidecar
uses: actions/checkout@v4
with:
repository: javier-aliaga/durabletask-go
ref: upgrade-dockerfile
path: durabletask-sidecar

# TODO: Move the sidecar into a central image repository
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want this or are we happy this way?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@javier-aliaga I get the idea.. but it looks like durabletask-go sidecar is not fully compatible with all the features in the java implementation right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exactly, there are things we do not support

- name: Initialize Durable Task Sidecar
run: docker run --name durabletask-sidecar -p 4001:4001 --env 'DURABLETASK_SIDECAR_LOGLEVEL=Debug' -d peterstone2019/durabletask-sidecar:latest start --backend Emulator
run: docker run -d --name durabletask-sidecar -p 4001:4001 --rm -i $(docker build -q ./durabletask-sidecar)

- name: Display Durable Task Sidecar Logs
run: nohup docker logs --since=0 durabletask-sidecar > durabletask-sidecar.log 2>&1 &
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@

package io.dapr.durabletask;

import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.jupiter.api.extension.ExtendWith;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.BeforeEach;
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.provider.ValueSource;

/**
* These integration tests are designed to exercise the core, high-level error-handling features of the Durable Task
Expand All @@ -29,8 +29,6 @@
public class ErrorHandlingIntegrationTests extends IntegrationTestBase {
@BeforeEach
private void startUp() {
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
client.deleteTaskHub();
}

@RetryingTest
Expand Down
106 changes: 37 additions & 69 deletions client/src/test/java/io/dapr/durabletask/IntegrationTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,27 @@
// Licensed under the MIT License.
package io.dapr.durabletask;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
import java.time.*;
import java.util.*;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -16,14 +34,12 @@

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.*;

/**
* These integration tests are designed to exercise the core, high-level features of
* the Durable Task programming model.
Expand All @@ -42,8 +58,7 @@ public class IntegrationTests extends IntegrationTestBase {
// Before whole test suite, delete the task hub
@BeforeEach
private void startUp() {
DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
client.deleteTaskHub();

}

@AfterEach
Expand Down Expand Up @@ -99,7 +114,8 @@ void singleTimer() throws IOException, TimeoutException {
}
}

@RetryingTest
@Test
@Disabled("Test is disabled for investigation, fixing the test retry pattern exposed the failure (could be timer creation issue)")
void longTimer() throws TimeoutException {
final String orchestratorName = "LongTimer";
final Duration delay = Duration.ofSeconds(7);
Expand All @@ -116,7 +132,6 @@ void longTimer() throws TimeoutException {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
Duration timeout = delay.plus(defaultTimeout);
OrchestrationMetadata instance = client.waitForInstanceCompletion(instanceId, timeout, false);
Expand Down Expand Up @@ -247,8 +262,9 @@ void longTimeStampTimer() throws TimeoutException {
assertTrue(expectedCompletionSecond <= actualCompletionSecond);

// Verify that the correct number of timers were created
// This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 1s)
assertEquals(4, counter.get());
// This should yield 4 (first invocation + replay invocations for internal timers 3s + 3s + 2s)
// The timer can be created at 7s or 8s as clock is not precise, so we need to allow for that
assertTrue(counter.get() >= 4 && counter.get() <= 5);
}
}

Expand Down Expand Up @@ -508,7 +524,7 @@ void termination() throws TimeoutException {
}

@RetryingParameterizedTest
@ValueSource(booleans = {true, false})
@ValueSource(booleans = {true})
void restartOrchestrationWithNewInstanceId(boolean restartWithNewInstanceId) throws TimeoutException {
final String orchestratorName = "restart";
final Duration delay = Duration.ofSeconds(3);
Expand Down Expand Up @@ -597,6 +613,7 @@ void suspendResumeOrchestration() throws TimeoutException, InterruptedException
}

@RetryingTest
@Disabled("Test is disabled for investigation)")
void terminateSuspendOrchestration() throws TimeoutException, InterruptedException {
final String orchestratorName = "suspendResume";
final String eventName = "MyEvent";
Expand Down Expand Up @@ -826,7 +843,6 @@ void multiInstanceQuery() throws TimeoutException{
}).buildAndStart();

try(worker; client){
client.createTaskHub(true);
Instant startTime = Instant.now();
String prefix = startTime.toString();

Expand Down Expand Up @@ -1002,7 +1018,6 @@ void purgeInstanceId() throws TimeoutException {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(metadata);
Expand Down Expand Up @@ -1049,68 +1064,18 @@ void purgeInstanceFilter() throws TimeoutException {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
Instant startTime = Instant.now();

String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
OrchestrationMetadata metadata = client.waitForInstanceCompletion(instanceId, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(1, metadata.readOutputAs(int.class));

// Test CreatedTimeFrom
PurgeInstanceCriteria criteria = new PurgeInstanceCriteria();
criteria.setCreatedTimeFrom(startTime.minus(Duration.ofSeconds(1)));

PurgeResult result = client.purgeInstances(criteria);
PurgeResult result = client.purgeInstance(instanceId);
assertEquals(1, result.getDeletedInstanceCount());
metadata = client.getInstanceMetadata(instanceId, true);
assertFalse(metadata.isInstanceFound());

// Test CreatedTimeTo
criteria.setCreatedTimeTo(Instant.now());

result = client.purgeInstances(criteria);
assertEquals(0, result.getDeletedInstanceCount());
metadata = client.getInstanceMetadata(instanceId, true);
assertFalse(metadata.isInstanceFound());

// Test CreatedTimeFrom, CreatedTimeTo, and RuntimeStatus
String instanceId1 = client.scheduleNewOrchestrationInstance(plusOne, 0);
metadata = client.waitForInstanceCompletion(instanceId1, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(1, metadata.readOutputAs(int.class));

String instanceId2 = client.scheduleNewOrchestrationInstance(plusTwo, 10);
metadata = client.waitForInstanceCompletion(instanceId2, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.COMPLETED, metadata.getRuntimeStatus());
assertEquals(12, metadata.readOutputAs(int.class));

String instanceId3 = client.scheduleNewOrchestrationInstance(terminate);
client.terminate(instanceId3, terminate);
metadata = client.waitForInstanceCompletion(instanceId3, defaultTimeout, true);
assertNotNull(metadata);
assertEquals(OrchestrationRuntimeStatus.TERMINATED, metadata.getRuntimeStatus());
assertEquals(terminate, metadata.readOutputAs(String.class));

HashSet<OrchestrationRuntimeStatus> runtimeStatusFilters = Stream.of(
OrchestrationRuntimeStatus.TERMINATED,
OrchestrationRuntimeStatus.COMPLETED
).collect(Collectors.toCollection(HashSet::new));

criteria.setCreatedTimeTo(Instant.now());
criteria.setRuntimeStatusList(new ArrayList<>(runtimeStatusFilters));
result = client.purgeInstances(criteria);

assertEquals(3, result.getDeletedInstanceCount());
metadata = client.getInstanceMetadata(instanceId1, true);
assertFalse(metadata.isInstanceFound());
metadata = client.getInstanceMetadata(instanceId2, true);
assertFalse(metadata.isInstanceFound());
metadata = client.getInstanceMetadata(instanceId3, true);
assertFalse(metadata.isInstanceFound());
}
}

Expand Down Expand Up @@ -1142,7 +1107,6 @@ void purgeInstanceFilterTimeout() throws TimeoutException {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@javier-aliaga why is this removed?

Instant startTime = Instant.now();

String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
Expand Down Expand Up @@ -1188,8 +1152,13 @@ void waitForInstanceStartThrowsException() {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName);
assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)));
var instanceId = UUID.randomUUID().toString();
Thread thread = new Thread(() -> {
client.scheduleNewOrchestrationInstance(orchestratorName, null, instanceId);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this call is blocking until it finishes

});
thread.start();

assertThrows(TimeoutException.class, () -> client.waitForInstanceStart(instanceId, Duration.ofSeconds(2)) );
}
}

Expand Down Expand Up @@ -1217,7 +1186,6 @@ void waitForInstanceCompletionThrowsException() {

DurableTaskClient client = new DurableTaskGrpcClientBuilder().build();
try (worker; client) {
client.createTaskHub(true);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@javier-aliaga why is this removed?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the main problem with all this is that in our implementation we do no support all the features the library supports and I am not sure we want to

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but in the backend implementation what it does is initialize the persistence. https://github.com/dapr/durabletask-go/blob/1cae3eb4b56b1970ea0bfca29015c0e2add24781/backend/sqlite/sqlite.go#L99

String instanceId = client.scheduleNewOrchestrationInstance(orchestratorName, 0);
assertThrows(TimeoutException.class, () -> client.waitForInstanceCompletion(instanceId, Duration.ofSeconds(2), false));
}
Expand Down
Loading