Skip to content

Commit 777c3bc

Browse files
committed
Add unit tests for History Iterator in Replayer
1 parent fb7d322 commit 777c3bc

File tree

3 files changed

+254
-30
lines changed

3 files changed

+254
-30
lines changed

src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -231,14 +231,8 @@ public DecisionEvents next() {
231231
}
232232
decisionEvents.add(events.next());
233233
}
234-
DecisionEvents result =
235-
new DecisionEvents(
236-
newEvents,
237-
decisionEvents,
238-
replay,
239-
replayCurrentTimeMilliseconds,
240-
nextDecisionEventId);
241-
return result;
234+
return new DecisionEvents(
235+
newEvents, decisionEvents, replay, replayCurrentTimeMilliseconds, nextDecisionEventId);
242236
}
243237
}
244238

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 10 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,9 @@ private void processEvent(HistoryEvent event) {
180180
context.handleChildWorkflowExecutionTimedOut(event);
181181
break;
182182
case DecisionTaskCompleted:
183-
// NOOP
184-
break;
185183
case DecisionTaskScheduled:
184+
case WorkflowExecutionTimedOut:
185+
case WorkflowExecutionTerminated:
186186
// NOOP
187187
break;
188188
case DecisionTaskStarted:
@@ -208,12 +208,6 @@ private void processEvent(HistoryEvent event) {
208208
case WorkflowExecutionStarted:
209209
handleWorkflowExecutionStarted(event);
210210
break;
211-
case WorkflowExecutionTerminated:
212-
// NOOP
213-
break;
214-
case WorkflowExecutionTimedOut:
215-
// NOOP
216-
break;
217211
case ActivityTaskScheduled:
218212
decisionsHelper.handleActivityTaskScheduled(event);
219213
break;
@@ -227,11 +221,8 @@ private void processEvent(HistoryEvent event) {
227221
context.handleMarkerRecorded(event);
228222
break;
229223
case WorkflowExecutionCompleted:
230-
break;
231224
case WorkflowExecutionFailed:
232-
break;
233225
case WorkflowExecutionCanceled:
234-
break;
235226
case WorkflowExecutionContinuedAsNew:
236227
break;
237228
case TimerStarted:
@@ -410,7 +401,7 @@ private Map<String, WorkflowQueryResult> getQueryResults(Map<String, WorkflowQue
410401
return queries
411402
.entrySet()
412403
.stream()
413-
.collect(Collectors.toMap(q -> q.getKey(), q -> queryWorkflow(q.getValue())));
404+
.collect(Collectors.toMap(Map.Entry::getKey, q -> queryWorkflow(q.getValue())));
414405
}
415406

416407
private WorkflowQueryResult queryWorkflow(WorkflowQuery query) {
@@ -632,9 +623,9 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis
632623
private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200);
633624
private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4);
634625
private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis());
635-
private Duration decisionTaskStartToCloseTimeout;
626+
private final Duration decisionTaskStartToCloseTimeout;
636627

637-
private final Duration decisionTaskRemainingTime() {
628+
private Duration decisionTaskRemainingTime() {
638629
Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart);
639630
return decisionTaskStartToCloseTimeout.minus(passed);
640631
}
@@ -715,14 +706,11 @@ public HistoryEvent next() {
715706
}
716707
if (!current.hasNext()) {
717708
log.error(
718-
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:"
719-
+ request.execution.workflowId
720-
+ ", runID:"
721-
+ request.execution.runId
722-
+ ", domain:"
723-
+ request.domain
724-
+ " token:"
725-
+ Arrays.toString(request.getNextPageToken()));
709+
"GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:{}, runID:{}, domain:{} token:{}",
710+
request.execution.workflowId,
711+
request.execution.runId,
712+
request.domain,
713+
Arrays.toString(request.getNextPageToken()));
726714
throw new Error(
727715
"GetWorkflowExecutionHistory return empty history, maybe a bug in server");
728716
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.replay;
19+
20+
import static org.junit.Assert.*;
21+
import static org.mockito.Mockito.*;
22+
23+
import com.uber.cadence.*;
24+
import com.uber.cadence.client.WorkflowClientOptions;
25+
import com.uber.cadence.internal.worker.SingleWorkerOptions;
26+
import com.uber.cadence.serviceclient.IWorkflowService;
27+
import java.lang.reflect.Constructor;
28+
import java.lang.reflect.InvocationTargetException;
29+
import java.lang.reflect.Method;
30+
import java.time.Duration;
31+
import java.util.*;
32+
import org.apache.thrift.TException;
33+
import org.junit.Before;
34+
import org.junit.Test;
35+
import org.mockito.Mock;
36+
import org.mockito.MockitoAnnotations;
37+
38+
public class ReplaceDeciderDecisionTaskWithHistoryIteratorTest {
39+
@Mock private IWorkflowService mockService;
40+
41+
@Mock private DecisionContextImpl mockContext;
42+
43+
@Mock private DecisionsHelper mockedHelper;
44+
45+
private static final int MAXIMUM_PAGE_SIZE = 10000;
46+
private final String WORKFLOW_ID = "testWorkflowId";
47+
private final String RUN_ID = "testRunId";
48+
private final String DOMAIN = "testDomain";
49+
private final String START_PAGE_TOKEN = "testPageToken";
50+
private final WorkflowExecution WORKFLOW_EXECUTION =
51+
new WorkflowExecution().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID);
52+
private final HistoryEvent START_EVENT =
53+
new HistoryEvent()
54+
.setWorkflowExecutionStartedEventAttributes(new WorkflowExecutionStartedEventAttributes())
55+
.setEventId(1);
56+
private final History HISTORY = new History().setEvents(Collections.singletonList(START_EVENT));
57+
private final PollForDecisionTaskResponse task =
58+
new PollForDecisionTaskResponse()
59+
.setWorkflowExecution(WORKFLOW_EXECUTION)
60+
.setHistory(HISTORY)
61+
.setNextPageToken(START_PAGE_TOKEN.getBytes());
62+
63+
private Object iterator;
64+
65+
private void setupDecisionTaskWithHistoryIteratorImpl() {
66+
try {
67+
// Find the inner class first
68+
Class<?> innerClass = findDecisionTaskWithHistoryIteratorImplClass();
69+
70+
// Get the constructor with the specific parameter types
71+
Constructor<?> constructor =
72+
innerClass.getDeclaredConstructor(
73+
ReplayDecider.class, PollForDecisionTaskResponse.class, Duration.class);
74+
constructor.setAccessible(true);
75+
76+
when(mockedHelper.getTask()).thenReturn(task);
77+
when(mockContext.getDomain()).thenReturn(DOMAIN);
78+
79+
// Create an instance of the outer class
80+
ReplayDecider outerInstance =
81+
new ReplayDecider(
82+
mockService,
83+
DOMAIN,
84+
new WorkflowType().setName("testWorkflow"),
85+
null,
86+
mockedHelper,
87+
SingleWorkerOptions.newBuilder()
88+
.setMetricsScope(WorkflowClientOptions.defaultInstance().getMetricsScope())
89+
.build(),
90+
null);
91+
92+
// Create the instance
93+
iterator = constructor.newInstance(outerInstance, task, Duration.ofSeconds(10));
94+
} catch (Exception e) {
95+
e.printStackTrace();
96+
throw new RuntimeException("Failed to set up test: " + e.getMessage(), e);
97+
}
98+
}
99+
100+
// Helper method to find the inner class
101+
private Class<?> findDecisionTaskWithHistoryIteratorImplClass() {
102+
for (Class<?> declaredClass : ReplayDecider.class.getDeclaredClasses()) {
103+
if (declaredClass.getSimpleName().equals("DecisionTaskWithHistoryIteratorImpl")) {
104+
return declaredClass;
105+
}
106+
}
107+
throw new RuntimeException("Could not find DecisionTaskWithHistoryIteratorImpl inner class");
108+
}
109+
110+
@Before
111+
public void setUp() {
112+
MockitoAnnotations.openMocks(this);
113+
setupDecisionTaskWithHistoryIteratorImpl();
114+
}
115+
116+
@Test
117+
public void testGetHistoryWithSinglePageOfEvents()
118+
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
119+
// Arrange
120+
List<HistoryEvent> events = Arrays.asList(createMockHistoryEvent(2), createMockHistoryEvent(3));
121+
History mockHistory = new History().setEvents(events);
122+
when(mockService.GetWorkflowExecutionHistory(
123+
new GetWorkflowExecutionHistoryRequest()
124+
.setDomain(DOMAIN)
125+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
126+
.setExecution(WORKFLOW_EXECUTION)
127+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
128+
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(mockHistory));
129+
130+
// Act & Assert
131+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
132+
133+
Object result = wrapperMethod.invoke(iterator);
134+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
135+
assertTrue(historyIterator.hasNext());
136+
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
137+
assertTrue(historyIterator.hasNext());
138+
assertEquals(events.get(0).getEventId(), historyIterator.next().getEventId());
139+
assertTrue(historyIterator.hasNext());
140+
assertEquals(events.get(1).getEventId(), historyIterator.next().getEventId());
141+
assertFalse(historyIterator.hasNext());
142+
}
143+
144+
@Test
145+
public void testGetHistoryWithMultiplePages()
146+
throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
147+
// First page events
148+
List<HistoryEvent> firstPageEvents =
149+
Arrays.asList(createMockHistoryEvent(1), createMockHistoryEvent(2));
150+
History firstHistory = new History().setEvents(firstPageEvents);
151+
String firstPageToken = "firstPageToken";
152+
when(mockService.GetWorkflowExecutionHistory(
153+
eq(
154+
new GetWorkflowExecutionHistoryRequest()
155+
.setDomain(DOMAIN)
156+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
157+
.setExecution(WORKFLOW_EXECUTION)
158+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
159+
.thenReturn(
160+
new GetWorkflowExecutionHistoryResponse()
161+
.setHistory(firstHistory)
162+
.setNextPageToken(firstPageToken.getBytes()));
163+
164+
// Second page events
165+
List<HistoryEvent> secondPageEvents =
166+
Arrays.asList(createMockHistoryEvent(3), createMockHistoryEvent(4));
167+
History secondHistory = new History().setEvents(secondPageEvents);
168+
when(mockService.GetWorkflowExecutionHistory(
169+
eq(
170+
new GetWorkflowExecutionHistoryRequest()
171+
.setDomain(DOMAIN)
172+
.setNextPageToken(firstPageToken.getBytes())
173+
.setExecution(WORKFLOW_EXECUTION)
174+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE))))
175+
.thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(secondHistory));
176+
177+
// Act & Assert
178+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
179+
180+
Object result = wrapperMethod.invoke(iterator);
181+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
182+
// Check first page events
183+
assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId());
184+
assertEquals(firstPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
185+
assertEquals(firstPageEvents.get(1).getEventId(), historyIterator.next().getEventId());
186+
187+
// Check second page events
188+
assertEquals(secondPageEvents.get(0).getEventId(), historyIterator.next().getEventId());
189+
assertEquals(secondPageEvents.get(1).getEventId(), historyIterator.next().getEventId());
190+
191+
assertFalse(historyIterator.hasNext());
192+
}
193+
194+
@Test(expected = Error.class)
195+
public void testGetHistoryWithTimeout()
196+
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
197+
when(mockService.GetWorkflowExecutionHistory(
198+
new GetWorkflowExecutionHistoryRequest()
199+
.setDomain(DOMAIN)
200+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
201+
.setExecution(WORKFLOW_EXECUTION)
202+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
203+
.thenThrow(new TException());
204+
205+
// Act & Assert
206+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
207+
208+
Object result = wrapperMethod.invoke(iterator);
209+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
210+
historyIterator.next();
211+
212+
historyIterator.next(); // This should throw an Error due to timeout
213+
}
214+
215+
@Test(expected = Error.class)
216+
public void testEmptyHistory()
217+
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException {
218+
when(mockService.GetWorkflowExecutionHistory(
219+
new GetWorkflowExecutionHistoryRequest()
220+
.setDomain(DOMAIN)
221+
.setNextPageToken(START_PAGE_TOKEN.getBytes())
222+
.setExecution(WORKFLOW_EXECUTION)
223+
.setMaximumPageSize(MAXIMUM_PAGE_SIZE)))
224+
.thenReturn(
225+
new GetWorkflowExecutionHistoryResponse()
226+
.setHistory(new History().setEvents(new ArrayList<>())));
227+
228+
// Act & Assert
229+
Method wrapperMethod = iterator.getClass().getMethod("getHistory");
230+
231+
Object result = wrapperMethod.invoke(iterator);
232+
Iterator<HistoryEvent> historyIterator = (Iterator<HistoryEvent>) result;
233+
historyIterator.next();
234+
235+
historyIterator.next(); // This should throw an Error due to timeout
236+
}
237+
238+
// Helper method to create mock HistoryEvent
239+
private HistoryEvent createMockHistoryEvent(int eventId) {
240+
return new HistoryEvent().setEventId(eventId);
241+
}
242+
}

0 commit comments

Comments
 (0)