Skip to content

Commit f6bf98a

Browse files
authored
implement replay aware logging (#153)
* implement replay aware logging * review comments * use list appender in logging test * remove an unnecessary line in logger test
1 parent 64d3cfb commit f6bf98a

16 files changed

+759
-17
lines changed

src/main/java/com/uber/cadence/internal/logging/ReplayAwareLogger.java

Lines changed: 485 additions & 0 deletions
Large diffs are not rendered by default.

src/main/java/com/uber/cadence/internal/replay/DecisionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,5 +112,9 @@ Consumer<Exception> signalWorkflowExecution(
112112
*/
113113
Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> callback);
114114

115+
/** @return scope to be used for metrics reporting. */
115116
Scope getMetricsScope();
117+
118+
/** @return whether we do logging during decision replay. */
119+
boolean getEnableLoggingInReplay();
116120
}

src/main/java/com/uber/cadence/internal/replay/DecisionContextImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,21 +44,30 @@ final class DecisionContextImpl implements DecisionContext, HistoryEventHandler
4444

4545
private Scope metricsScope;
4646

47+
private final boolean enableLoggingInReplay;
48+
4749
DecisionContextImpl(
4850
DecisionsHelper decisionsHelper,
4951
String domain,
5052
PollForDecisionTaskResponse decisionTask,
51-
WorkflowExecutionStartedEventAttributes startedAttributes) {
53+
WorkflowExecutionStartedEventAttributes startedAttributes,
54+
boolean enableLoggingInReplay) {
5255
this.activityClient = new ActivityDecisionContext(decisionsHelper);
5356
this.workflowContext = new WorkflowContext(domain, decisionTask, startedAttributes);
5457
this.workflowClient = new WorkflowDecisionContext(decisionsHelper, workflowContext);
5558
this.workflowClock = new ClockDecisionContext(decisionsHelper);
59+
this.enableLoggingInReplay = enableLoggingInReplay;
5660
}
5761

5862
public void setMetricsScope(Scope metricsScope) {
5963
this.metricsScope = new ReplayAwareScope(metricsScope, this, workflowClock::currentTimeMillis);
6064
}
6165

66+
@Override
67+
public boolean getEnableLoggingInReplay() {
68+
return enableLoggingInReplay;
69+
}
70+
6271
@Override
6372
public Scope getMetricsScope() {
6473
return metricsScope;

src/main/java/com/uber/cadence/internal/replay/ReplayDecider.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class ReplayDecider {
7676
ReplayWorkflow workflow,
7777
HistoryHelper historyHelper,
7878
DecisionsHelper decisionsHelper,
79-
Scope metricsScope) {
79+
Scope metricsScope,
80+
boolean enableLoggingInReplay) {
8081
this.workflow = workflow;
8182
this.historyHelper = historyHelper;
8283
this.decisionsHelper = decisionsHelper;
@@ -87,7 +88,8 @@ class ReplayDecider {
8788
decisionsHelper,
8889
domain,
8990
decisionTask,
90-
historyHelper.getWorkflowExecutionStartedEventAttributes());
91+
historyHelper.getWorkflowExecutionStartedEventAttributes(),
92+
enableLoggingInReplay);
9193
context.setMetricsScope(metricsScope);
9294
}
9395

src/main/java/com/uber/cadence/internal/replay/ReplayDecisionTaskHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import com.uber.cadence.internal.metrics.MetricsType;
3030
import com.uber.cadence.internal.worker.DecisionTaskHandler;
3131
import com.uber.cadence.internal.worker.DecisionTaskWithHistoryIterator;
32+
import com.uber.cadence.internal.worker.SingleWorkerOptions;
3233
import com.uber.m3.tally.Scope;
3334
import java.io.PrintWriter;
3435
import java.io.StringWriter;
@@ -44,12 +45,14 @@ public final class ReplayDecisionTaskHandler implements DecisionTaskHandler {
4445
private final ReplayWorkflowFactory workflowFactory;
4546
private final String domain;
4647
private final Scope metricsScope;
48+
private final boolean enableLoggingInReplay;
4749

4850
public ReplayDecisionTaskHandler(
49-
String domain, ReplayWorkflowFactory asyncWorkflowFactory, Scope metricsScope) {
51+
String domain, ReplayWorkflowFactory asyncWorkflowFactory, SingleWorkerOptions options) {
5052
this.domain = domain;
5153
this.workflowFactory = asyncWorkflowFactory;
52-
this.metricsScope = metricsScope;
54+
this.metricsScope = options.getMetricsScope();
55+
this.enableLoggingInReplay = options.getEnableLoggingInReplay();
5356
}
5457

5558
@Override
@@ -161,6 +164,7 @@ private ReplayDecider createDecider(HistoryHelper historyHelper) throws Exceptio
161164
WorkflowType workflowType = decisionTask.getWorkflowType();
162165
DecisionsHelper decisionsHelper = new DecisionsHelper(decisionTask);
163166
ReplayWorkflow workflow = workflowFactory.getWorkflow(workflowType);
164-
return new ReplayDecider(domain, workflow, historyHelper, decisionsHelper, metricsScope);
167+
return new ReplayDecider(
168+
domain, workflow, historyHelper, decisionsHelper, metricsScope, enableLoggingInReplay);
165169
}
166170
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -522,5 +522,10 @@ public Consumer<Exception> createTimer(long delaySeconds, Consumer<Exception> ca
522522
public Scope getMetricsScope() {
523523
throw new UnsupportedOperationException("not implemented");
524524
}
525+
526+
@Override
527+
public boolean getEnableLoggingInReplay() {
528+
return false;
529+
}
525530
}
526531
}

src/main/java/com/uber/cadence/internal/sync/SyncDecisionContext.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,4 +484,8 @@ private RuntimeException mapSignalWorkflowException(Exception failure) {
484484
public Scope getMetricsScope() {
485485
return context.getMetricsScope();
486486
}
487+
488+
public boolean isLoggingEnabledInReplay() {
489+
return context.getEnableLoggingInReplay();
490+
}
487491
}

src/main/java/com/uber/cadence/internal/sync/SyncWorkflowWorker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ public SyncWorkflowWorker(
5555
factory =
5656
new POJOWorkflowImplementationFactory(
5757
options.getDataConverter(), workflowThreadPool, interceptorFactory);
58-
DecisionTaskHandler taskHandler =
59-
new ReplayDecisionTaskHandler(domain, factory, options.getMetricsScope());
58+
DecisionTaskHandler taskHandler = new ReplayDecisionTaskHandler(domain, factory, options);
6059
worker = new WorkflowWorker(service, domain, taskList, options, taskHandler);
6160
this.options = options;
6261
}

src/main/java/com/uber/cadence/internal/sync/TestWorkflowEnvironmentInternal.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ public Worker newWorker(String taskList) {
118118
new WorkerOptions.Builder()
119119
.setInterceptorFactory(testEnvironmentOptions.getInterceptorFactory())
120120
.setMetricsScope(testEnvironmentOptions.getMetricsScope())
121+
.setEnableLoggingInReplay(testEnvironmentOptions.isLoggingEnabledInReplay())
121122
.build());
122123
workers.add(result);
123124
return result;

src/main/java/com/uber/cadence/internal/sync/WorkflowInternal.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import com.uber.cadence.common.RetryOptions;
2626
import com.uber.cadence.internal.common.CheckedExceptionWrapper;
2727
import com.uber.cadence.internal.common.InternalUtils;
28+
import com.uber.cadence.internal.logging.ReplayAwareLogger;
2829
import com.uber.cadence.workflow.ActivityStub;
2930
import com.uber.cadence.workflow.CancellationScope;
3031
import com.uber.cadence.workflow.ChildWorkflowOptions;
@@ -48,6 +49,8 @@
4849
import java.util.List;
4950
import java.util.Optional;
5051
import java.util.function.Supplier;
52+
import org.slf4j.Logger;
53+
import org.slf4j.LoggerFactory;
5154

5255
/**
5356
* Never reference directly. It is public only because Java doesn't have internal package support.
@@ -312,4 +315,20 @@ public static void sleep(Duration duration) {
312315
public static Scope getMetricsScope() {
313316
return getRootDecisionContext().getMetricsScope();
314317
}
318+
319+
private static boolean isLoggingEnabledInReplay() {
320+
return getRootDecisionContext().isLoggingEnabledInReplay();
321+
}
322+
323+
public static Logger getLogger(Class<?> clazz) {
324+
Logger logger = LoggerFactory.getLogger(clazz);
325+
return new ReplayAwareLogger(
326+
logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
327+
}
328+
329+
public static Logger getLogger(String name) {
330+
Logger logger = LoggerFactory.getLogger(name);
331+
return new ReplayAwareLogger(
332+
logger, WorkflowInternal::isReplaying, WorkflowInternal::isLoggingEnabledInReplay);
333+
}
315334
}

0 commit comments

Comments
 (0)