Skip to content

Commit c2b941a

Browse files
Add unit test for concurrent WFT (#1822)
Add unit test for concurrent WFT
1 parent 16755a1 commit c2b941a

File tree

1 file changed

+196
-0
lines changed

1 file changed

+196
-0
lines changed
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/*
2+
* Copyright (C) 2022 Temporal Technologies, Inc. All Rights Reserved.
3+
*
4+
* Copyright (C) 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
5+
*
6+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this material except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package io.temporal.internal.worker;
22+
23+
import static java.nio.charset.StandardCharsets.UTF_8;
24+
import static junit.framework.TestCase.assertEquals;
25+
import static org.junit.Assert.*;
26+
import static org.mockito.ArgumentMatchers.any;
27+
import static org.mockito.Mockito.*;
28+
29+
import com.google.protobuf.ByteString;
30+
import com.uber.m3.tally.RootScopeBuilder;
31+
import com.uber.m3.tally.Scope;
32+
import com.uber.m3.util.ImmutableMap;
33+
import io.temporal.api.common.v1.WorkflowExecution;
34+
import io.temporal.api.common.v1.WorkflowType;
35+
import io.temporal.api.workflowservice.v1.*;
36+
import io.temporal.common.reporter.TestStatsReporter;
37+
import io.temporal.serviceclient.WorkflowServiceStubs;
38+
import io.temporal.worker.MetricsType;
39+
import java.util.UUID;
40+
import java.util.concurrent.*;
41+
import org.junit.Test;
42+
import org.mockito.stubbing.Answer;
43+
import org.slf4j.Logger;
44+
import org.slf4j.LoggerFactory;
45+
46+
public class WorkflowWorkerTest {
47+
private static final Logger log = LoggerFactory.getLogger(WorkflowWorkerTest.class);
48+
private final TestStatsReporter reporter = new TestStatsReporter();
49+
private static final String WORKFLOW_ID = "test-workflow-id";
50+
private static final String RUN_ID = "test-run-id";
51+
private static final String WORKFLOW_TYPE = "test-workflow-type";
52+
53+
@Test
54+
public void concurrentPollRequestLockTest() throws Exception {
55+
// Test that if the server sends multiple concurrent workflow tasks for the same workflow the
56+
// SDK holds the lock during all processing.
57+
WorkflowServiceStubs client = mock(WorkflowServiceStubs.class);
58+
when(client.getServerCapabilities())
59+
.thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build());
60+
61+
WorkflowRunLockManager runLockManager = new WorkflowRunLockManager();
62+
63+
Scope metricsScope =
64+
new RootScopeBuilder()
65+
.reporter(reporter)
66+
.reportEvery(com.uber.m3.util.Duration.ofMillis(1));
67+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope);
68+
69+
WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class);
70+
when(taskHandler.isAnyTypeSupported()).thenReturn(true);
71+
72+
EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class);
73+
WorkflowWorker worker =
74+
new WorkflowWorker(
75+
client,
76+
"default",
77+
"task_queue",
78+
"sticky_task_queue",
79+
SingleWorkerOptions.newBuilder()
80+
.setIdentity("test_identity")
81+
.setBuildId(UUID.randomUUID().toString())
82+
.setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(3).build())
83+
.setMetricsScope(metricsScope)
84+
.build(),
85+
runLockManager,
86+
cache,
87+
taskHandler,
88+
eagerActivityDispatcher);
89+
90+
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
91+
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
92+
when(client.blockingStub()).thenReturn(blockingStub);
93+
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
94+
95+
PollWorkflowTaskQueueResponse pollResponse =
96+
PollWorkflowTaskQueueResponse.newBuilder()
97+
.setTaskToken(ByteString.copyFrom("token", UTF_8))
98+
.setWorkflowExecution(
99+
WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID).build())
100+
.setWorkflowType(WorkflowType.newBuilder().setName(WORKFLOW_TYPE).build())
101+
.build();
102+
103+
CountDownLatch pollTaskQueueLatch = new CountDownLatch(1);
104+
CountDownLatch blockPollTaskQueueLatch = new CountDownLatch(1);
105+
106+
when(blockingStub.pollWorkflowTaskQueue(any(PollWorkflowTaskQueueRequest.class)))
107+
.thenReturn(pollResponse)
108+
.thenReturn(pollResponse)
109+
.thenAnswer(
110+
(Answer<PollWorkflowTaskQueueResponse>)
111+
invocation -> {
112+
pollTaskQueueLatch.countDown();
113+
return pollResponse;
114+
})
115+
.thenAnswer(
116+
(Answer<PollWorkflowTaskQueueResponse>)
117+
invocation -> {
118+
blockPollTaskQueueLatch.await();
119+
return null;
120+
});
121+
122+
CountDownLatch handleTaskLatch = new CountDownLatch(1);
123+
when(taskHandler.handleWorkflowTask(any(PollWorkflowTaskQueueResponse.class)))
124+
.thenAnswer(
125+
(Answer<WorkflowTaskHandler.Result>)
126+
invocation -> {
127+
// Slightly larger than the lock timeout hard coded in WorkflowWorker
128+
handleTaskLatch.countDown();
129+
Thread.sleep(6000);
130+
131+
return new WorkflowTaskHandler.Result(
132+
WORKFLOW_TYPE,
133+
RespondWorkflowTaskCompletedRequest.newBuilder().build(),
134+
null,
135+
null,
136+
null,
137+
false,
138+
(id) -> {
139+
// verify the lock is still being held
140+
assertEquals(runLockManager.totalLocks(), 1);
141+
});
142+
});
143+
144+
// Mock the server responding to a workflow task complete with another workflow task
145+
CountDownLatch respondTaskLatch = new CountDownLatch(1);
146+
when(blockingStub.respondWorkflowTaskCompleted(any(RespondWorkflowTaskCompletedRequest.class)))
147+
.thenAnswer(
148+
(Answer<RespondWorkflowTaskCompletedResponse>)
149+
invocation -> {
150+
// verify the lock is still being held
151+
assertEquals(runLockManager.totalLocks(), 1);
152+
return RespondWorkflowTaskCompletedResponse.newBuilder()
153+
.setWorkflowTask(pollResponse)
154+
.build();
155+
})
156+
.thenAnswer(
157+
(Answer<RespondWorkflowTaskCompletedResponse>)
158+
invocation -> {
159+
// verify the lock is still being held
160+
assertEquals(runLockManager.totalLocks(), 1);
161+
respondTaskLatch.countDown();
162+
return RespondWorkflowTaskCompletedResponse.newBuilder().build();
163+
});
164+
165+
assertTrue(worker.start());
166+
// Wait until we have got all the polls
167+
pollTaskQueueLatch.await();
168+
// Wait until the worker handles at least one WFT
169+
handleTaskLatch.await();
170+
// Sleep to allow metrics to be published
171+
Thread.sleep(100);
172+
// Since all polls have the same runID only one should get through, the other two should be
173+
// blocked
174+
assertEquals(runLockManager.totalLocks(), 1);
175+
// Verify 3 slots have been used
176+
reporter.assertGauge(
177+
MetricsType.WORKER_TASK_SLOTS_AVAILABLE,
178+
ImmutableMap.of("worker_type", "WorkflowWorker"),
179+
97.0);
180+
// Wait for the worker to respond, by this time the other blocked tasks should have timed out
181+
respondTaskLatch.await();
182+
// Sleep to allow metrics to be published
183+
Thread.sleep(100);
184+
// No task should have the lock anymore
185+
assertEquals(runLockManager.totalLocks(), 0);
186+
// All slots should be available
187+
reporter.assertGauge(
188+
MetricsType.WORKER_TASK_SLOTS_AVAILABLE,
189+
ImmutableMap.of("worker_type", "WorkflowWorker"),
190+
100.0);
191+
// Cleanup
192+
worker.shutdown(new ShutdownManager(), true).get();
193+
// Verify we only handled two tasks
194+
verify(taskHandler, times(2)).handleWorkflowTask(any());
195+
}
196+
}

0 commit comments

Comments
 (0)