Skip to content

Commit 1fb089f

Browse files
Avoid incrementing WFT failure metric on respond failure (#1887)
Avoid incrementing WFT failure metric on respond failure
1 parent dac53be commit 1fb089f

File tree

2 files changed

+98
-3
lines changed

2 files changed

+98
-3
lines changed

temporal-sdk/src/main/java/io/temporal/internal/worker/WorkflowWorker.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,6 @@ public void handle(WorkflowTask task) throws Exception {
360360
}
361361
} catch (Exception e) {
362362
logExceptionDuringResultReporting(e, currentTask, result);
363-
workflowTypeScope.counter(MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER).inc(1);
364363
// if we failed to report the workflow task completion back to the server,
365364
// our cached version of the workflow may be more advanced than the server is aware of.
366365
// We should discard this execution and perform a clean replay based on what server
@@ -370,8 +369,6 @@ public void handle(WorkflowTask task) throws Exception {
370369
throw e;
371370
}
372371

373-
// this should be after sendReply, otherwise we may log
374-
// WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER twice if sendReply throws
375372
if (result.getTaskFailed() != null) {
376373
// we don't trigger the counter in case of the legacy query
377374
// (which never has taskFailed set)

temporal-sdk/src/test/java/io/temporal/internal/worker/WorkflowWorkerTest.java

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,102 @@ public void concurrentPollRequestLockTest() throws Exception {
193193
// Verify we only handled two tasks
194194
verify(taskHandler, times(2)).handleWorkflowTask(any());
195195
}
196+
197+
@Test
198+
public void respondWorkflowTaskFailureMetricTest() throws Exception {
199+
// Test that if the SDK gets a failure on RespondWorkflowTaskCompleted it does not increment
200+
// workflow_task_execution_failed.
201+
WorkflowServiceStubs client = mock(WorkflowServiceStubs.class);
202+
when(client.getServerCapabilities())
203+
.thenReturn(() -> GetSystemInfoResponse.Capabilities.newBuilder().build());
204+
205+
WorkflowRunLockManager runLockManager = new WorkflowRunLockManager();
206+
207+
Scope metricsScope =
208+
new RootScopeBuilder()
209+
.reporter(reporter)
210+
.reportEvery(com.uber.m3.util.Duration.ofMillis(1));
211+
WorkflowExecutorCache cache = new WorkflowExecutorCache(10, runLockManager, metricsScope);
212+
213+
WorkflowTaskHandler taskHandler = mock(WorkflowTaskHandler.class);
214+
when(taskHandler.isAnyTypeSupported()).thenReturn(true);
215+
216+
EagerActivityDispatcher eagerActivityDispatcher = mock(EagerActivityDispatcher.class);
217+
WorkflowWorker worker =
218+
new WorkflowWorker(
219+
client,
220+
"default",
221+
"task_queue",
222+
"sticky_task_queue",
223+
SingleWorkerOptions.newBuilder()
224+
.setIdentity("test_identity")
225+
.setBuildId(UUID.randomUUID().toString())
226+
.setPollerOptions(PollerOptions.newBuilder().setPollThreadCount(1).build())
227+
.setMetricsScope(metricsScope)
228+
.build(),
229+
runLockManager,
230+
cache,
231+
taskHandler,
232+
eagerActivityDispatcher);
233+
234+
WorkflowServiceGrpc.WorkflowServiceBlockingStub blockingStub =
235+
mock(WorkflowServiceGrpc.WorkflowServiceBlockingStub.class);
236+
when(client.blockingStub()).thenReturn(blockingStub);
237+
when(blockingStub.withOption(any(), any())).thenReturn(blockingStub);
238+
239+
PollWorkflowTaskQueueResponse pollResponse =
240+
PollWorkflowTaskQueueResponse.newBuilder()
241+
.setTaskToken(ByteString.copyFrom("token", UTF_8))
242+
.setWorkflowExecution(
243+
WorkflowExecution.newBuilder().setWorkflowId(WORKFLOW_ID).setRunId(RUN_ID).build())
244+
.setWorkflowType(WorkflowType.newBuilder().setName(WORKFLOW_TYPE).build())
245+
.build();
246+
247+
CountDownLatch pollTaskQueueLatch = new CountDownLatch(1);
248+
CountDownLatch blockPollTaskQueueLatch = new CountDownLatch(1);
249+
250+
when(blockingStub.pollWorkflowTaskQueue(any(PollWorkflowTaskQueueRequest.class)))
251+
.thenReturn(pollResponse)
252+
.thenAnswer(
253+
(Answer<PollWorkflowTaskQueueResponse>)
254+
invocation -> {
255+
pollTaskQueueLatch.countDown();
256+
blockPollTaskQueueLatch.await();
257+
return null;
258+
});
259+
;
260+
261+
CountDownLatch handleTaskLatch = new CountDownLatch(1);
262+
263+
when(taskHandler.handleWorkflowTask(any(PollWorkflowTaskQueueResponse.class)))
264+
.thenAnswer(
265+
(Answer<WorkflowTaskHandler.Result>)
266+
invocation -> {
267+
handleTaskLatch.countDown();
268+
269+
return new WorkflowTaskHandler.Result(
270+
WORKFLOW_TYPE,
271+
RespondWorkflowTaskCompletedRequest.newBuilder().build(),
272+
null,
273+
null,
274+
null,
275+
false,
276+
null);
277+
});
278+
279+
when(blockingStub.respondWorkflowTaskCompleted(any(RespondWorkflowTaskCompletedRequest.class)))
280+
.thenThrow(new RuntimeException());
281+
282+
assertTrue(worker.start());
283+
// Wait until pollWorkflowTaskQueue has been called a second time (the first call returns the task)
284+
pollTaskQueueLatch.await();
285+
// Wait until the task handler has been invoked for at least one workflow task (WFT)
286+
handleTaskLatch.await();
287+
// Cleanup
288+
worker.shutdown(new ShutdownManager(), false).get();
289+
// Make sure we don't report workflow task failure
290+
reporter.assertNoMetric(
291+
MetricsType.WORKFLOW_TASK_EXECUTION_FAILURE_COUNTER,
292+
ImmutableMap.of("worker_type", "WorkflowWorker", "workflow_type", "test-workflow-type"));
293+
}
196294
}

0 commit comments

Comments
 (0)