Skip to content

Commit 9eac91b

Browse files
authored
add logging context (#168)
* add logging context * fix unit tests
1 parent 8d1c881 commit 9eac91b

File tree

7 files changed

+124
-30
lines changed

7 files changed

+124
-30
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2012-2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Modifications copyright (C) 2017 Uber Technologies, Inc.
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License"). You may not
7+
* use this file except in compliance with the License. A copy of the License is
8+
* located at
9+
*
10+
* http://aws.amazon.com/apache2.0
11+
*
12+
* or in the "license" file accompanying this file. This file is distributed on
13+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
14+
* express or implied. See the License for the specific language governing
15+
* permissions and limitations under the License.
16+
*/
17+
18+
package com.uber.cadence.internal.logging;
19+
20+
public class LoggerTag {
21+
public static final String ACTIVITY_ID = "ActivityID";
22+
public static final String ACTIVITY_TYPE = "ActivityType";
23+
public static final String DOMAIN = "Domain";
24+
public static final String EVENT_ID = "EventID";
25+
public static final String EVENT_TYPE = "EventType";
26+
public static final String RUN_ID = "RunID";
27+
public static final String TASK_LIST = "TaskList";
28+
public static final String TIMER_ID = "TimerID";
29+
public static final String WORKFLOW_ID = "WorkflowID";
30+
public static final String WORKFLOW_TYPE = "WorkflowType";
31+
public static final String WORKER_ID = "WorkerID";
32+
public static final String WORKER_TYPE = "WorkerType";
33+
public static final String SIDE_EFFECT_ID = "SideEffectID";
34+
public static final String CHILD_WORKFLOW_ID = "ChildWorkflowID";
35+
}

src/main/java/com/uber/cadence/internal/sync/DeterministicRunnerImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ public WorkflowExecution getWorkflowExecution() {
417417

418418
@Override
419419
public WorkflowType getWorkflowType() {
420-
throw new UnsupportedOperationException("not implemented");
420+
return new WorkflowType().setName("dummy-workflow");
421421
}
422422

423423
@Override
@@ -443,22 +443,22 @@ public int getExecutionStartToCloseTimeoutSeconds() {
443443

444444
@Override
445445
public String getTaskList() {
446-
throw new UnsupportedOperationException("not implemented");
446+
return "dummy-task-list";
447447
}
448448

449449
@Override
450450
public String getDomain() {
451-
throw new UnsupportedOperationException("not implemented");
451+
return "dummy-domain";
452452
}
453453

454454
@Override
455455
public String getWorkflowId() {
456-
throw new UnsupportedOperationException("not implemented");
456+
return "dummy-workflow-id";
457457
}
458458

459459
@Override
460460
public String getRunId() {
461-
throw new UnsupportedOperationException("not implemented");
461+
return "dummy-run-id";
462462
}
463463

464464
@Override

src/main/java/com/uber/cadence/internal/sync/WorkflowThreadImpl.java

Lines changed: 34 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package com.uber.cadence.internal.sync;
1919

20+
import com.uber.cadence.internal.logging.LoggerTag;
21+
import com.uber.cadence.internal.replay.DecisionContext;
2022
import com.uber.cadence.workflow.Promise;
2123
import java.io.PrintWriter;
2224
import java.io.StringWriter;
@@ -32,6 +34,7 @@
3234
import java.util.function.Supplier;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
37+
import org.slf4j.MDC;
3538

3639
class WorkflowThreadImpl implements WorkflowThread {
3740

@@ -41,22 +44,25 @@ class WorkflowThreadImpl implements WorkflowThread {
4144
*/
4245
class RunnableWrapper implements Runnable {
4346

44-
private final WorkflowThreadContext context;
47+
private final WorkflowThreadContext threadContext;
48+
private final DecisionContext decisionContext;
4549
private String originalName;
4650
private String name;
4751
private CancellationScopeImpl cancellationScope;
4852

4953
RunnableWrapper(
50-
WorkflowThreadContext context,
54+
WorkflowThreadContext threadContext,
55+
DecisionContext decisionContext,
5156
String name,
5257
boolean detached,
5358
CancellationScopeImpl parent,
5459
Runnable runnable) {
55-
this.context = context;
60+
this.threadContext = threadContext;
61+
this.decisionContext = decisionContext;
5662
this.name = name;
5763
cancellationScope = new CancellationScopeImpl(detached, runnable, parent);
5864
if (context.getStatus() != Status.CREATED) {
59-
throw new IllegalStateException("context not in CREATED state");
65+
throw new IllegalStateException("threadContext not in CREATED state");
6066
}
6167
}
6268

@@ -66,14 +72,20 @@ public void run() {
6672
originalName = thread.getName();
6773
thread.setName(name);
6874
DeterministicRunnerImpl.setCurrentThreadInternal(WorkflowThreadImpl.this);
75+
decisionContext.getWorkflowId();
76+
MDC.put(LoggerTag.WORKFLOW_ID, decisionContext.getWorkflowId());
77+
MDC.put(LoggerTag.WORKFLOW_TYPE, decisionContext.getWorkflowType().getName());
78+
MDC.put(LoggerTag.RUN_ID, decisionContext.getRunId());
79+
MDC.put(LoggerTag.TASK_LIST, decisionContext.getTaskList());
80+
MDC.put(LoggerTag.DOMAIN, decisionContext.getDomain());
6981
try {
7082
// initialYield blocks thread until the first runUntilBlocked is called.
7183
// Otherwise r starts executing without control of the sync.
72-
context.initialYield();
84+
threadContext.initialYield();
7385
cancellationScope.run();
7486
} catch (DestroyWorkflowThreadError e) {
75-
if (!context.isDestroyRequested()) {
76-
context.setUnhandledException(e);
87+
if (!threadContext.isDestroyRequested()) {
88+
threadContext.setUnhandledException(e);
7789
}
7890
} catch (Error e) {
7991
// Error aborts decision, not fails the workflow.
@@ -85,10 +97,10 @@ public void run() {
8597
log.error(
8698
String.format("Workflow thread \"%s\" run failed with Error:\n%s", name, stackTrace));
8799
}
88-
context.setUnhandledException(e);
100+
threadContext.setUnhandledException(e);
89101
} catch (CancellationException e) {
90102
if (!isCancelRequested()) {
91-
context.setUnhandledException(e);
103+
threadContext.setUnhandledException(e);
92104
}
93105
log.debug(String.format("Workflow thread \"%s\" run cancelled", name));
94106
} catch (Throwable e) {
@@ -102,12 +114,13 @@ public void run() {
102114
"Workflow thread \"%s\" run failed with unhandled exception:\n%s",
103115
name, stackTrace));
104116
}
105-
context.setUnhandledException(e);
117+
threadContext.setUnhandledException(e);
106118
} finally {
107119
DeterministicRunnerImpl.setCurrentThreadInternal(null);
108-
context.setStatus(Status.DONE);
120+
threadContext.setStatus(Status.DONE);
109121
thread.setName(originalName);
110122
thread = null;
123+
MDC.clear();
111124
}
112125
}
113126

@@ -165,7 +178,14 @@ public void setName(String name) {
165178
if (name == null) {
166179
name = "workflow-" + super.hashCode();
167180
}
168-
this.task = new RunnableWrapper(context, name, detached, parentCancellationScope, runnable);
181+
this.task =
182+
new RunnableWrapper(
183+
context,
184+
runner.getDecisionContext().getContext(),
185+
name,
186+
detached,
187+
parentCancellationScope,
188+
runnable);
169189
}
170190

171191
@Override
@@ -283,8 +303,8 @@ public Throwable getUnhandledException() {
283303
}
284304

285305
/**
286-
* Evaluates function in the context of the coroutine without unblocking it. Used to get current
287-
* coroutine status, like stack trace.
306+
* Evaluates function in the threadContext of the coroutine without unblocking it. Used to get
307+
* current coroutine status, like stack trace.
288308
*
289309
* @param function Parameter is reason for current goroutine blockage.
290310
*/

src/main/java/com/uber/cadence/internal/worker/ActivityWorker.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.uber.cadence.WorkflowExecution;
2929
import com.uber.cadence.common.RetryOptions;
3030
import com.uber.cadence.internal.common.Retryer;
31+
import com.uber.cadence.internal.logging.LoggerTag;
3132
import com.uber.cadence.internal.metrics.MetricsType;
3233
import com.uber.cadence.internal.worker.ActivityTaskHandler.Result;
3334
import com.uber.cadence.serviceclient.IWorkflowService;
@@ -40,6 +41,7 @@
4041
import org.apache.thrift.TException;
4142
import org.slf4j.Logger;
4243
import org.slf4j.LoggerFactory;
44+
import org.slf4j.MDC;
4345

4446
public final class ActivityWorker implements SuspendableWorker {
4547

@@ -181,6 +183,11 @@ public void handle(
181183
Duration.ofNanos(
182184
task.task.getStartedTimestamp() - task.task.getScheduledTimestamp()));
183185

186+
MDC.put(LoggerTag.ACTIVITY_ID, task.task.getActivityId());
187+
MDC.put(LoggerTag.ACTIVITY_TYPE, task.task.getActivityType().getName());
188+
MDC.put(LoggerTag.WORKFLOW_ID, task.task.getWorkflowExecution().getWorkflowId());
189+
MDC.put(LoggerTag.RUN_ID, task.task.getWorkflowExecution().getRunId());
190+
184191
try {
185192
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_EXEC_LATENCY).start();
186193
ActivityTaskHandler.Result response =
@@ -200,6 +207,11 @@ public void handle(
200207
Stopwatch sw = options.getMetricsScope().timer(MetricsType.ACTIVITY_RESP_LATENCY).start();
201208
sendReply(task.task, new Result(null, null, cancelledRequest, null));
202209
sw.stop();
210+
} finally {
211+
MDC.remove(LoggerTag.ACTIVITY_ID);
212+
MDC.remove(LoggerTag.ACTIVITY_TYPE);
213+
MDC.remove(LoggerTag.WORKFLOW_ID);
214+
MDC.remove(LoggerTag.RUN_ID);
203215
}
204216
}
205217

src/main/java/com/uber/cadence/internal/worker/PollTask.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package com.uber.cadence.internal.worker;
1919

20+
import com.uber.cadence.internal.logging.LoggerTag;
2021
import com.uber.cadence.serviceclient.IWorkflowService;
2122
import java.util.concurrent.Semaphore;
2223
import java.util.concurrent.SynchronousQueue;
2324
import java.util.concurrent.ThreadPoolExecutor;
2425
import java.util.concurrent.TimeUnit;
26+
import org.slf4j.MDC;
2527

2628
/**
2729
* Assumes that there is only one instance of PollTask per worker as it contains thread pool and
@@ -84,6 +86,8 @@ public void run() throws Exception {
8486
pollSemaphore.acquire();
8587
// we will release the semaphore in a finally clause
8688
synchronousSemaphoreRelease = true;
89+
MDC.put(LoggerTag.DOMAIN, domain);
90+
MDC.put(LoggerTag.TASK_LIST, taskList);
8791
final T task = handler.poll(service, domain, taskList);
8892
if (task == null) {
8993
return;
@@ -92,6 +96,8 @@ public void run() throws Exception {
9296
try {
9397
taskExecutor.execute(
9498
() -> {
99+
MDC.put(LoggerTag.DOMAIN, domain);
100+
MDC.put(LoggerTag.TASK_LIST, taskList);
95101
try {
96102
handler.handle(service, domain, taskList, task);
97103
} catch (Throwable ee) {
@@ -101,6 +107,8 @@ public void run() throws Exception {
101107
.uncaughtException(Thread.currentThread(), handler.wrapFailure(task, ee));
102108
} finally {
103109
pollSemaphore.release();
110+
MDC.remove(LoggerTag.DOMAIN);
111+
MDC.remove(LoggerTag.TASK_LIST);
104112
}
105113
});
106114
} catch (Error | Exception e) {
@@ -111,6 +119,8 @@ public void run() throws Exception {
111119
if (synchronousSemaphoreRelease) {
112120
pollSemaphore.release();
113121
}
122+
MDC.remove(LoggerTag.DOMAIN);
123+
MDC.remove(LoggerTag.TASK_LIST);
114124
}
115125
}
116126
}

src/main/java/com/uber/cadence/internal/worker/WorkflowWorker.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.uber.cadence.common.RetryOptions;
3636
import com.uber.cadence.internal.common.Retryer;
3737
import com.uber.cadence.internal.common.WorkflowExecutionUtils;
38+
import com.uber.cadence.internal.logging.LoggerTag;
3839
import com.uber.cadence.internal.metrics.MetricsType;
3940
import com.uber.cadence.serviceclient.IWorkflowService;
4041
import com.uber.m3.tally.Stopwatch;
@@ -45,6 +46,7 @@
4546
import org.apache.thrift.TException;
4647
import org.slf4j.Logger;
4748
import org.slf4j.LoggerFactory;
49+
import org.slf4j.MDC;
4850

4951
public final class WorkflowWorker implements SuspendableWorker {
5052

@@ -188,17 +190,26 @@ private TaskHandlerImpl(DecisionTaskHandler handler) {
188190
public void handle(
189191
IWorkflowService service, String domain, String taskList, PollForDecisionTaskResponse task)
190192
throws Exception {
191-
Stopwatch sw =
192-
options.getMetricsScope().timer(MetricsType.DECISION_EXECUTION_LATENCY).start();
193-
DecisionTaskHandler.Result response =
194-
handler.handleDecisionTask(new DecisionTaskWithHistoryIteratorImpl(task));
195-
sw.stop();
196-
197-
sw = options.getMetricsScope().timer(MetricsType.DECISION_RESPONSE_LATENCY).start();
198-
sendReply(service, task.getTaskToken(), response);
199-
sw.stop();
200-
201-
options.getMetricsScope().counter(MetricsType.DECISION_TASK_COMPLETED_COUNTER).inc(1);
193+
MDC.put(LoggerTag.WORKFLOW_ID, task.getWorkflowExecution().getWorkflowId());
194+
MDC.put(LoggerTag.WORKFLOW_TYPE, task.getWorkflowType().getName());
195+
MDC.put(LoggerTag.RUN_ID, task.getWorkflowExecution().getRunId());
196+
try {
197+
Stopwatch sw =
198+
options.getMetricsScope().timer(MetricsType.DECISION_EXECUTION_LATENCY).start();
199+
DecisionTaskHandler.Result response =
200+
handler.handleDecisionTask(new DecisionTaskWithHistoryIteratorImpl(task));
201+
sw.stop();
202+
203+
sw = options.getMetricsScope().timer(MetricsType.DECISION_RESPONSE_LATENCY).start();
204+
sendReply(service, task.getTaskToken(), response);
205+
sw.stop();
206+
207+
options.getMetricsScope().counter(MetricsType.DECISION_TASK_COMPLETED_COUNTER).inc(1);
208+
} finally {
209+
MDC.remove(LoggerTag.WORKFLOW_ID);
210+
MDC.remove(LoggerTag.WORKFLOW_TYPE);
211+
MDC.remove(LoggerTag.RUN_ID);
212+
}
202213
}
203214

204215
@Override

src/test/java/com/uber/cadence/workflow/LoggerTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,14 @@
1818
package com.uber.cadence.workflow;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertTrue;
2122

2223
import ch.qos.logback.classic.LoggerContext;
2324
import ch.qos.logback.classic.spi.ILoggingEvent;
2425
import ch.qos.logback.core.read.ListAppender;
2526
import com.uber.cadence.client.WorkflowClient;
2627
import com.uber.cadence.client.WorkflowOptions;
28+
import com.uber.cadence.internal.logging.LoggerTag;
2729
import com.uber.cadence.testing.TestEnvironmentOptions;
2830
import com.uber.cadence.testing.TestEnvironmentOptions.Builder;
2931
import com.uber.cadence.testing.TestWorkflowEnvironment;
@@ -134,6 +136,10 @@ private int matchingLines(String message) {
134136
int i = 0;
135137
for (ILoggingEvent event : listAppender.list) {
136138
if (event.getFormattedMessage().contains(message)) {
139+
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.WORKFLOW_ID));
140+
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.WORKFLOW_TYPE));
141+
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.RUN_ID));
142+
assertTrue(event.getMDCPropertyMap().containsKey(LoggerTag.TASK_LIST));
137143
i++;
138144
}
139145
}

0 commit comments

Comments
 (0)