Skip to content

Commit 5c464e8

Browse files
Fix issue with isReplaying causing direct query to spam logs (#2087)
Fix direct query causing logs when replaying in certain cases
1 parent 08b220c commit 5c464e8

File tree

2 files changed

+92
-6
lines changed

2 files changed

+92
-6
lines changed

temporal-sdk/src/main/java/io/temporal/internal/statemachines/WFTBuffer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,9 @@ public Optional<HistoryEvent> getWorkflowTaskCompletedEvent() {
6969

7070
/**
7171
* @return Should the buffer be fetched. true if a whole history for a workflow task is
72-
* accumulated or events can't be attributed to a completed workflow task
72+
* accumulated or events can't be attributed to a completed workflow task. The whole history
73+
* includes the unprocessed history events before the WorkflowTaskStarted and the command
74+
* events after the WorkflowTaskCompleted.
7375
*/
7476
public boolean addEvent(HistoryEvent event, boolean hasNextEvent) {
7577
if (readyToFetch.size() > 0) {
@@ -84,6 +86,13 @@ private void handleEvent(HistoryEvent event, boolean hasNextEvent) {
8486
// flush buffer
8587
flushBuffer();
8688

89+
// If the last event in history is a WORKFLOW_TASK_COMPLETED, because say we received a direct
90+
// query,
91+
// we need to return it as a batch.
92+
if (WFTState.Started.equals(wftSequenceState)
93+
&& event.getEventType().equals(EventType.EVENT_TYPE_WORKFLOW_TASK_COMPLETED)) {
94+
workflowTaskCompletedEvent = Optional.of(event);
95+
}
8796
// exit the sequence
8897
wftSequenceState = WFTState.None;
8998
readyToFetch.add(event);
@@ -148,16 +157,16 @@ private void addToBuffer(HistoryEvent event) {
148157
public EventBatch fetch() {
149158
if (readyToFetch.size() == 1) {
150159
HistoryEvent event = readyToFetch.get(0);
151-
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
160+
Optional<HistoryEvent> wftCompleted = workflowTaskCompletedEvent;
152161
workflowTaskCompletedEvent = Optional.empty();
153162
readyToFetch.clear();
154-
return new EventBatch(wftStarted, Collections.singletonList(event));
163+
return new EventBatch(wftCompleted, Collections.singletonList(event));
155164
} else {
156165
List<HistoryEvent> result = new ArrayList<>(readyToFetch);
157-
Optional<HistoryEvent> wftStarted = workflowTaskCompletedEvent;
166+
Optional<HistoryEvent> wftCompleted = workflowTaskCompletedEvent;
158167
workflowTaskCompletedEvent = Optional.empty();
159168
readyToFetch.clear();
160-
return new EventBatch(wftStarted, result);
169+
return new EventBatch(wftCompleted, result);
161170
}
162171
}
163172
}

temporal-sdk/src/test/java/io/temporal/workflow/queryTests/DirectQueryReplaysDontSpamLogWithWorkflowExecutionExceptionsTest.java

Lines changed: 78 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@
2525
import ch.qos.logback.classic.Logger;
2626
import ch.qos.logback.classic.spi.ILoggingEvent;
2727
import ch.qos.logback.core.read.ListAppender;
28+
import io.temporal.activity.ActivityOptions;
29+
import io.temporal.api.common.v1.WorkflowExecution;
30+
import io.temporal.client.WorkflowClient;
2831
import io.temporal.client.WorkflowException;
32+
import io.temporal.common.RetryOptions;
33+
import io.temporal.failure.ActivityFailure;
2934
import io.temporal.failure.ApplicationFailure;
3035
import io.temporal.internal.Issue;
3136
import io.temporal.testing.internal.SDKTestWorkflowRule;
37+
import io.temporal.workflow.Workflow;
38+
import io.temporal.workflow.shared.TestActivities;
3239
import io.temporal.workflow.shared.TestWorkflows;
40+
import java.time.Duration;
3341
import java.util.concurrent.atomic.AtomicInteger;
3442
import org.junit.Before;
3543
import org.junit.Rule;
@@ -49,7 +57,10 @@ public class DirectQueryReplaysDontSpamLogWithWorkflowExecutionExceptionsTest {
4957

5058
@Rule
5159
public SDKTestWorkflowRule testWorkflowRule =
52-
SDKTestWorkflowRule.newBuilder().setWorkflowTypes(TestWorkflowNonRetryableFlag.class).build();
60+
SDKTestWorkflowRule.newBuilder()
61+
.setWorkflowTypes(TestWorkflowNonRetryableFlag.class, LogAndKeepRunningWorkflow.class)
62+
.setActivityImplementations(new TestActivities.TestActivitiesImpl())
63+
.build();
5364

5465
@Before
5566
public void setUp() throws Exception {
@@ -79,6 +90,72 @@ public void queriedWorkflowFailureDoesntProduceAdditionalLogs() {
7990
workflowExecuteRunnableLoggerAppender.list.size());
8091
}
8192

93+
@Test
94+
public void queriedWorkflowFailureDoesntProduceAdditionalLogsWhenWorkflowIsNotCompleted() {
95+
TestWorkflows.QueryableWorkflow workflow =
96+
testWorkflowRule.newWorkflowStub(TestWorkflows.QueryableWorkflow.class);
97+
98+
WorkflowExecution execution = WorkflowClient.start(workflow::execute);
99+
100+
assertEquals("my-state", workflow.getState());
101+
assertEquals("There was only one execution.", 1, workflowCodeExecutionCount.get());
102+
103+
testWorkflowRule.invalidateWorkflowCache();
104+
assertEquals("my-state", workflow.getState());
105+
assertEquals(
106+
"There was two executions - one original and one full replay for query.",
107+
2,
108+
workflowCodeExecutionCount.get());
109+
110+
workflow.mySignal("exit");
111+
assertEquals("my-state", workflow.getState());
112+
assertEquals(
113+
"There was three executions - one original and two full replays for query.",
114+
3,
115+
workflowCodeExecutionCount.get());
116+
assertEquals(
117+
"Only the original exception should be logged.",
118+
1,
119+
workflowExecuteRunnableLoggerAppender.list.size());
120+
}
121+
122+
public static class LogAndKeepRunningWorkflow implements TestWorkflows.QueryableWorkflow {
123+
private final org.slf4j.Logger logger =
124+
Workflow.getLogger("io.temporal.internal.sync.WorkflowExecutionHandler");
125+
private final TestActivities.VariousTestActivities activities =
126+
Workflow.newActivityStub(
127+
TestActivities.VariousTestActivities.class,
128+
ActivityOptions.newBuilder()
129+
.setStartToCloseTimeout(Duration.ofSeconds(10))
130+
.setRetryOptions(RetryOptions.newBuilder().setMaximumAttempts(1).build())
131+
.build());
132+
private boolean exit;
133+
134+
@Override
135+
public String execute() {
136+
workflowCodeExecutionCount.incrementAndGet();
137+
while (true) {
138+
try {
139+
activities.throwIO();
140+
} catch (ActivityFailure e) {
141+
logger.error("Unexpected error on activity", e);
142+
Workflow.await(() -> exit);
143+
return "exit";
144+
}
145+
}
146+
}
147+
148+
@Override
149+
public String getState() {
150+
return "my-state";
151+
}
152+
153+
@Override
154+
public void mySignal(String value) {
155+
exit = true;
156+
}
157+
}
158+
82159
public static class TestWorkflowNonRetryableFlag implements TestWorkflows.TestWorkflowWithQuery {
83160

84161
@Override

0 commit comments

Comments
 (0)