Skip to content

Commit b668b27

Browse files
authored
set nextPageToken on GetWorkflowExecutionHistoryRequest (#212)
1 parent 4e820ba commit b668b27

File tree

3 files changed

+208
-4
lines changed

3 files changed

+208
-4
lines changed

src/main/java/com/uber/cadence/internal/replay/Decider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,4 @@ public interface Decider {
2929
byte[] query(PollForDecisionTaskResponse decisionTask, WorkflowQuery query) throws Throwable;
3030

3131
void close();
32-
}
32+
}

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -502,9 +502,12 @@ public HistoryEvent next() {
502502
.build();
503503

504504
GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest();
505-
request.setDomain(context.getDomain());
506-
request.setExecution(task.getWorkflowExecution());
507-
request.setMaximumPageSize(MAXIMUM_PAGE_SIZE);
505+
request
506+
.setDomain(context.getDomain())
507+
.setExecution(task.getWorkflowExecution())
508+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)
509+
.setNextPageToken(nextPageToken);
510+
508511
try {
509512
GetWorkflowExecutionHistoryResponse r =
510513
Retryer.retryWithResult(
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.worker;
19+
20+
import static org.junit.Assert.assertNotNull;
21+
import com.uber.cadence.activity.ActivityMethod;
22+
import com.uber.cadence.activity.ActivityOptions;
23+
import com.uber.cadence.client.WorkflowClient;
24+
import com.uber.cadence.client.WorkflowOptions;
25+
import com.uber.cadence.client.WorkflowStub;
26+
import com.uber.cadence.testing.TestEnvironmentOptions;
27+
import com.uber.cadence.testing.TestWorkflowEnvironment;
28+
import com.uber.cadence.workflow.Async;
29+
import com.uber.cadence.workflow.Promise;
30+
import com.uber.cadence.workflow.Workflow;
31+
import com.uber.cadence.workflow.WorkflowMethod;
32+
import java.time.Duration;
33+
import java.util.*;
34+
35+
import org.junit.Ignore;
36+
import org.junit.Rule;
37+
import org.junit.Test;
38+
import org.junit.rules.TestName;
39+
import org.junit.runner.RunWith;
40+
import org.junit.runners.Parameterized;
41+
import org.slf4j.Logger;
42+
import org.slf4j.LoggerFactory;
43+
44+
@RunWith(Parameterized.class)
45+
public class WorkerStressTests {
46+
public static final String DOMAIN = "UnitTest";
47+
48+
private static final boolean skipDockerService =
49+
Boolean.parseBoolean(System.getenv("SKIP_DOCKER_SERVICE"));
50+
51+
@Parameterized.Parameter public boolean useExternalService;
52+
53+
@Parameterized.Parameters(name = "{1}")
54+
public static Object[] data() {
55+
if (skipDockerService) {
56+
return new Object[][] {{false, "TestService"}};
57+
} else {
58+
return new Object[][] {{true, "Docker"}, {false, "TestService"}};
59+
}
60+
}
61+
62+
@Parameterized.Parameter(1)
63+
public String testType;
64+
65+
@Rule public TestName testName = new TestName();
66+
67+
// Todo: Write a unit test specifically to test DecisionTaskWithHistoryIteratorImpl
68+
@Ignore("Takes a long time to run")
69+
@Test
70+
public void longHistoryWorkflowsCompleteSuccessfully() {
71+
72+
// Arrange
73+
String taskListName = "veryLongWorkflow";
74+
75+
TestEnvironmentWrapper wrapper =
76+
new TestEnvironmentWrapper(new Worker.FactoryOptions.Builder().setmaxWorkflowThreadCount(200).Build());
77+
Worker.Factory factory = wrapper.getWorkerFactory();
78+
Worker worker = factory.newWorker(taskListName, new WorkerOptions.Builder().build());
79+
worker.registerWorkflowImplementationTypes(ActivitiesWorkflowImpl.class);
80+
worker.registerActivitiesImplementations(new ActivitiesImpl());
81+
factory.start();
82+
83+
WorkflowOptions workflowOptions =
84+
new WorkflowOptions.Builder()
85+
.setTaskList(taskListName)
86+
.setExecutionStartToCloseTimeout(Duration.ofSeconds(250))
87+
.setTaskStartToCloseTimeout(Duration.ofSeconds(30))
88+
.build();
89+
WorkflowStub workflow =
90+
wrapper
91+
.getWorkflowClient()
92+
.newUntypedWorkflowStub("ActivitiesWorkflow::execute", workflowOptions);
93+
94+
// Act
95+
// This will yeild around 10000 events which is above the page limit returned by the server.
96+
WorkflowParams w = new WorkflowParams();
97+
w.CadenceSleep = Duration.ofSeconds(0);
98+
w.ChainSequence = 50;
99+
w.ConcurrentCount = 50;
100+
w.PayloadSizeBytes = 10000;
101+
w.TaskListName = taskListName;
102+
103+
workflow.start(w);
104+
assertNotNull("I'm done.", workflow.getResult(String.class));
105+
wrapper.close();
106+
}
107+
108+
// Todo: refactor TestEnvironment to toggle between real and test service.
109+
private class TestEnvironmentWrapper {
110+
111+
private TestWorkflowEnvironment testEnv;
112+
private Worker.Factory factory;
113+
114+
public TestEnvironmentWrapper(Worker.FactoryOptions options) {
115+
if (options == null) {
116+
options = new Worker.FactoryOptions.Builder().Build();
117+
}
118+
factory = new Worker.Factory(DOMAIN, options);
119+
TestEnvironmentOptions testOptions =
120+
new TestEnvironmentOptions.Builder().setDomain(DOMAIN).setFactoryOptions(options).build();
121+
testEnv = TestWorkflowEnvironment.newInstance(testOptions);
122+
}
123+
124+
private Worker.Factory getWorkerFactory() {
125+
return useExternalService ? factory : testEnv.getWorkerFactory();
126+
}
127+
128+
private WorkflowClient getWorkflowClient() {
129+
return useExternalService ? WorkflowClient.newInstance(DOMAIN) : testEnv.newWorkflowClient();
130+
}
131+
132+
private void close() {
133+
factory.shutdown(Duration.ofSeconds(1));
134+
testEnv.close();
135+
}
136+
}
137+
138+
public static class WorkflowParams {
139+
140+
public int ChainSequence;
141+
public int ConcurrentCount;
142+
public String TaskListName;
143+
public int PayloadSizeBytes;
144+
public Duration CadenceSleep;
145+
}
146+
147+
public interface ActivitiesWorkflow {
148+
149+
@WorkflowMethod()
150+
String execute(WorkflowParams params);
151+
}
152+
153+
public static class ActivitiesWorkflowImpl implements ActivitiesWorkflow {
154+
private static final Logger log = LoggerFactory.getLogger(ActivitiesWorkflowImpl.class);
155+
156+
@Override
157+
public String execute(WorkflowParams params) {
158+
SleepActivity activity =
159+
Workflow.newActivityStub(
160+
SleepActivity.class,
161+
new ActivityOptions.Builder()
162+
.setTaskList(params.TaskListName)
163+
.setScheduleToStartTimeout(Duration.ofMinutes(1))
164+
.setStartToCloseTimeout(Duration.ofMinutes(1))
165+
.setHeartbeatTimeout(Duration.ofSeconds(20))
166+
.build());
167+
168+
for (int i = 0; i < params.ChainSequence; i++) {
169+
List<Promise<Void>> promises = new ArrayList<>();
170+
for (int j = 0; j < params.ConcurrentCount; j++) {
171+
byte[] bytes = new byte[params.PayloadSizeBytes];
172+
new Random().nextBytes(bytes);
173+
Promise<Void> promise = Async.procedure(activity::sleep, i, j, bytes);
174+
promises.add(promise);
175+
}
176+
177+
for (Promise<Void> promise : promises) {
178+
promise.get();
179+
}
180+
181+
Workflow.sleep(params.CadenceSleep);
182+
}
183+
return "I'm done";
184+
}
185+
}
186+
187+
public interface SleepActivity {
188+
189+
@ActivityMethod()
190+
void sleep(int chain, int concurrency, byte[] bytes);
191+
}
192+
193+
public static class ActivitiesImpl implements SleepActivity {
194+
private static final Logger log = LoggerFactory.getLogger("sleep-activity");
195+
196+
@Override
197+
public void sleep(int chain, int concurrency, byte[] bytes) {
198+
log.info("sleep called");
199+
}
200+
}
201+
}

0 commit comments

Comments
 (0)