diff --git a/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java b/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java index 45c9f2dcc..6c43f5f87 100644 --- a/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java +++ b/src/main/java/com/uber/cadence/internal/replay/HistoryHelper.java @@ -231,14 +231,8 @@ public DecisionEvents next() { } decisionEvents.add(events.next()); } - DecisionEvents result = - new DecisionEvents( - newEvents, - decisionEvents, - replay, - replayCurrentTimeMilliseconds, - nextDecisionEventId); - return result; + return new DecisionEvents( + newEvents, decisionEvents, replay, replayCurrentTimeMilliseconds, nextDecisionEventId); } } diff --git a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java index 650322ba5..574d012c1 100644 --- a/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java +++ b/src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java @@ -19,6 +19,7 @@ import static com.uber.cadence.worker.NonDeterministicWorkflowPolicy.FailWorkflow; +import com.google.common.annotations.VisibleForTesting; import com.uber.cadence.EventType; import com.uber.cadence.GetWorkflowExecutionHistoryRequest; import com.uber.cadence.GetWorkflowExecutionHistoryResponse; @@ -180,9 +181,9 @@ private void processEvent(HistoryEvent event) { context.handleChildWorkflowExecutionTimedOut(event); break; case DecisionTaskCompleted: - // NOOP - break; case DecisionTaskScheduled: + case WorkflowExecutionTimedOut: + case WorkflowExecutionTerminated: // NOOP break; case DecisionTaskStarted: @@ -208,12 +209,6 @@ private void processEvent(HistoryEvent event) { case WorkflowExecutionStarted: handleWorkflowExecutionStarted(event); break; - case WorkflowExecutionTerminated: - // NOOP - break; - case WorkflowExecutionTimedOut: - // NOOP - break; case ActivityTaskScheduled: decisionsHelper.handleActivityTaskScheduled(event); break; @@ -227,11 +222,8 @@ private void processEvent(HistoryEvent event) { context.handleMarkerRecorded(event); break; case WorkflowExecutionCompleted: - break; case WorkflowExecutionFailed: - break; case WorkflowExecutionCanceled: - break; case WorkflowExecutionContinuedAsNew: break; case TimerStarted: @@ -410,7 +402,7 @@ private Map getQueryResults(Map q.getKey(), q -> queryWorkflow(q.getValue()))); + .collect(Collectors.toMap(Map.Entry::getKey, q -> queryWorkflow(q.getValue()))); } private WorkflowQueryResult queryWorkflow(WorkflowQuery query) { @@ -632,9 +624,9 @@ private class DecisionTaskWithHistoryIteratorImpl implements DecisionTaskWithHis private final Duration retryServiceOperationInitialInterval = Duration.ofMillis(200); private final Duration retryServiceOperationMaxInterval = Duration.ofSeconds(4); private final Duration paginationStart = Duration.ofMillis(System.currentTimeMillis()); - private Duration decisionTaskStartToCloseTimeout; + private final Duration decisionTaskStartToCloseTimeout; - private final Duration decisionTaskRemainingTime() { + private Duration decisionTaskRemainingTime() { Duration passed = Duration.ofMillis(System.currentTimeMillis()).minus(paginationStart); return decisionTaskStartToCloseTimeout.minus(passed); } @@ -643,6 +635,7 @@ private final Duration decisionTaskRemainingTime() { private Iterator current; private byte[] nextPageToken; + @VisibleForTesting DecisionTaskWithHistoryIteratorImpl( PollForDecisionTaskResponse task, Duration decisionTaskStartToCloseTimeout) { this.task = Objects.requireNonNull(task); @@ -692,7 +685,7 @@ public HistoryEvent next() { .setExpiration(decisionTaskRemainingTime) .setInitialInterval(retryServiceOperationInitialInterval) .setMaximumInterval(retryServiceOperationMaxInterval) - .build(); + .validateBuildWithDefaults(); GetWorkflowExecutionHistoryRequest request = new GetWorkflowExecutionHistoryRequest(); request @@ -715,14 +708,11 @@ public HistoryEvent next() { } if (!current.hasNext()) { log.error( - "GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:" - + request.execution.workflowId - + ", runID:" - + request.execution.runId - + ", domain:" - + request.domain - + " token:" - + Arrays.toString(request.getNextPageToken())); + "GetWorkflowExecutionHistory returns an empty history, maybe a bug in server, workflowID:{}, runID:{}, domain:{} token:{}", + request.execution.workflowId, + request.execution.runId, + request.domain, + Arrays.toString(request.getNextPageToken())); throw new Error( "GetWorkflowExecutionHistory return empty history, maybe a bug in server"); } diff --git a/src/test/java/com/uber/cadence/internal/replay/ReplaceDeciderDecisionTaskWithHistoryIteratorTest.java b/src/test/java/com/uber/cadence/internal/replay/ReplaceDeciderDecisionTaskWithHistoryIteratorTest.java new file mode 100644 index 000000000..133251be7 --- /dev/null +++ b/src/test/java/com/uber/cadence/internal/replay/ReplaceDeciderDecisionTaskWithHistoryIteratorTest.java @@ -0,0 +1,241 @@ +/* + * Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Modifications copyright (C) 2017 Uber Technologies, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"). You may not + * use this file except in compliance with the License. A copy of the License is + * located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.uber.cadence.internal.replay; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import com.uber.cadence.*; +import com.uber.cadence.client.WorkflowClientOptions; +import com.uber.cadence.internal.worker.SingleWorkerOptions; +import com.uber.cadence.serviceclient.IWorkflowService; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.time.Duration; +import java.util.*; +import org.apache.thrift.TException; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ReplaceDeciderDecisionTaskWithHistoryIteratorTest { + @Mock private IWorkflowService mockService; + + @Mock private DecisionContextImpl mockContext; + + @Mock private DecisionsHelper mockedHelper; + + private static final int MAXIMUM_PAGE_SIZE = 10000; + private final String WORKFLOW_ID = "testWorkflowId"; + private final String RUN_ID = "testRunId"; + private final String DOMAIN = "testDomain"; + private final String START_PAGE_TOKEN = "testPageToken"; + private final WorkflowExecution WORKFLOW_EXECUTION = + new WorkflowExecution().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID); + private final HistoryEvent START_EVENT = + new HistoryEvent() + .setWorkflowExecutionStartedEventAttributes(new WorkflowExecutionStartedEventAttributes()) + .setEventId(1); + private final History HISTORY = new History().setEvents(Collections.singletonList(START_EVENT)); + private final PollForDecisionTaskResponse task = + new PollForDecisionTaskResponse() + .setWorkflowExecution(WORKFLOW_EXECUTION) + .setHistory(HISTORY) + .setNextPageToken(START_PAGE_TOKEN.getBytes()); + + private Object iterator; + + private void setupDecisionTaskWithHistoryIteratorImpl() { + try { + // Find the inner class first + Class innerClass = findDecisionTaskWithHistoryIteratorImplClass(); + + // Get the constructor with the specific parameter types + Constructor constructor = + innerClass.getDeclaredConstructor( + ReplayDecider.class, PollForDecisionTaskResponse.class, Duration.class); + + when(mockedHelper.getTask()).thenReturn(task); + when(mockContext.getDomain()).thenReturn(DOMAIN); + + // Create an instance of the outer class + ReplayDecider outerInstance = + new ReplayDecider( + mockService, + DOMAIN, + new WorkflowType().setName("testWorkflow"), + null, + mockedHelper, + SingleWorkerOptions.newBuilder() + .setMetricsScope(WorkflowClientOptions.defaultInstance().getMetricsScope()) + .build(), + null); + + // Create the instance + iterator = constructor.newInstance(outerInstance, task, Duration.ofSeconds(10)); + } catch (Exception e) { + e.printStackTrace(); + throw new RuntimeException("Failed to set up test: " + e.getMessage(), e); + } + } + + // Helper method to find the inner class + private Class findDecisionTaskWithHistoryIteratorImplClass() { + for (Class declaredClass : ReplayDecider.class.getDeclaredClasses()) { + if (declaredClass.getSimpleName().equals("DecisionTaskWithHistoryIteratorImpl")) { + return declaredClass; + } + } + throw new RuntimeException("Could not find DecisionTaskWithHistoryIteratorImpl inner class"); + } + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + setupDecisionTaskWithHistoryIteratorImpl(); + } + + @Test + public void testGetHistoryWithSinglePageOfEvents() + throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + // Arrange + List events = Arrays.asList(createMockHistoryEvent(2), createMockHistoryEvent(3)); + History mockHistory = new History().setEvents(events); + when(mockService.GetWorkflowExecutionHistory( + new GetWorkflowExecutionHistoryRequest() + .setDomain(DOMAIN) + .setNextPageToken(START_PAGE_TOKEN.getBytes()) + .setExecution(WORKFLOW_EXECUTION) + .setMaximumPageSize(MAXIMUM_PAGE_SIZE))) + .thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(mockHistory)); + + // Act & Assert + Method wrapperMethod = iterator.getClass().getMethod("getHistory"); + + Object result = wrapperMethod.invoke(iterator); + Iterator historyIterator = (Iterator) result; + assertTrue(historyIterator.hasNext()); + assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId()); + assertTrue(historyIterator.hasNext()); + assertEquals(events.get(0).getEventId(), historyIterator.next().getEventId()); + assertTrue(historyIterator.hasNext()); + assertEquals(events.get(1).getEventId(), historyIterator.next().getEventId()); + assertFalse(historyIterator.hasNext()); + } + + @Test + public void testGetHistoryWithMultiplePages() + throws TException, NoSuchMethodException, InvocationTargetException, IllegalAccessException { + // First page events + List firstPageEvents = + Arrays.asList(createMockHistoryEvent(1), createMockHistoryEvent(2)); + History firstHistory = new History().setEvents(firstPageEvents); + String firstPageToken = "firstPageToken"; + when(mockService.GetWorkflowExecutionHistory( + eq( + new GetWorkflowExecutionHistoryRequest() + .setDomain(DOMAIN) + .setNextPageToken(START_PAGE_TOKEN.getBytes()) + .setExecution(WORKFLOW_EXECUTION) + .setMaximumPageSize(MAXIMUM_PAGE_SIZE)))) + .thenReturn( + new GetWorkflowExecutionHistoryResponse() + .setHistory(firstHistory) + .setNextPageToken(firstPageToken.getBytes())); + + // Second page events + List secondPageEvents = + Arrays.asList(createMockHistoryEvent(3), createMockHistoryEvent(4)); + History secondHistory = new History().setEvents(secondPageEvents); + when(mockService.GetWorkflowExecutionHistory( + eq( + new GetWorkflowExecutionHistoryRequest() + .setDomain(DOMAIN) + .setNextPageToken(firstPageToken.getBytes()) + .setExecution(WORKFLOW_EXECUTION) + .setMaximumPageSize(MAXIMUM_PAGE_SIZE)))) + .thenReturn(new GetWorkflowExecutionHistoryResponse().setHistory(secondHistory)); + + // Act & Assert + Method wrapperMethod = iterator.getClass().getMethod("getHistory"); + + Object result = wrapperMethod.invoke(iterator); + Iterator historyIterator = (Iterator) result; + // Check first page events + assertEquals(START_EVENT.getEventId(), historyIterator.next().getEventId()); + assertEquals(firstPageEvents.get(0).getEventId(), historyIterator.next().getEventId()); + assertEquals(firstPageEvents.get(1).getEventId(), historyIterator.next().getEventId()); + + // Check second page events + assertEquals(secondPageEvents.get(0).getEventId(), historyIterator.next().getEventId()); + assertEquals(secondPageEvents.get(1).getEventId(), historyIterator.next().getEventId()); + + assertFalse(historyIterator.hasNext()); + } + + @Test(expected = Error.class) + public void testGetHistoryFailure() + throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException { + when(mockService.GetWorkflowExecutionHistory( + new GetWorkflowExecutionHistoryRequest() + .setDomain(DOMAIN) + .setNextPageToken(START_PAGE_TOKEN.getBytes()) + .setExecution(WORKFLOW_EXECUTION) + .setMaximumPageSize(MAXIMUM_PAGE_SIZE))) + .thenThrow(new TException()); + + // Act & Assert + Method wrapperMethod = iterator.getClass().getMethod("getHistory"); + + Object result = wrapperMethod.invoke(iterator); + Iterator historyIterator = (Iterator) result; + historyIterator.next(); + + historyIterator.next(); // This should throw an Error due to timeout + } + + @Test(expected = Error.class) + public void testEmptyHistory() + throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, TException { + when(mockService.GetWorkflowExecutionHistory( + new GetWorkflowExecutionHistoryRequest() + .setDomain(DOMAIN) + .setNextPageToken(START_PAGE_TOKEN.getBytes()) + .setExecution(WORKFLOW_EXECUTION) + .setMaximumPageSize(MAXIMUM_PAGE_SIZE))) + .thenReturn( + new GetWorkflowExecutionHistoryResponse() + .setHistory(new History().setEvents(new ArrayList<>()))); + + // Act & Assert + Method wrapperMethod = iterator.getClass().getMethod("getHistory"); + + Object result = wrapperMethod.invoke(iterator); + Iterator historyIterator = (Iterator) result; + historyIterator.next(); + + historyIterator.next(); // This should throw an Error due to timeout + } + + // Helper method to create mock HistoryEvent + private HistoryEvent createMockHistoryEvent(int eventId) { + return new HistoryEvent().setEventId(eventId); + } +} diff --git a/src/test/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternalTest.java b/src/test/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternalTest.java index 31e3116c7..6baf517c0 100644 --- a/src/test/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternalTest.java +++ b/src/test/java/com/uber/cadence/internal/sync/TestActivityEnvironmentInternalTest.java @@ -126,8 +126,6 @@ public void testWorkflowServiceWrapperMethodDelegation() throws Exception { // Prepare test cases List testCases = prepareMethodTestCases(); - System.out.println(testCases); - // Test each method for (MethodTestCase testCase : testCases) { try {