diff --git a/src/test/java/com/uber/cadence/RegisterTestDomain.java b/src/test/java/com/uber/cadence/RegisterTestDomain.java index 602a6a3a0..0b4be6f45 100644 --- a/src/test/java/com/uber/cadence/RegisterTestDomain.java +++ b/src/test/java/com/uber/cadence/RegisterTestDomain.java @@ -1,7 +1,7 @@ package com.uber.cadence; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN2; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; import com.uber.cadence.serviceclient.ClientOptions; import com.uber.cadence.serviceclient.IWorkflowService; diff --git a/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java b/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java new file mode 100644 index 000000000..67f40c08c --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/CadenceTestContext.java @@ -0,0 +1,224 @@ +package com.uber.cadence.testUtils; + +import com.uber.cadence.FeatureFlags; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.internal.worker.PollerOptions; +import com.uber.cadence.serviceclient.ClientOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import com.uber.cadence.serviceclient.WorkflowServiceTChannel; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactory; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.worker.WorkerOptions; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +public class CadenceTestContext { + + private final Map workers = new HashMap<>(); + private List> delayedCallbacks = new ArrayList<>(); + private final IWorkflowService wfService; + private final WorkflowClient workflowClient; + private final String defaultTaskList; + private final TracingWorkflowInterceptorFactory tracer; + private WorkerFactory workerFactory; + // Real Service only + private ScheduledExecutorService scheduledExecutor; + // Test service only + private TestWorkflowEnvironment testEnvironment; + + private CadenceTestContext( + WorkflowClient workflowClient, + String defaultTaskList, + TracingWorkflowInterceptorFactory tracer, + WorkerFactory workerFactory, + ScheduledExecutorService scheduledExecutor, + TestWorkflowEnvironment testEnvironment) { + this.wfService = workflowClient.getService(); + this.workflowClient = workflowClient; + this.defaultTaskList = defaultTaskList; + this.tracer = tracer; + this.workerFactory = workerFactory; + this.scheduledExecutor = scheduledExecutor; + this.testEnvironment = testEnvironment; + } + + public Worker getDefaultWorker() { + return getOrCreateWorker(getDefaultTaskList()); + } + + public Worker getOrCreateWorker(String taskList) { + return workers.computeIfAbsent(taskList, this::createWorker); + } + + public void start() { + if (isRealService()) { + workerFactory.start(); + } else { + testEnvironment.start(); + } + } + + public void stop() { + if (!workerFactory.isStarted() || workerFactory.isTerminated() || workerFactory.isShutdown()) { + return; + } + if (isRealService()) { + workerFactory.shutdown(); + workerFactory.awaitTermination(1, TimeUnit.SECONDS); + for (ScheduledFuture result : delayedCallbacks) { + if (result.isDone() && !result.isCancelled()) { + try { + result.get(); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted, some assertions may not have run", e); + } catch (ExecutionException e) { + if (e.getCause() instanceof AssertionError) { + throw (AssertionError) e.getCause(); + } else { + throw new RuntimeException("Failed to complete callback", e.getCause()); + } + } + } + } + wfService.close(); + } else { + testEnvironment.shutdown(); + testEnvironment.awaitTermination(1, TimeUnit.SECONDS); + } + if (tracer != null) { + tracer.assertExpected(); + } + } + + public void suspendPolling() { + workerFactory.suspendPolling(); + } + + public void resumePolling() { + workerFactory.resumePolling(); + } + + public void registerDelayedCallback(Duration delay, Runnable r) { + if (isRealService()) { + ScheduledFuture result = + scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS); + delayedCallbacks.add(result); + } else { + testEnvironment.registerDelayedCallback(delay, r); + } + } + + public void sleep(Duration d) { + if (isRealService()) { + try { + Thread.sleep(d.toMillis()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted", e); + } + } else { + testEnvironment.sleep(d); + } + } + + public long currentTimeMillis() { + if (isRealService()) { + return System.currentTimeMillis(); + } else { + return testEnvironment.currentTimeMillis(); + } + } + + public String getDefaultTaskList() { + return defaultTaskList; + } + + public WorkflowClient getWorkflowClient() { + return workflowClient; + } + + public WorkflowClient createWorkflowClient(WorkflowClientOptions options) { + if (isRealService()) { + return WorkflowClient.newInstance(getWorkflowClient().getService(), options); + } else { + return testEnvironment.newWorkflowClient(options); + } + } + + public TracingWorkflowInterceptorFactory getTracer() { + return tracer; + } + + private boolean isRealService() { + return testEnvironment == null; + } + + private Worker createWorker(String taskList) { + if (isRealService()) { + return workerFactory.newWorker( + taskList, + WorkerOptions.newBuilder() + .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) + .setMaxConcurrentActivityExecutionSize(1000) + .setInterceptorFactory(tracer) + .build()); + } else { + return testEnvironment.newWorker(taskList); + } + } + + public static CadenceTestContext forTestService( + Function envFactory, + WorkflowClientOptions clientOptions, + String defaultTaskList, + WorkerFactoryOptions workerFactoryOptions) { + TracingWorkflowInterceptorFactory tracer = new TracingWorkflowInterceptorFactory(); + + TestEnvironmentOptions testOptions = + new TestEnvironmentOptions.Builder() + .setWorkflowClientOptions(clientOptions) + .setInterceptorFactory(tracer) + .setWorkerFactoryOptions(workerFactoryOptions) + .build(); + TestWorkflowEnvironment testEnvironment = envFactory.apply(testOptions); + return new CadenceTestContext( + testEnvironment.newWorkflowClient(), + defaultTaskList, + tracer, + testEnvironment.getWorkerFactory(), + null, + testEnvironment); + } + + public static CadenceTestContext forRealService( + WorkflowClientOptions clientOptions, + String defaultTaskList, + WorkerFactoryOptions workerFactoryOptions) { + TracingWorkflowInterceptorFactory tracer = new TracingWorkflowInterceptorFactory(); + + IWorkflowService wfService = + new WorkflowServiceTChannel( + ClientOptions.newBuilder() + .setFeatureFlags( + new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) + .build()); + WorkflowClient workflowClient = WorkflowClient.newInstance(wfService, clientOptions); + WorkerFactory workerFactory = new WorkerFactory(workflowClient, workerFactoryOptions); + ScheduledExecutorService scheduledExecutor = new ScheduledThreadPoolExecutor(1); + return new CadenceTestContext( + workflowClient, defaultTaskList, tracer, workerFactory, scheduledExecutor, null); + } +} diff --git a/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java b/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java new file mode 100644 index 000000000..ad1bad0df --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/CadenceTestRule.java @@ -0,0 +1,289 @@ +package com.uber.cadence.testUtils; + +import com.uber.cadence.activity.ActivityOptions; +import com.uber.cadence.activity.LocalActivityOptions; +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.testing.TestEnvironmentOptions; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import com.uber.cadence.worker.Worker; +import com.uber.cadence.worker.WorkerFactoryOptions; +import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.UUID; +import java.util.function.Function; +import org.junit.AssumptionViolatedException; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.junit.rules.Timeout; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +public class CadenceTestRule implements TestRule { + + private final Builder builder; + private CadenceTestContext context; + + private CadenceTestRule(Builder builder) { + this.builder = builder; + } + + @Override + public Statement apply(Statement testCase, Description description) { + // Unless the test overrides the timeout, apply our own + Test annotation = description.getAnnotation(Test.class); + if (TestEnvironment.isDebuggerTimeouts() || (annotation == null || annotation.timeout() > 0)) { + testCase = Timeout.millis(getDefaultTestTimeout().toMillis()).apply(testCase, description); + } + Statement finalStatement = testCase; + return new Statement() { + @Override + public void evaluate() throws Throwable { + if (description.getAnnotation(RequiresDockerService.class) != null && !isDockerService()) { + throw new AssumptionViolatedException( + "Skipping test because it requires the Docker service"); + } + if (description.getAnnotation(RequiresTestService.class) != null && isDockerService()) { + throw new AssumptionViolatedException( + "Skipping test because it requires the test service"); + } + setup(description); + try { + finalStatement.evaluate(); + } finally { + teardown(); + } + } + }; + } + + public TracingWorkflowInterceptorFactory getTracer() { + return context.getTracer(); + } + + public WorkflowClient getWorkflowClient() { + return context.getWorkflowClient(); + } + + public WorkflowClient createWorkflowClient(WorkflowClientOptions options) { + return context.createWorkflowClient(options); + } + + public Worker getWorker() { + return context.getDefaultWorker(); + } + + public Worker getWorker(String tasklist) { + return context.getOrCreateWorker(tasklist); + } + + public String getDefaultTaskList() { + return context.getDefaultTaskList(); + } + + public void start() { + context.start(); + } + + public void stop() { + context.stop(); + } + + public void suspendPolling() { + context.suspendPolling(); + } + + public void resumePolling() { + context.resumePolling(); + } + + public void registerDelayedCallback(Duration delay, Runnable r) { + context.registerDelayedCallback(delay, r); + } + + public void sleep(Duration d) { + context.sleep(d); + } + + public long currentTimeMillis() { + return context.currentTimeMillis(); + } + + public WorkflowOptions.Builder workflowOptionsBuilder() { + return workflowOptionsBuilder(context.getDefaultTaskList()); + } + + public WorkflowOptions.Builder workflowOptionsBuilder(String taskList) { + if (TestEnvironment.isDebuggerTimeouts()) { + return new WorkflowOptions.Builder() + .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) + .setTaskStartToCloseTimeout(Duration.ofSeconds(60)) + .setTaskList(taskList); + } else { + return new WorkflowOptions.Builder() + .setExecutionStartToCloseTimeout(Duration.ofSeconds(30)) + .setTaskStartToCloseTimeout(Duration.ofSeconds(5)) + .setTaskList(taskList); + } + } + + public ActivityOptions activityOptions() { + return activityOptions(context.getDefaultTaskList()); + } + + public ActivityOptions activityOptions(String taskList) { + if (TestEnvironment.isDebuggerTimeouts()) { + return new ActivityOptions.Builder() + .setTaskList(taskList) + .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) + .setHeartbeatTimeout(Duration.ofSeconds(1000)) + .setScheduleToStartTimeout(Duration.ofSeconds(1000)) + .setStartToCloseTimeout(Duration.ofSeconds(10000)) + .build(); + } else { + return new ActivityOptions.Builder() + .setTaskList(taskList) + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .setHeartbeatTimeout(Duration.ofSeconds(5)) + .setScheduleToStartTimeout(Duration.ofSeconds(5)) + .setStartToCloseTimeout(Duration.ofSeconds(10)) + .build(); + } + } + + public LocalActivityOptions localActivityOptions() { + if (TestEnvironment.isDebuggerTimeouts()) { + return new LocalActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) + .build(); + } else { + return new LocalActivityOptions.Builder() + .setScheduleToCloseTimeout(Duration.ofSeconds(5)) + .build(); + } + } + + private void setup(Description description) { + String testMethod = description.getMethodName(); + String defaultTaskList = + description.getClassName() + "-" + testMethod + "-" + UUID.randomUUID(); + + WorkflowClientOptions clientOptions = + WorkflowClientOptions.newBuilder(builder.clientOptions).setDomain(builder.domain).build(); + + if (isDockerService()) { + this.context = + CadenceTestContext.forRealService( + clientOptions, defaultTaskList, builder.workerFactoryOptions); + } else { + this.context = + CadenceTestContext.forTestService( + builder.testWorkflowEnvironmentProvider, + clientOptions, + defaultTaskList, + builder.workerFactoryOptions); + } + + if (!builder.activities.isEmpty() || !builder.workflowTypes.isEmpty()) { + Worker defaultWorker = context.getDefaultWorker(); + if (!builder.activities.isEmpty()) { + defaultWorker.registerActivitiesImplementations(builder.activities.toArray()); + } + if (!builder.workflowTypes.isEmpty()) { + defaultWorker.registerWorkflowImplementationTypes( + builder.workflowTypes.stream().toArray(Class[]::new)); + } + } + if (builder.startWorkers) { + context.start(); + } + } + + private void teardown() { + context.stop(); + this.context = null; + } + + private Duration getDefaultTestTimeout() { + if (!builder.timeout.isZero()) { + return builder.timeout; + } + if (TestEnvironment.isDebuggerTimeouts()) { + return Duration.ofSeconds(500); + } + if (isDockerService()) { + return Duration.ofSeconds(30); + } + return Duration.ofSeconds(10); + } + + public boolean isDockerService() { + return TestEnvironment.isUseDockerService(); + } + + public static Builder builder() { + return new Builder(); + } + + public static class Builder { + private List> workflowTypes = new ArrayList<>(); + private List activities = new ArrayList<>(); + private WorkerFactoryOptions workerFactoryOptions = WorkerFactoryOptions.newBuilder().build(); + private Function + testWorkflowEnvironmentProvider = TestWorkflowEnvironment::newInstance; + private WorkflowClientOptions clientOptions = WorkflowClientOptions.newBuilder().build(); + + private boolean startWorkers = false; + private Duration timeout = Duration.ZERO; + private String domain = TestEnvironment.DOMAIN; + + public Builder withWorkflowTypes(Class... workflowImpls) { + workflowTypes = Arrays.asList(workflowImpls); + return this; + } + + public Builder withActivities(Object... activities) { + this.activities = Arrays.asList(activities); + return this; + } + + public Builder withWorkerFactoryOptions(WorkerFactoryOptions options) { + this.workerFactoryOptions = options; + return this; + } + + public Builder withTimeout(Duration timeout) { + this.timeout = timeout; + return this; + } + + public Builder withDomain(String domain) { + this.domain = domain; + return this; + } + + public Builder withClientOptions(WorkflowClientOptions options) { + this.clientOptions = options; + return this; + } + + public Builder startWorkersAutomatically() { + startWorkers = true; + return this; + } + + public Builder withTestEnvironmentProvider( + Function testEnvironmentProvider) { + this.testWorkflowEnvironmentProvider = testEnvironmentProvider; + return this; + } + + public CadenceTestRule build() { + return new CadenceTestRule(this); + } + } +} diff --git a/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java b/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java new file mode 100644 index 000000000..49088e9fa --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/RequiresDockerService.java @@ -0,0 +1,10 @@ +package com.uber.cadence.testUtils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RequiresDockerService {} diff --git a/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java b/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java new file mode 100644 index 000000000..6a4580893 --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/RequiresTestService.java @@ -0,0 +1,10 @@ +package com.uber.cadence.testUtils; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface RequiresTestService {} diff --git a/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java new file mode 100644 index 000000000..ed20804f9 --- /dev/null +++ b/src/test/java/com/uber/cadence/testUtils/TestEnvironment.java @@ -0,0 +1,24 @@ +package com.uber.cadence.testUtils; + +public final class TestEnvironment { + public static final String DOMAIN = "UnitTest"; + public static final String DOMAIN2 = "UnitTest2"; + /** + * When set to true increases test, activity and workflow timeouts to large values to support + * stepping through code in a debugger without timing out. + */ + private static final boolean DEBUGGER_TIMEOUTS = false; + + private static final boolean USE_DOCKER_SERVICE = + Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); + + private TestEnvironment() {} + + public static boolean isDebuggerTimeouts() { + return DEBUGGER_TIMEOUTS; + } + + public static boolean isUseDockerService() { + return USE_DOCKER_SERVICE; + } +} diff --git a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java index 936370cd1..a71e27d7d 100644 --- a/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java +++ b/src/test/java/com/uber/cadence/worker/CleanWorkerShutdownTest.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; diff --git a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java index afc54a7a6..eaa79218f 100644 --- a/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java +++ b/src/test/java/com/uber/cadence/worker/StickyWorkerTest.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; diff --git a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java index 30508fbda..0a0e801f2 100644 --- a/src/test/java/com/uber/cadence/worker/WorkerStressTests.java +++ b/src/test/java/com/uber/cadence/worker/WorkerStressTests.java @@ -17,7 +17,7 @@ package com.uber.cadence.worker; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertNotNull; import com.uber.cadence.activity.ActivityMethod; diff --git a/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java b/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java new file mode 100644 index 000000000..ed60d3891 --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/CrossDomainWorkflowTest.java @@ -0,0 +1,85 @@ +package com.uber.cadence.workflow; + +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; +import static org.junit.Assert.assertEquals; + +import com.uber.cadence.client.WorkflowClient; +import com.uber.cadence.client.WorkflowOptions; +import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal; +import com.uber.cadence.testUtils.CadenceTestRule; +import com.uber.cadence.testing.TestWorkflowEnvironment; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import org.junit.Rule; +import org.junit.Test; + +public class CrossDomainWorkflowTest { + + // When running against the test service we need both rules to share the same one rather than each + // creating their own. + private final TestWorkflowEnvironmentInternal.WorkflowServiceWrapper testWorkflowService = + new TestWorkflowEnvironmentInternal.WorkflowServiceWrapper(); + + @Rule + public CadenceTestRule firstDomain = + CadenceTestRule.builder() + .withWorkflowTypes(TestWorkflowCrossDomainImpl.class) + .startWorkersAutomatically() + .withTestEnvironmentProvider( + options -> TestWorkflowEnvironment.newInstance(testWorkflowService, options)) + .build(); + + @Rule + public CadenceTestRule secondDomain = + CadenceTestRule.builder() + .withDomain(DOMAIN2) + .withWorkflowTypes(WorkflowTest.TestWorkflowSignaledSimple.class) + .startWorkersAutomatically() + .withTestEnvironmentProvider( + options -> TestWorkflowEnvironment.newInstance(testWorkflowService, options)) + .build(); + + public interface TestWorkflowCrossDomain { + + @WorkflowMethod + String execute(String workflowId); + } + + public static class TestWorkflowCrossDomainImpl implements TestWorkflowCrossDomain { + + @Override + @WorkflowMethod + public String execute(String wfId) { + ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(wfId); + SignalOptions options = + SignalOptions.newBuilder().setDomain(DOMAIN2).setSignalName("testSignal").build(); + externalWorkflow.signal(options, "World"); + return "Signaled External workflow"; + } + } + + @Test + public void testSignalCrossDomainExternalWorkflow() + throws ExecutionException, InterruptedException { + + WorkflowOptions.Builder options = firstDomain.workflowOptionsBuilder(); + + String wfId = UUID.randomUUID().toString(); + WorkflowOptions.Builder options2 = secondDomain.workflowOptionsBuilder().setWorkflowId(wfId); + + TestWorkflowCrossDomain wf = + firstDomain + .getWorkflowClient() + .newWorkflowStub(TestWorkflowCrossDomain.class, options.build()); + + WorkflowTest.TestWorkflowSignaled simpleWorkflow = + secondDomain + .getWorkflowClient() + .newWorkflowStub(WorkflowTest.TestWorkflowSignaled.class, options2.build()); + + CompletableFuture result = WorkflowClient.execute(simpleWorkflow::execute); + assertEquals("Signaled External workflow", wf.execute(wfId)); + assertEquals("Simple workflow signaled", result.get()); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/LoggerTest.java b/src/test/java/com/uber/cadence/workflow/LoggerTest.java index 2173042f5..5e6d939a7 100644 --- a/src/test/java/com/uber/cadence/workflow/LoggerTest.java +++ b/src/test/java/com/uber/cadence/workflow/LoggerTest.java @@ -27,6 +27,7 @@ import com.uber.cadence.client.WorkflowClientOptions; import com.uber.cadence.client.WorkflowOptions; import com.uber.cadence.internal.logging.LoggerTag; +import com.uber.cadence.testUtils.TestEnvironment; import com.uber.cadence.testing.TestEnvironmentOptions; import com.uber.cadence.testing.TestWorkflowEnvironment; import com.uber.cadence.worker.Worker; @@ -90,7 +91,7 @@ public void executeChild(String id) { @Test public void testWorkflowLogger() { WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(WorkflowTest.DOMAIN).build(); + WorkflowClientOptions.newBuilder().setDomain(TestEnvironment.DOMAIN).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder() .setWorkflowClientOptions(clientOptions) diff --git a/src/test/java/com/uber/cadence/workflow/MetricsTest.java b/src/test/java/com/uber/cadence/workflow/MetricsTest.java index 21417847a..4b84a0919 100644 --- a/src/test/java/com/uber/cadence/workflow/MetricsTest.java +++ b/src/test/java/com/uber/cadence/workflow/MetricsTest.java @@ -17,6 +17,7 @@ package com.uber.cadence.workflow; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; @@ -197,10 +198,7 @@ public void setUp(com.uber.m3.util.Duration reportingFrequecy) { Scope scope = new RootScopeBuilder().reporter(reporter).reportEvery(reportingFrequecy); WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder() - .setDomain(WorkflowTest.DOMAIN) - .setMetricsScope(scope) - .build(); + WorkflowClientOptions.newBuilder().setDomain(DOMAIN).setMetricsScope(scope).build(); TestEnvironmentOptions testOptions = new TestEnvironmentOptions.Builder().setWorkflowClientOptions(clientOptions).build(); testEnvironment = TestWorkflowEnvironment.newInstance(testOptions); @@ -229,7 +227,7 @@ public void testWorkflowMetrics() throws InterruptedException { Map tags = new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .build(); @@ -252,7 +250,7 @@ public void testWorkflowMetrics() throws InterruptedException { Map activityCompletionTags = new ImmutableMap.Builder(3) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .put(MetricsTag.ACTIVITY_TYPE, "TestActivity::runActivity") .put(MetricsTag.WORKFLOW_TYPE, "TestWorkflow::execute") @@ -293,7 +291,7 @@ public void testCorruptedSignalMetrics() throws InterruptedException { Map tags = new ImmutableMap.Builder(2) - .put(MetricsTag.DOMAIN, WorkflowTest.DOMAIN) + .put(MetricsTag.DOMAIN, DOMAIN) .put(MetricsTag.TASK_LIST, taskList) .build(); verify(reporter, times(1)).reportCounter(MetricsType.CORRUPTED_SIGNALS_COUNTER, tags, 1); diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java index 4c15a7ed8..6bd3a633c 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowMigrationTest.java @@ -17,8 +17,8 @@ package com.uber.cadence.workflow; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN; -import static com.uber.cadence.workflow.WorkflowTest.DOMAIN2; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN2; import static junit.framework.TestCase.fail; import com.uber.cadence.*; diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java new file mode 100644 index 000000000..3406c075b --- /dev/null +++ b/src/test/java/com/uber/cadence/workflow/WorkflowReplayTest.java @@ -0,0 +1,102 @@ +package com.uber.cadence.workflow; + +import com.uber.cadence.testing.WorkflowReplayer; +import org.junit.Ignore; +import org.junit.Test; + +public class WorkflowReplayTest { + + // Server doesn't guarantee that the timer fire timestamp is larger or equal of the + // expected fire time. This test ensures that client still fires timer in this case. + @Test + public void testTimerFiringTimestampEarlierThanExpected() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "timerfiring.json", WorkflowTest.TimerFiringWorkflowImpl.class); + } + + @Test + public void testWorkflowReset() throws Exception { + // Leave the following code to generate history. + // startWorkerFor(TestWorkflowResetReplayWorkflow.class, TestMultiargsWorkflowsImpl.class); + // TestWorkflow1 workflowStub = + // workflowClient.newWorkflowStub( + // TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); + // workflowStub.execute(taskList); + // + // try { + // Thread.sleep(60000000); + // } catch (InterruptedException e) { + // e.printStackTrace(); + // } + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "resetWorkflowHistory.json", WorkflowTest.TestWorkflowResetReplayWorkflow.class); + } + + @Test + public void testGetVersionWithRetryReplay() throws Exception { + + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionWithRetryHistory.json", WorkflowTest.TestGetVersionWorkflowRetryImpl.class); + } + + @Test + public void testGetVersionRemoveAndAdd() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionRemoveAndAddImpl.class); + } + + /** + * Tests that history that was created before server side retry was supported is backwards + * compatible with the client that supports the server side retry. + */ + @Test + public void testAsyncActivityRetryReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testAsyncActivityRetryHistory.json", WorkflowTest.TestAsyncActivityRetry.class); + } + + /** + * Tests that history created before marker header change is backwards compatible with old markers + * generated without headers. + */ + @Test + // This test previously had a check for the incorrect test name and never ran. The json doesn't + // parse. + // Keeping it around in case we decide to fix it. + @Ignore + public void testMutableSideEffectReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testMutableSideEffectBackwardCompatibility.json", + WorkflowTest.TestMutableSideEffectWorkflowImpl.class); + } + + @Test + public void testGetVersionRemoved() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionRemovedImpl.class); + } + + @Test + public void testGetVersionAdded() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistory.json", WorkflowTest.TestGetVersionAddedImpl.class); + } + + @Test + public void testGetVersionAddedWithCadenceChangeVersion() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testGetVersionHistoryWithCadenceChangeVersion.json", + WorkflowTest.TestGetVersionAddedImpl.class); + } + + /** + * Tests that history that was created before server side retry was supported is backwards + * compatible with the client that supports the server side retry. + */ + @Test + public void testChildWorkflowRetryReplay() throws Exception { + WorkflowReplayer.replayWorkflowExecutionFromResource( + "testChildWorkflowRetryHistory.json", WorkflowTest.TestChildWorkflowRetryWorkflow.class); + } +} diff --git a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java index 84e3c10ca..36576bb89 100644 --- a/src/test/java/com/uber/cadence/workflow/WorkflowTest.java +++ b/src/test/java/com/uber/cadence/workflow/WorkflowTest.java @@ -17,6 +17,7 @@ package com.uber.cadence.workflow; +import static com.uber.cadence.testUtils.TestEnvironment.DOMAIN; import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -36,7 +37,6 @@ import com.uber.cadence.DomainAlreadyExistsError; import com.uber.cadence.DomainNotActiveError; import com.uber.cadence.EntityNotExistsError; -import com.uber.cadence.FeatureFlags; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; import com.uber.cadence.HistoryEvent; import com.uber.cadence.Memo; @@ -63,14 +63,10 @@ import com.uber.cadence.converter.JsonDataConverter; import com.uber.cadence.internal.common.WorkflowExecutionUtils; import com.uber.cadence.internal.sync.DeterministicRunnerTest; -import com.uber.cadence.internal.sync.TestWorkflowEnvironmentInternal; -import com.uber.cadence.internal.worker.PollerOptions; -import com.uber.cadence.serviceclient.ClientOptions; -import com.uber.cadence.serviceclient.IWorkflowService; -import com.uber.cadence.serviceclient.WorkflowServiceTChannel; -import com.uber.cadence.testing.TestEnvironmentOptions; -import com.uber.cadence.testing.TestWorkflowEnvironment; -import com.uber.cadence.testing.WorkflowReplayer; +import com.uber.cadence.testUtils.CadenceTestRule; +import com.uber.cadence.testUtils.RequiresDockerService; +import com.uber.cadence.testUtils.RequiresTestService; +import com.uber.cadence.testUtils.TestEnvironment; import com.uber.cadence.worker.*; import com.uber.cadence.workflow.interceptors.TracingWorkflowInterceptorFactory; import java.io.File; @@ -103,16 +99,12 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -120,12 +112,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; -import org.junit.rules.TestWatcher; -import org.junit.rules.Timeout; -import org.junit.runner.Description; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,93 +121,46 @@ @RunWith(Parameterized.class) public class WorkflowTest { - /** - * When set to true increases test, activity and workflow timeouts to large values to support - * stepping through code in a debugger without timing out. - */ - private static final boolean DEBUGGER_TIMEOUTS = false; + private static final Logger log = LoggerFactory.getLogger(WorkflowTest.class); private static final String ANNOTATION_TASK_LIST = "WorkflowTest-testExecute[Docker]"; - - private TracingWorkflowInterceptorFactory tracer; - private static final boolean useDockerService = - Boolean.parseBoolean(System.getenv("USE_DOCKER_SERVICE")); - - private TestWorkflowEnvironmentInternal.WorkflowServiceWrapper wfService; - + private static final String UUID_REGEXP = + "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; private static final boolean stickyOff = Boolean.parseBoolean(System.getenv("STICKY_OFF")); - @Parameters(name = "{1}") + @Rule public final CadenceTestRule cadenceTestRule; + @Rule public final TestName testName = new TestName(); + + @Parameters(name = "{0}") public static Object[] data() { - if (!useDockerService) { + if (TestEnvironment.isUseDockerService()) { return new Object[][] { - {false, "TestService Sticky OFF", true}, {false, "TestService Sticky ON", false} + {"Docker Sticky " + (stickyOff ? "OFF" : "ON"), stickyOff}, }; } else { - return new Object[][] { - {true, "Docker Sticky " + (stickyOff ? "OFF" : "ON"), stickyOff}, - }; + return new Object[][] {{"TestService Sticky OFF", true}, {"TestService Sticky ON", false}}; } } - @Rule public TestName testName = new TestName(); - - @Rule - public Timeout globalTimeout = - Timeout.seconds(DEBUGGER_TIMEOUTS ? 500 : !useDockerService ? 15 : 30); - - @Rule - public TestWatcher watchman = - new TestWatcher() { - @Override - protected void failed(Throwable e, Description description) { - if (tracer != null) { - System.err.println("TRACE:\n" + tracer.getTrace()); - } - if (testEnvironment != null) { - System.err.println("HISTORIES:\n" + testEnvironment.getDiagnostics()); - } - } - }; - - @Parameter public boolean useExternalService; - - @Parameter(1) - public String testType; - - @Parameter(2) - public boolean disableStickyExecution; - - public static final String DOMAIN = "UnitTest"; - public static final String DOMAIN2 = "UnitTest2"; - private static final Logger log = LoggerFactory.getLogger(WorkflowTest.class); - - private static String UUID_REGEXP = - "[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}"; - + private final boolean disableStickyExecution; private String taskList; - - private WorkerFactory workerFactory; - private Worker worker; private TestActivitiesImpl activitiesImpl; private WorkflowClient workflowClient; - private TestWorkflowEnvironment testEnvironment; - private ScheduledExecutorService scheduledExecutor; - private List> delayedCallbacks = new ArrayList<>(); - private static final IWorkflowService service = - new WorkflowServiceTChannel( - ClientOptions.newBuilder() - .setFeatureFlags( - new FeatureFlags().setWorkflowExecutionAlreadyCompletedErrorEnabled(true)) - .build()); + private TracingWorkflowInterceptorFactory tracer; - @AfterClass - public static void closeService() { - service.close(); + public WorkflowTest(String ignored, boolean disableStickyExecution) { + this.disableStickyExecution = disableStickyExecution; + this.cadenceTestRule = + CadenceTestRule.builder() + .withWorkerFactoryOptions( + WorkerFactoryOptions.newBuilder() + .setDisableStickyExecution(disableStickyExecution) + .build()) + .build(); } private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList) { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new WorkflowOptions.Builder() .setExecutionStartToCloseTimeout(Duration.ofSeconds(1000)) .setTaskStartToCloseTimeout(Duration.ofSeconds(60)) @@ -233,7 +174,7 @@ private static WorkflowOptions.Builder newWorkflowOptionsBuilder(String taskList } private static ActivityOptions newActivityOptions1(String taskList) { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new ActivityOptions.Builder() .setTaskList(taskList) .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) @@ -253,7 +194,7 @@ private static ActivityOptions newActivityOptions1(String taskList) { } private static LocalActivityOptions newLocalActivityOptions1() { - if (DEBUGGER_TIMEOUTS) { + if (TestEnvironment.isDebuggerTimeouts()) { return new LocalActivityOptions.Builder() .setScheduleToCloseTimeout(Duration.ofSeconds(1000)) .build(); @@ -270,54 +211,19 @@ private static ActivityOptions newActivityOptions2() { @Before public void setUp() { - this.wfService = new TestWorkflowEnvironmentInternal.WorkflowServiceWrapper(); String testMethod = testName.getMethodName(); if (testMethod.startsWith("testExecute") || testMethod.startsWith("testStart")) { taskList = ANNOTATION_TASK_LIST; } else { - taskList = "WorkflowTest-" + testMethod + "-" + UUID.randomUUID().toString(); - } - tracer = new TracingWorkflowInterceptorFactory(); - // TODO: Create a version of TestWorkflowEnvironment that runs against a real service. - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(DOMAIN).build(); - if (useExternalService) { - workflowClient = WorkflowClient.newInstance(service, clientOptions); - WorkerFactoryOptions factoryOptions = - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build(); - workerFactory = new WorkerFactory(workflowClient, factoryOptions); - WorkerOptions workerOptions = - WorkerOptions.newBuilder() - .setActivityPollerOptions(PollerOptions.newBuilder().setPollThreadCount(5).build()) - .setMaxConcurrentActivityExecutionSize(1000) - .setInterceptorFactory(tracer) - .build(); - worker = workerFactory.newWorker(taskList, workerOptions); - scheduledExecutor = new ScheduledThreadPoolExecutor(1); - } else { - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions(clientOptions) - .setInterceptorFactory(tracer) - .setWorkerFactoryOptions( - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build()) - .build(); - testEnvironment = TestWorkflowEnvironment.newInstance(wfService, testOptions); - worker = testEnvironment.newWorker(taskList); - workflowClient = testEnvironment.newWorkflowClient(); + taskList = cadenceTestRule.getDefaultTaskList(); } + workflowClient = cadenceTestRule.getWorkflowClient(); + tracer = cadenceTestRule.getTracer(); ActivityCompletionClient completionClient = workflowClient.newActivityCompletionClient(); activitiesImpl = new TestActivitiesImpl(completionClient); - worker.registerActivitiesImplementations(activitiesImpl); - - newWorkflowOptionsBuilder(taskList); + cadenceTestRule.getWorker(taskList).registerActivitiesImplementations(activitiesImpl); - newActivityOptions1(taskList); activitiesImpl.invocations.clear(); activitiesImpl.procResult.clear(); } @@ -327,72 +233,33 @@ public void tearDown() throws Throwable { if (activitiesImpl != null) { activitiesImpl.close(); } - if (testEnvironment != null) { - testEnvironment.close(); - } - for (ScheduledFuture result : delayedCallbacks) { - if (result.isDone() && !result.isCancelled()) { - try { - result.get(); - } catch (InterruptedException e) { - } catch (ExecutionException e) { - throw e.getCause(); - } - } - } - if (tracer != null) { - tracer.assertExpected(); - } + cadenceTestRule.stop(); } private void startWorkerFor(Class... workflowTypes) { + Worker worker = cadenceTestRule.getWorker(taskList); worker.registerWorkflowImplementationTypes(workflowTypes); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + cadenceTestRule.start(); } private void startWorkerFor(WorkflowImplementationOptions options, Class... workflowTypes) { + Worker worker = cadenceTestRule.getWorker(taskList); worker.registerWorkflowImplementationTypes(options, workflowTypes); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + cadenceTestRule.start(); } // TODO: Refactor testEnvironment to support testing through real service to avoid this // conditional switches void registerDelayedCallback(Duration delay, Runnable r) { - if (useExternalService) { - ScheduledFuture result = - scheduledExecutor.schedule(r, delay.toMillis(), TimeUnit.MILLISECONDS); - delayedCallbacks.add(result); - } else { - testEnvironment.registerDelayedCallback(delay, r); - } + cadenceTestRule.registerDelayedCallback(delay, r); } void sleep(Duration d) { - if (useExternalService) { - try { - Thread.sleep(d.toMillis()); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted", e); - } - } else { - testEnvironment.sleep(d); - } + cadenceTestRule.sleep(d); } long currentTimeMillis() { - if (useExternalService) { - return System.currentTimeMillis(); - } else { - return testEnvironment.currentTimeMillis(); - } + return cadenceTestRule.currentTimeMillis(); } public interface TestWorkflow1 { @@ -401,12 +268,6 @@ public interface TestWorkflow1 { String execute(String taskList); } - public interface TestWorkflowCrossDomain { - - @WorkflowMethod - String execute(String workflowId); - } - public interface TestWorkflowSignaled { @WorkflowMethod @@ -884,34 +745,6 @@ public void testAsyncActivityRetry() { assertEquals(activitiesImpl.toString(), 3, activitiesImpl.invocations.size()); } - /** - * Tests that history that was created before server side retry was supported is backwards - * compatible with the client that supports the server side retry. - */ - @Test - public void testAsyncActivityRetryReplay() throws Exception { - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testAsyncActivityRetryHistory.json", TestAsyncActivityRetry.class); - } - - /** - * Tests that history created before marker header change is backwards compatible with old markers - * generated without headers. - */ - @Test - public void testMutableSideEffectReplay() throws Exception { - // Avoid executing 4 times - if (!testName.getMethodName().equals("testAsyncActivityRetryReplay[Docker Sticky OFF]")) { - return; - } - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testMutableSideEffectBackwardCompatibility.json", TestMutableSideEffectWorkflowImpl.class); - } - public static class TestAsyncActivityRetryOptionsChange implements TestWorkflow1 { private TestActivities activities; @@ -1260,11 +1093,7 @@ public int execute(int count, String continueAsNewTaskList) { public void testContinueAsNew() { Worker w2; String continuedTaskList = this.taskList + "_continued"; - if (useExternalService) { - w2 = workerFactory.newWorker(continuedTaskList); - } else { - w2 = testEnvironment.newWorker(continuedTaskList); - } + w2 = cadenceTestRule.getWorker(continuedTaskList); w2.registerWorkflowImplementationTypes(TestContinueAsNewImpl.class); startWorkerFor(TestContinueAsNewImpl.class); @@ -1492,7 +1321,7 @@ public void testStart() { TestMultiargsWorkflowsFunc1 stubF1 = workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc1.class); - if (!useExternalService) { + if (!TestEnvironment.isUseDockerService()) { // Use worker that polls on a task list configured through @WorkflowMethod annotation of func1 assertResult(1, WorkflowClient.start(stubF1::func1, 1)); assertEquals(1, stubF1.func1(1)); // Check that duplicated start just returns the result. @@ -1566,97 +1395,92 @@ public void testStart() { } @Test + @RequiresTestService public void testMemo() { - if (testEnvironment != null) { - String testMemoKey = "testKey"; - String testMemoValue = "testValue"; - Map memo = new HashMap<>(); - memo.put(testMemoKey, testMemoValue); - - startWorkerFor(TestMultiargsWorkflowsImpl.class); - WorkflowOptions workflowOptions = newWorkflowOptionsBuilder(taskList).setMemo(memo).build(); - TestMultiargsWorkflowsFunc stubF = - workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); - WorkflowExecution executionF = WorkflowClient.start(stubF::func); + String testMemoKey = "testKey"; + String testMemoValue = "testValue"; + Map memo = new HashMap<>(); + memo.put(testMemoKey, testMemoValue); - GetWorkflowExecutionHistoryResponse historyResp = - WorkflowExecutionUtils.getHistoryPage( - new byte[] {}, testEnvironment.getWorkflowService(), DOMAIN, executionF); - HistoryEvent startEvent = historyResp.history.getEvents().get(0); - Memo memoFromEvent = startEvent.workflowExecutionStartedEventAttributes.getMemo(); - byte[] memoBytes = memoFromEvent.getFields().get(testMemoKey).array(); - String memoRetrieved = - JsonDataConverter.getInstance().fromData(memoBytes, String.class, String.class); - assertEquals(testMemoValue, memoRetrieved); - } + startWorkerFor(TestMultiargsWorkflowsImpl.class); + WorkflowOptions workflowOptions = newWorkflowOptionsBuilder(taskList).setMemo(memo).build(); + TestMultiargsWorkflowsFunc stubF = + workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); + WorkflowExecution executionF = WorkflowClient.start(stubF::func); + + GetWorkflowExecutionHistoryResponse historyResp = + WorkflowExecutionUtils.getHistoryPage( + new byte[] {}, workflowClient.getService(), DOMAIN, executionF); + HistoryEvent startEvent = historyResp.history.getEvents().get(0); + Memo memoFromEvent = startEvent.workflowExecutionStartedEventAttributes.getMemo(); + byte[] memoBytes = memoFromEvent.getFields().get(testMemoKey).array(); + String memoRetrieved = + JsonDataConverter.getInstance().fromData(memoBytes, String.class, String.class); + assertEquals(testMemoValue, memoRetrieved); } @Test + @RequiresTestService public void testSearchAttributes() { - if (testEnvironment != null) { - String testKeyString = "CustomKeywordField"; - String testValueString = "testKeyword"; - String testKeyInteger = "CustomIntField"; - Integer testValueInteger = 1; - String testKeyDateTime = "CustomDateTimeField"; - LocalDateTime testValueDateTime = LocalDateTime.now(); - String testKeyBool = "CustomBoolField"; - Boolean testValueBool = true; - String testKeyDouble = "CustomDoubleField"; - Double testValueDouble = 1.23; - - // add more type to test - Map searchAttr = new HashMap<>(); - searchAttr.put(testKeyString, testValueString); - searchAttr.put(testKeyInteger, testValueInteger); - searchAttr.put(testKeyDateTime, testValueDateTime); - searchAttr.put(testKeyBool, testValueBool); - searchAttr.put(testKeyDouble, testValueDouble); - - startWorkerFor(TestMultiargsWorkflowsImpl.class); - WorkflowOptions workflowOptions = - newWorkflowOptionsBuilder(taskList).setSearchAttributes(searchAttr).build(); - TestMultiargsWorkflowsFunc stubF = - workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); - WorkflowExecution executionF = WorkflowClient.start(stubF::func); - - GetWorkflowExecutionHistoryResponse historyResp = - WorkflowExecutionUtils.getHistoryPage( - new byte[] {}, testEnvironment.getWorkflowService(), DOMAIN, executionF); - HistoryEvent startEvent = historyResp.history.getEvents().get(0); - SearchAttributes searchAttrFromEvent = - startEvent.workflowExecutionStartedEventAttributes.getSearchAttributes(); - - byte[] searchAttrStringBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyString).array(); - String retrievedString = - JsonDataConverter.getInstance() - .fromData(searchAttrStringBytes, String.class, String.class); - assertEquals(testValueString, retrievedString); - byte[] searchAttrIntegerBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyInteger).array(); - Integer retrievedInteger = - JsonDataConverter.getInstance() - .fromData(searchAttrIntegerBytes, Integer.class, Integer.class); - assertEquals(testValueInteger, retrievedInteger); - byte[] searchAttrDateTimeBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyDateTime).array(); - LocalDateTime retrievedDateTime = - JsonDataConverter.getInstance() - .fromData(searchAttrDateTimeBytes, LocalDateTime.class, LocalDateTime.class); - assertEquals(testValueDateTime, retrievedDateTime); - byte[] searchAttrBoolBytes = searchAttrFromEvent.getIndexedFields().get(testKeyBool).array(); - Boolean retrievedBool = - JsonDataConverter.getInstance() - .fromData(searchAttrBoolBytes, Boolean.class, Boolean.class); - assertEquals(testValueBool, retrievedBool); - byte[] searchAttrDoubleBytes = - searchAttrFromEvent.getIndexedFields().get(testKeyDouble).array(); - Double retrievedDouble = - JsonDataConverter.getInstance() - .fromData(searchAttrDoubleBytes, Double.class, Double.class); - assertEquals(testValueDouble, retrievedDouble); - } + String testKeyString = "CustomKeywordField"; + String testValueString = "testKeyword"; + String testKeyInteger = "CustomIntField"; + Integer testValueInteger = 1; + String testKeyDateTime = "CustomDateTimeField"; + LocalDateTime testValueDateTime = LocalDateTime.now(); + String testKeyBool = "CustomBoolField"; + Boolean testValueBool = true; + String testKeyDouble = "CustomDoubleField"; + Double testValueDouble = 1.23; + + // add more type to test + Map searchAttr = new HashMap<>(); + searchAttr.put(testKeyString, testValueString); + searchAttr.put(testKeyInteger, testValueInteger); + searchAttr.put(testKeyDateTime, testValueDateTime); + searchAttr.put(testKeyBool, testValueBool); + searchAttr.put(testKeyDouble, testValueDouble); + + startWorkerFor(TestMultiargsWorkflowsImpl.class); + WorkflowOptions workflowOptions = + newWorkflowOptionsBuilder(taskList).setSearchAttributes(searchAttr).build(); + TestMultiargsWorkflowsFunc stubF = + workflowClient.newWorkflowStub(TestMultiargsWorkflowsFunc.class, workflowOptions); + WorkflowExecution executionF = WorkflowClient.start(stubF::func); + + GetWorkflowExecutionHistoryResponse historyResp = + WorkflowExecutionUtils.getHistoryPage( + new byte[] {}, workflowClient.getService(), DOMAIN, executionF); + HistoryEvent startEvent = historyResp.history.getEvents().get(0); + SearchAttributes searchAttrFromEvent = + startEvent.workflowExecutionStartedEventAttributes.getSearchAttributes(); + + byte[] searchAttrStringBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyString).array(); + String retrievedString = + JsonDataConverter.getInstance().fromData(searchAttrStringBytes, String.class, String.class); + assertEquals(testValueString, retrievedString); + byte[] searchAttrIntegerBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyInteger).array(); + Integer retrievedInteger = + JsonDataConverter.getInstance() + .fromData(searchAttrIntegerBytes, Integer.class, Integer.class); + assertEquals(testValueInteger, retrievedInteger); + byte[] searchAttrDateTimeBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyDateTime).array(); + LocalDateTime retrievedDateTime = + JsonDataConverter.getInstance() + .fromData(searchAttrDateTimeBytes, LocalDateTime.class, LocalDateTime.class); + assertEquals(testValueDateTime, retrievedDateTime); + byte[] searchAttrBoolBytes = searchAttrFromEvent.getIndexedFields().get(testKeyBool).array(); + Boolean retrievedBool = + JsonDataConverter.getInstance().fromData(searchAttrBoolBytes, Boolean.class, Boolean.class); + assertEquals(testValueBool, retrievedBool); + byte[] searchAttrDoubleBytes = + searchAttrFromEvent.getIndexedFields().get(testKeyDouble).array(); + Double retrievedDouble = + JsonDataConverter.getInstance().fromData(searchAttrDoubleBytes, Double.class, Double.class); + assertEquals(testValueDouble, retrievedDouble); } @Test @@ -2094,10 +1918,7 @@ public String execute(String taskList) { public void testUntypedChildStubWorkflowAsyncInvoke() { startWorkerFor(TestUntypedChildStubWorkflowAsyncInvoke.class, TestMultiargsWorkflowsImpl.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(200)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(60)); - options.setTaskList(taskList); + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); TestWorkflow1 client = workflowClient.newWorkflowStub(TestWorkflow1.class, options.build()); assertEquals(null, client.execute(taskList)); } @@ -2149,7 +1970,7 @@ public List getTrace() { public void testTimer() { startWorkerFor(TestTimerWorkflowImpl.class); WorkflowOptions options; - if (useExternalService) { + if (cadenceTestRule.isDockerService()) { options = newWorkflowOptionsBuilder(taskList).build(); } else { options = @@ -2158,9 +1979,9 @@ public void testTimer() { .build(); } TestWorkflow2 client = workflowClient.newWorkflowStub(TestWorkflow2.class, options); - String result = client.execute(useExternalService); + String result = client.execute(cadenceTestRule.isDockerService()); assertEquals("testTimer", result); - if (useExternalService) { + if (cadenceTestRule.isDockerService()) { tracer.setExpected( "executeWorkflow: testActivity", "registerQuery getTrace", @@ -2221,7 +2042,7 @@ public void testAsyncRetry() { TestWorkflow2.class, newWorkflowOptionsBuilder(taskList).build()); String result = null; try { - result = client.execute(useExternalService); + result = client.execute(cadenceTestRule.isDockerService()); fail("unreachable"); } catch (WorkflowException e) { assertTrue(e.getCause() instanceof IllegalThreadStateException); @@ -2288,7 +2109,7 @@ public void testAsyncRetryOptionsChange() { TestWorkflow2.class, newWorkflowOptionsBuilder(taskList).build()); String result = null; try { - result = client.execute(useExternalService); + result = client.execute(cadenceTestRule.isDockerService()); fail("unreachable"); } catch (WorkflowException e) { assertTrue(e.getCause() instanceof IllegalThreadStateException); @@ -2464,15 +2285,6 @@ public void mySignal(String value) { @Test public void testSignal() { - // Test getTrace through replay by a local worker. - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSignalWorkflowImpl.class); startWorkerFor(TestSignalWorkflowImpl.class); WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); String workflowId = UUID.randomUUID().toString(); @@ -2529,14 +2341,6 @@ public void mySignal(String value) { @Test public void testSignalingCompletedWorkflow() { - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSimpleWorkflowImpl.class); startWorkerFor(TestSimpleWorkflowImpl.class); String workflowId = UUID.randomUUID().toString(); @@ -2603,15 +2407,6 @@ public void mySignal(String value) { @Test public void testSignalWithStart() { - // Test getTrace through replay by a local worker. - Worker queryWorker; - if (useExternalService) { - WorkerFactory workerFactory = WorkerFactory.newInstance(workflowClient); - queryWorker = workerFactory.newWorker(taskList); - } else { - queryWorker = testEnvironment.newWorker(taskList); - } - queryWorker.registerWorkflowImplementationTypes(TestSignalWithStartWorkflowImpl.class); startWorkerFor(TestSignalWorkflowImpl.class); WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); String workflowId = UUID.randomUUID().toString(); @@ -2730,7 +2525,7 @@ public void testNoQueryThreadLeak() throws InterruptedException { int queryCount = 100; for (int i = 0; i < queryCount; i++) { assertEquals("some state", client.getState()); - if (useDockerService) { + if (cadenceTestRule.isDockerService()) { // Sleep a little bit to avoid server throttling error. Thread.sleep(50); } @@ -2759,19 +2554,10 @@ public void testQueryRejectionConditionDefault() { @Test public void testQueryRejectionConditionNotOpen() { startWorkerFor(TestNoQueryWorkflowImpl.class); - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder(workflowClient.getOptions()) - .setQueryRejectCondition(QueryRejectCondition.NOT_OPEN) - .build(); - WorkflowClient wc; - if (useExternalService) { - wc = WorkflowClient.newInstance(service, clientOptions); - } else { - wc = testEnvironment.newWorkflowClient(clientOptions); - } WorkflowOptions.Builder optionsBuilder = newWorkflowOptionsBuilder(taskList); - QueryableWorkflow client = wc.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); + QueryableWorkflow client = + workflowClient.newWorkflowStub(QueryableWorkflow.class, optionsBuilder.build()); WorkflowClient.start(client::execute); sleep(Duration.ofSeconds(1)); assertEquals("some state", client.getState()); @@ -2801,10 +2587,14 @@ public void testSignalUntyped() { assertEquals("initial", client.query("QueryableWorkflow::getState", String.class)); client.signal("testSignal", "Hello "); sleep(Duration.ofMillis(500)); - while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) {} + while (!"Hello ".equals(client.query("QueryableWorkflow::getState", String.class))) { + sleep(Duration.ofMillis(10)); + } assertEquals("Hello ", client.query("QueryableWorkflow::getState", String.class)); client.signal("testSignal", "World!"); - while (!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) {} + while (!"World!".equals(client.query("QueryableWorkflow::getState", String.class))) { + sleep(Duration.ofMillis(10)); + } assertEquals("World!", client.query("QueryableWorkflow::getState", String.class)); assertEquals( "Hello World!", @@ -3195,7 +2985,7 @@ public String execute(String taskList, int delay) { @Test public void testChildWorkflowRetry() { AngryChildActivityImpl angryChildActivity = new AngryChildActivityImpl(); - worker.registerActivitiesImplementations(angryChildActivity); + cadenceTestRule.getWorker().registerActivitiesImplementations(angryChildActivity); startWorkerFor(TestChildWorkflowRetryWorkflow.class, AngryChild.class); WorkflowOptions.Builder options = new WorkflowOptions.Builder(); @@ -3216,12 +3006,7 @@ public WorkflowStub newUntypedWorkflowStub( }) .setDomain(DOMAIN) .build(); - WorkflowClient wc; - if (useExternalService) { - wc = WorkflowClient.newInstance(service, clientOptions); - } else { - wc = testEnvironment.newWorkflowClient(clientOptions); - } + WorkflowClient wc = cadenceTestRule.createWorkflowClient(clientOptions); TestWorkflow1 client = wc.newWorkflowStub(TestWorkflow1.class, options.build()); try { @@ -3236,19 +3021,6 @@ public WorkflowStub newUntypedWorkflowStub( assertEquals(3, angryChildActivity.getInvocationCount()); } - /** - * Tests that history that was created before server side retry was supported is backwards - * compatible with the client that supports the server side retry. - */ - @Test - public void testChildWorkflowRetryReplay() throws Exception { - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testChildWorkflowRetryHistory.json", TestChildWorkflowRetryWorkflow.class); - } - public static class TestChildWorkflowExecutionPromiseHandler implements TestWorkflow1 { private ITestNamedChild child; @@ -3275,18 +3047,10 @@ public String execute(String taskList) { public void testChildWorkflowExecutionPromiseHandler() { startWorkerFor(TestChildWorkflowExecutionPromiseHandler.class, TestNamedChild.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(20)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(2)); - options.setTaskList(taskList); - WorkflowClient wc; - if (useExternalService) { - wc = workflowClient; - } else { - wc = testEnvironment.newWorkflowClient(); - } + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); - TestWorkflow1 client = wc.newWorkflowStub(TestWorkflow1.class, options.build()); + TestWorkflow1 client = + cadenceTestRule.getWorkflowClient().newWorkflowStub(TestWorkflow1.class, options.build()); String result = client.execute(taskList); assertEquals("FOO", result); } @@ -3331,10 +3095,8 @@ public String execute(String greeting, String parentWorkflowID) { @Test public void testSignalExternalWorkflow() { startWorkerFor(TestSignalExternalWorkflow.class, SignalingChildImpl.class); - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(2000)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(60)); - options.setTaskList(taskList); + WorkflowOptions.Builder options = newWorkflowOptionsBuilder(taskList); + TestWorkflowSignaled client = workflowClient.newWorkflowStub(TestWorkflowSignaled.class, options.build()); assertEquals("Hello World!", client.execute()); @@ -3365,19 +3127,6 @@ public void signal1(String arg) { } } - public static class TestWorkflowCrossDomainImpl implements TestWorkflowCrossDomain { - - @Override - @WorkflowMethod - public String execute(String wfId) { - ExternalWorkflowStub externalWorkflow = Workflow.newUntypedExternalWorkflowStub(wfId); - SignalOptions options = - SignalOptions.newBuilder().setDomain(DOMAIN2).setSignalName("testSignal").build(); - externalWorkflow.signal(options, "World"); - return "Signaled External workflow"; - } - } - public static class UntypedSignalingChildImpl implements SignalingChild { @Override @@ -3400,67 +3149,6 @@ public void testUntypedSignalExternalWorkflow() { assertEquals("Hello World!", client.execute()); } - @Test - public void testSignalCrossDomainExternalWorkflow() - throws ExecutionException, InterruptedException { - WorkflowClientOptions clientOptions = - WorkflowClientOptions.newBuilder().setDomain(DOMAIN2).build(); - - TestEnvironmentOptions testOptions = - new TestEnvironmentOptions.Builder() - .setWorkflowClientOptions(clientOptions) - .setInterceptorFactory(tracer) - .setWorkerFactoryOptions( - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build()) - .build(); - - startWorkerFor(TestWorkflowCrossDomainImpl.class); - - WorkflowClient workflowClient2; - if (useExternalService) { - workflowClient2 = WorkflowClient.newInstance(service, clientOptions); - WorkerFactoryOptions factoryOptions = - WorkerFactoryOptions.newBuilder() - .setDisableStickyExecution(disableStickyExecution) - .build(); - WorkerFactory workerFactory2 = new WorkerFactory(workflowClient2, factoryOptions); - Worker worker2 = workerFactory2.newWorker(taskList + "2"); - worker2.registerWorkflowImplementationTypes(TestWorkflowSignaledSimple.class); - workerFactory2.start(); - } else { - TestWorkflowEnvironment testEnvironment2 = - TestWorkflowEnvironment.newInstance(wfService, testOptions); - Worker worker2 = testEnvironment2.newWorker(taskList + "2"); - workflowClient2 = testEnvironment2.newWorkflowClient(); - worker2.registerWorkflowImplementationTypes(TestWorkflowSignaledSimple.class); - testEnvironment2.start(); - } - - WorkflowOptions.Builder options = new WorkflowOptions.Builder(); - options.setExecutionStartToCloseTimeout(Duration.ofSeconds(30)); - options.setTaskStartToCloseTimeout(Duration.ofSeconds(30)); - options.setTaskList(taskList); - - WorkflowOptions.Builder options2 = new WorkflowOptions.Builder(); - String wfId = UUID.randomUUID().toString(); - options2.setWorkflowId(wfId); - options2.setExecutionStartToCloseTimeout(Duration.ofSeconds(30)); - options2.setTaskStartToCloseTimeout(Duration.ofSeconds(30)); - options2.setTaskList(taskList + "2"); - - TestWorkflowCrossDomain wf = - workflowClient.newWorkflowStub(TestWorkflowCrossDomain.class, options.build()); - - TestWorkflowSignaled simpleWorkflow = - workflowClient2.newWorkflowStub(TestWorkflowSignaled.class, options2.build()); - - CompletableFuture result = WorkflowClient.execute(simpleWorkflow::execute); - assertEquals("Signaled External workflow", wf.execute(wfId)); - assertEquals("Simple workflow signaled", result.get()); - } - public static class TestSignalExternalWorkflowFailure implements TestWorkflow1 { @Override @@ -3636,7 +3324,7 @@ public String execute(String taskList) { @Test public void testChildWorkflowAsyncRetry() { AngryChildActivityImpl angryChildActivity = new AngryChildActivityImpl(); - worker.registerActivitiesImplementations(angryChildActivity); + cadenceTestRule.getWorker().registerActivitiesImplementations(angryChildActivity); startWorkerFor(TestChildWorkflowAsyncRetryWorkflow.class, AngryChild.class); WorkflowOptions.Builder options = new WorkflowOptions.Builder(); @@ -3899,11 +3587,9 @@ public String execute(String testName) { } @Test + // Min interval in cron is 1min. So we will not test it against real service in CI. + @RequiresTestService public void testWorkflowWithCronSchedule() { - // Min interval in cron is 1min. So we will not test it against real service in Jenkins. - // Feel free to uncomment the line below and test in local. - Assume.assumeFalse("skipping as test will timeout", useExternalService); - startWorkerFor(TestWorkflowWithCronScheduleImpl.class); WorkflowStub client = @@ -3938,11 +3624,9 @@ public String execute(String taskList) { } @Test + // Min interval in cron is 1min. So we will not test it against real service in CI. + @RequiresTestService public void testChildWorkflowWithCronSchedule() { - // Min interval in cron is 1min. So we will not test it against real service in Jenkins. - // Feel free to uncomment the line below and test in local. - Assume.assumeFalse("skipping as test will timeout", useExternalService); - startWorkerFor(TestCronParentWorkflow.class, TestWorkflowWithCronScheduleImpl.class); WorkflowStub client = @@ -3952,7 +3636,7 @@ public void testChildWorkflowWithCronSchedule() { .setExecutionStartToCloseTimeout(Duration.ofHours(10)) .build()); client.start(testName.getMethodName()); - testEnvironment.sleep(Duration.ofHours(3)); + cadenceTestRule.sleep(Duration.ofHours(3)); client.cancel(); try { @@ -4627,9 +4311,8 @@ public void testGetVersion() { } @Test + @RequiresDockerService public void testDelayStart() { - assumeTrue("skipping for non docker tests", useExternalService); - int delaySeconds = 5; startWorkerFor(TestGetVersionWorkflowImpl.class); WorkflowOptions options = @@ -4670,9 +4353,8 @@ public String execute(String taskList) { } @Test + @RequiresTestService public void testGetVersion2() { - Assume.assumeFalse("skipping for docker tests", useExternalService); - startWorkerFor(TestGetVersionWorkflow2Impl.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -4929,28 +4611,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionAdded() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionAddedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - - @Test - public void testGetVersionAddedWithCadenceChangeVersion() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistoryWithCadenceChangeVersion.json", TestGetVersionAddedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public static class TestGetVersionRemovedImpl implements TestWorkflow1 { @Override @@ -4962,17 +4622,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionRemoved() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionRemovedImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public static class TestGetVersionRemoveAndAddImpl implements TestWorkflow1 { @Override @@ -4985,17 +4634,6 @@ public String execute(String taskList) { } } - @Test - public void testGetVersionRemoveAndAdd() { - try { - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionHistory.json", TestGetVersionRemoveAndAddImpl.class); - } catch (Exception e) { - e.printStackTrace(); - fail(); - } - } - public interface DeterminismFailingWorkflow { @WorkflowMethod @@ -5051,13 +4689,8 @@ public void testNonDeterministicWorkflowPolicyFailWorkflow() { new WorkflowImplementationOptions.Builder() .setNonDeterministicWorkflowPolicy(FailWorkflow) .build(); - worker.registerWorkflowImplementationTypes( - implementationOptions, DeterminismFailingWorkflowImpl.class); - if (useExternalService) { - workerFactory.start(); - } else { - testEnvironment.start(); - } + startWorkerFor(implementationOptions, DeterminismFailingWorkflowImpl.class); + WorkflowOptions options = new WorkflowOptions.Builder() .setExecutionStartToCloseTimeout(Duration.ofSeconds(1)) @@ -5170,7 +4803,9 @@ public List query(List arg) { @Test public void testGenericParametersWorkflow() throws ExecutionException, InterruptedException { - worker.registerActivitiesImplementations(new GenericParametersActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new GenericParametersActivityImpl()); startWorkerFor(GenericParametersWorkflowImpl.class); GenericParametersWorkflow workflowStub = workflowClient.newWorkflowStub( @@ -5256,7 +4891,9 @@ public String execute(String taskList) { @Test public void testNonSerializableExceptionInActivity() { - worker.registerActivitiesImplementations(new NonSerializableExceptionActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new NonSerializableExceptionActivityImpl()); startWorkerFor(TestNonSerializableExceptionInActivityWorkflow.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -5311,7 +4948,9 @@ public String execute(String taskList) { @Test public void testNonSerializableArgumentsInActivity() { - worker.registerActivitiesImplementations(new NonDeserializableExceptionActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new NonDeserializableExceptionActivityImpl()); startWorkerFor(TestNonSerializableArgumentsInActivityWorkflow.class); TestWorkflow1 workflowStub = workflowClient.newWorkflowStub( @@ -5402,7 +5041,9 @@ public String execute(int activityCount, String taskList) { @Ignore // Requires DEBUG_TIMEOUTS=true public void testLargeHistory() { final int activityCount = 1000; - worker.registerActivitiesImplementations(new TestLargeWorkflowActivityImpl()); + cadenceTestRule + .getWorker() + .registerActivitiesImplementations(new TestLargeWorkflowActivityImpl()); startWorkerFor(TestLargeHistory.class); TestLargeWorkflow workflowStub = workflowClient.newWorkflowStub( @@ -5708,21 +5349,13 @@ public void testSignalOrderingWorkflow() { WorkflowClient.start(workflowStub::run); // Suspend polling so that all the signals will be received in the same decision task. - if (useExternalService) { - workerFactory.suspendPolling(); - } else { - testEnvironment.getWorkerFactory().suspendPolling(); - } + cadenceTestRule.suspendPolling(); workflowStub.signal("test1"); workflowStub.signal("test2"); workflowStub.signal("test3"); - if (useExternalService) { - workerFactory.resumePolling(); - } else { - testEnvironment.getWorkerFactory().resumePolling(); - } + cadenceTestRule.resumePolling(); List result = workflowStub.run(); List expected = Arrays.asList("test1", "test2", "test3"); @@ -5768,29 +5401,6 @@ public String execute(String taskList) { } } - @Test - public void testWorkflowReset() throws Exception { - // Leave the following code to generate history. - // startWorkerFor(TestWorkflowResetReplayWorkflow.class, TestMultiargsWorkflowsImpl.class); - // TestWorkflow1 workflowStub = - // workflowClient.newWorkflowStub( - // TestWorkflow1.class, newWorkflowOptionsBuilder(taskList).build()); - // workflowStub.execute(taskList); - // - // try { - // Thread.sleep(60000000); - // } catch (InterruptedException e) { - // e.printStackTrace(); - // } - - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "resetWorkflowHistory.json", TestWorkflowResetReplayWorkflow.class); - } - public interface GreetingWorkflow { @WorkflowMethod @@ -5837,19 +5447,6 @@ public void createGreeting(String name) { } } - // Server doesn't guarantee that the timer fire timestamp is larger or equal of the - // expected fire time. This test ensures that client still fires timer in this case. - @Test - public void testTimerFiringTimestampEarlierThanExpected() throws Exception { - - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", stickyOff); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "timerfiring.json", TimerFiringWorkflowImpl.class); - } - public interface TestCompensationWorkflow { @WorkflowMethod void compensate(); @@ -5982,11 +5579,7 @@ public void testExceptionInSignal() throws InterruptedException { // Suspend polling so that decision tasks are not retried. Otherwise it will affect our thread // count. - if (useExternalService) { - workerFactory.suspendPolling(); - } else { - testEnvironment.getWorkerFactory().suspendPolling(); - } + cadenceTestRule.suspendPolling(); // Wait for decision task retry to finish. Thread.sleep(10000); @@ -6139,7 +5732,7 @@ public String getState() { public void testGetVersionRetry() throws ExecutionException, InterruptedException { TestActivities activity = mock(TestActivities.class); when(activity.activity1(1)).thenReturn(1); - worker.registerActivitiesImplementations(activity); + cadenceTestRule.getWorker().registerActivitiesImplementations(activity); startWorkerFor(TestGetVersionWorkflowRetryImpl.class); TestWorkflow3 workflowStub = @@ -6153,16 +5746,6 @@ public void testGetVersionRetry() throws ExecutionException, InterruptedExceptio assertEquals("activity1", workflowStub.getState()); } - @Test - public void testGetVersionWithRetryReplay() throws Exception { - // Avoid executing 4 times - Assume.assumeFalse("skipping for docker tests", useExternalService); - Assume.assumeFalse("skipping for sticky off", disableStickyExecution); - - WorkflowReplayer.replayWorkflowExecutionFromResource( - "testGetVersionWithRetryHistory.json", TestGetVersionWorkflowRetryImpl.class); - } - @Test(expected = ExecutionException.class) public void testWorkflowTimesOutWhenNoOverridesProvided() throws Exception { startWorkerFor(TestWorkflowActivityOptionOverride.class); @@ -6224,10 +5807,10 @@ public void mySignal(String value) { } @Test + @RequiresTestService public void testEnqueueWorkflow() throws Exception { // The docker service isn't set up for async calls. Only run against the TestEnvironment version // of this test - assumeTrue("The docker service doesn't support async calls", !useDockerService); startWorkerFor(TestEnqueueWorkflow.class); WorkflowStub stub =