Skip to content

Commit 0a5090f

Browse files
committed
Add Logs Capture feature
Add logs capture based on log framework instrumentation that hooks when log event are generated to be written. Log events are sent to Logs backend through Datadog agent's Event Platform (Evp) proxy. We are reusing a previous infrastructure used for uploading logs for CI Visibility (see #7082). We are adding support for Logback. This feature is opt-in through DD_LOGS_CAPTURE_ENABLED config. Logs are sent as structured log layout and allow to add any additional attributes. For now we are adding directly trace correlation ids (traceId and spanId) and give the logs injection feature for free and without any setup on the log framework side. Smoke tests are based on Logs Injection tests.
1 parent 41b6202 commit 0a5090f

File tree

16 files changed

+336
-37
lines changed

16 files changed

+336
-37
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import datadog.trace.api.gateway.SubscriptionService;
4848
import datadog.trace.api.git.EmbeddedGitInfoBuilder;
4949
import datadog.trace.api.git.GitInfoProvider;
50+
import datadog.trace.api.intake.Intake;
5051
import datadog.trace.api.profiling.ProfilingEnablement;
5152
import datadog.trace.api.scopemanager.ScopeListener;
5253
import datadog.trace.bootstrap.benchmark.StaticEventLogger;
@@ -129,6 +130,7 @@ private enum AgentFeature {
129130
CODE_ORIGIN(TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED, false),
130131
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
131132
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
133+
LOGS_CAPTURE(GeneralConfig.LOGS_CAPTURE_ENABLED, false),
132134
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
133135
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false),
134136
FEATURE_FLAGGING(FeatureFlaggingConfig.FLAGGING_PROVIDER_ENABLED, false);
@@ -190,6 +192,7 @@ public boolean isEnabledByDefault() {
190192
private static boolean codeOriginEnabled = false;
191193
private static boolean distributedDebuggerEnabled = false;
192194
private static boolean agentlessLogSubmissionEnabled = false;
195+
private static boolean logsCaptureEnabled = false;
193196
private static boolean featureFlaggingEnabled = false;
194197

195198
private static void safelySetContextClassLoader(ClassLoader classLoader) {
@@ -275,6 +278,7 @@ public static void start(
275278
exceptionReplayEnabled = isFeatureEnabled(AgentFeature.EXCEPTION_REPLAY);
276279
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
277280
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
281+
logsCaptureEnabled = isFeatureEnabled(AgentFeature.LOGS_CAPTURE);
278282
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
279283
featureFlaggingEnabled = isFeatureEnabled(AgentFeature.FEATURE_FLAGGING);
280284

@@ -1122,15 +1126,16 @@ private static void maybeStartFeatureFlagging(final Class<?> scoClass, final Obj
11221126
}
11231127

11241128
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
1125-
if (agentlessLogSubmissionEnabled) {
1129+
if (agentlessLogSubmissionEnabled || logsCaptureEnabled) {
11261130
StaticEventLogger.begin("Logs Intake");
11271131

11281132
try {
11291133
final Class<?> logsIntakeSystemClass =
11301134
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
11311135
final Method logsIntakeInstallerMethod =
1132-
logsIntakeSystemClass.getMethod("install", scoClass);
1133-
logsIntakeInstallerMethod.invoke(null, sco);
1136+
logsIntakeSystemClass.getMethod("install", scoClass, Intake.class);
1137+
logsIntakeInstallerMethod.invoke(
1138+
null, sco, agentlessLogSubmissionEnabled ? Intake.LOGS : Intake.EVENT_PLATFORM);
11341139
} catch (final Throwable e) {
11351140
log.warn("Not installing Logs Intake subsystem", e);
11361141
}

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsDispatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public void dispatch(List<Map<String, Object>> messages) {
8585

8686
private void flush(StringBuilder batch) {
8787
try {
88-
RequestBody requestBody = RequestBody.create(JSON, batch.toString());
88+
String json = batch.toString();
89+
RequestBody requestBody = RequestBody.create(JSON, json);
8990
RequestBody gzippedRequestBody = OkHttpUtils.gzippedRequestBodyOf(requestBody);
9091
backendApi.post("logs", gzippedRequestBody, IGNORE_RESPONSE, null, true);
9192
} catch (IOException e) {

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsIntakeSystem.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.communication.BackendApiFactory;
44
import datadog.communication.ddagent.SharedCommunicationObjects;
55
import datadog.trace.api.Config;
6+
import datadog.trace.api.intake.Intake;
67
import datadog.trace.api.logging.intake.LogsIntake;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
@@ -11,15 +12,15 @@ public class LogsIntakeSystem {
1112

1213
private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);
1314

14-
public static void install(SharedCommunicationObjects sco) {
15+
public static void install(SharedCommunicationObjects sco, Intake intake) {
1516
Config config = Config.get();
16-
if (!config.isAgentlessLogSubmissionEnabled()) {
17-
LOGGER.debug("Agentless logs intake is disabled");
17+
if (!config.isAgentlessLogSubmissionEnabled() && !config.isLogsCaptureEnabled()) {
18+
LOGGER.debug("Agentless logs intake and logs capture are disabled");
1819
return;
1920
}
2021

2122
BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
22-
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
23+
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory, intake);
2324
sco.whenReady(writer::start);
2425

2526
LogsIntake.registerWriter(writer);

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsWriterImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ public class LogsWriterImpl implements LogsWriter {
2727

2828
private final Map<String, Object> commonTags;
2929
private final BackendApiFactory apiFactory;
30+
private final Intake intake;
3031
private final BlockingQueue<Map<String, Object>> messageQueue;
3132
private final Thread messagePollingThread;
3233

33-
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
34+
public LogsWriterImpl(Config config, BackendApiFactory apiFactory, Intake intake) {
3435
this.apiFactory = apiFactory;
36+
this.intake = intake;
3537

3638
commonTags = new HashMap<>();
3739
commonTags.put("ddsource", "java");
@@ -87,7 +89,7 @@ public void log(Map<String, Object> message) {
8789
}
8890

8991
private void logPollingLoop() {
90-
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
92+
BackendApi backendApi = apiFactory.createBackendApi(intake);
9193
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);
9294

9395
while (!Thread.currentThread().isInterrupted()) {

dd-java-agent/instrumentation/log4j/log4j-2.0/src/main/java/datadog/trace/instrumentation/log4j2/DatadogAppender.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.instrumentation.log4j2;
22

3+
import datadog.trace.api.Config;
4+
import datadog.trace.api.CorrelationIdentifier;
35
import datadog.trace.api.logging.intake.LogsIntake;
46
import java.io.PrintWriter;
57
import java.io.StringWriter;
@@ -13,8 +15,11 @@ public class DatadogAppender extends AbstractAppender {
1315

1416
private static final int MAX_STACKTRACE_STRING_LENGTH = 16 * 1_024;
1517

16-
public DatadogAppender(String name, Filter filter) {
18+
private final boolean logsCaptureEnabled;
19+
20+
public DatadogAppender(String name, Filter filter, Config config) {
1721
super(name, filter, null);
22+
this.logsCaptureEnabled = config.isLogsCaptureEnabled();
1823
}
1924

2025
@Override
@@ -40,7 +45,6 @@ private Map<String, Object> map(final LogEvent event) {
4045
Map<String, Object> thrownLog = new HashMap<>();
4146
thrownLog.put("message", thrown.getMessage());
4247
thrownLog.put("name", thrown.getClass().getCanonicalName());
43-
4448
// TODO consider using structured stack trace layout
4549
// (see
4650
// org.apache.logging.log4j.layout.template.json.resolver.ExceptionResolver#createStackTraceResolver)
@@ -55,19 +59,29 @@ private Map<String, Object> map(final LogEvent event) {
5559

5660
log.put("thrown", thrownLog);
5761
}
58-
59-
log.put("contextMap", event.getContextMap());
6062
log.put("endOfBatch", event.isEndOfBatch());
6163
log.put("loggerFqcn", event.getLoggerFqcn());
62-
63-
StackTraceElement source = event.getSource();
64-
Map<String, Object> sourceLog = new HashMap<>();
65-
sourceLog.put("class", source.getClassName());
66-
sourceLog.put("method", source.getMethodName());
67-
sourceLog.put("file", source.getFileName());
68-
sourceLog.put("line", source.getLineNumber());
69-
log.put("source", sourceLog);
70-
64+
if (logsCaptureEnabled) {
65+
// skip log source for now as this is expensive
66+
// will be later introduce with Log Origin and optimisations
67+
String traceId = CorrelationIdentifier.getTraceId();
68+
if (traceId != null && !traceId.equals("0")) {
69+
log.put("dd.trace_id", traceId);
70+
}
71+
String spanId = CorrelationIdentifier.getSpanId();
72+
if (spanId != null && !spanId.equals("0")) {
73+
log.put("dd.span_id", spanId);
74+
}
75+
} else {
76+
log.put("contextMap", event.getContextMap());
77+
StackTraceElement source = event.getSource();
78+
Map<String, Object> sourceLog = new HashMap<>();
79+
sourceLog.put("class", source.getClassName());
80+
sourceLog.put("method", source.getMethodName());
81+
sourceLog.put("file", source.getFileName());
82+
sourceLog.put("line", source.getLineNumber());
83+
log.put("source", sourceLog);
84+
}
7185
return log;
7286
}
7387
}

dd-java-agent/instrumentation/log4j/log4j-2.0/src/main/java/datadog/trace/instrumentation/log4j2/LoggerConfigInstrumentation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ public LoggerConfigInstrumentation() {
2424

2525
@Override
2626
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
27-
return super.isApplicable(enabledSystems)
28-
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled();
27+
return (super.isApplicable(enabledSystems)
28+
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled())
29+
|| Config.get().isLogsCaptureEnabled();
2930
}
3031

3132
@Override
@@ -62,10 +63,10 @@ public static void onExit(@Advice.This LoggerConfig loggerConfig) {
6263
}
6364
}
6465

65-
DatadogAppender appender = new DatadogAppender("datadog", null);
66+
Config config = Config.get();
67+
DatadogAppender appender = new DatadogAppender("datadog", null, config);
6668
appender.start();
6769

68-
Config config = Config.get();
6970
Level level = Level.valueOf(config.getAgentlessLogSubmissionLevel());
7071
loggerConfig.addAppender(appender, level, null);
7172
}
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import datadog.trace.agent.test.InstrumentationSpecification
2+
import datadog.trace.api.config.GeneralConfig
3+
import datadog.trace.api.logging.intake.LogsIntake
4+
import datadog.trace.api.logging.intake.LogsWriter
5+
import java.nio.file.Files
6+
import java.nio.file.Path
7+
import org.apache.logging.log4j.LogManager
8+
import org.apache.logging.log4j.core.appender.AbstractAppender
9+
import org.junit.jupiter.api.Assumptions
10+
import spock.util.environment.Jvm
11+
12+
class Log4jDatadogAppenderLogCaptureTest extends InstrumentationSpecification {
13+
14+
private static DummyLogsWriter logsWriter
15+
16+
def setupSpec() {
17+
logsWriter = new DummyLogsWriter()
18+
LogsIntake.registerWriter(logsWriter)
19+
}
20+
21+
@Override
22+
void configurePreAgent() {
23+
super.configurePreAgent()
24+
injectSysConfig(GeneralConfig.LOGS_CAPTURE_ENABLED, "true")
25+
}
26+
27+
def "test datadog appender registration"() {
28+
setup:
29+
ensureLog4jVersionCompatibleWithCurrentJVM()
30+
31+
def logger = LogManager.getLogger(Log4jDatadogAppenderLogCaptureTest)
32+
33+
when:
34+
logger.error("A test message")
35+
36+
then:
37+
!logsWriter.messages.empty
38+
39+
def message = logsWriter.messages.poll()
40+
"A test message" == message.get("message")
41+
"ERROR" == message.get("level")
42+
"Log4jDatadogAppenderLogCaptureTest" == message.get("loggerName")
43+
}
44+
45+
private static ensureLog4jVersionCompatibleWithCurrentJVM() {
46+
try {
47+
// init class to see if UnsupportedClassVersionError gets thrown
48+
AbstractAppender.package
49+
} catch (UnsupportedClassVersionError e) {
50+
Assumptions.assumeTrue(false, "Latest Log4j2 release requires Java 17, current JVM: " + Jvm.current.javaVersion)
51+
}
52+
}
53+
54+
private static final class DummyLogsWriter implements LogsWriter {
55+
private final Queue<Map<String, Object>> messages = new ArrayDeque<>()
56+
57+
@Override
58+
void log(Map<String, Object> message) {
59+
messages.offer(message)
60+
}
61+
62+
@Override
63+
void start() {
64+
// no op
65+
}
66+
67+
@Override
68+
void shutdown() {
69+
// no op
70+
}
71+
}
72+
}

dd-java-agent/instrumentation/logback-1.0/src/main/java/datadog/trace/instrumentation/logback/LogbackLoggerInstrumentation.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,20 @@
1818
import com.google.auto.service.AutoService;
1919
import datadog.trace.agent.tooling.Instrumenter;
2020
import datadog.trace.agent.tooling.InstrumenterModule;
21+
import datadog.trace.api.Config;
2122
import datadog.trace.bootstrap.InstrumentationContext;
2223
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2324
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2425
import java.util.Map;
26+
import java.util.Set;
2527
import net.bytebuddy.asm.Advice;
2628

2729
@AutoService(InstrumenterModule.class)
2830
public class LogbackLoggerInstrumentation extends InstrumenterModule.Tracing
2931
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
3032

3133
public LogbackLoggerInstrumentation() {
32-
super("logback");
34+
super("logback", "logs-intake", "logs-intake-logback");
3335
}
3436

3537
@Override
@@ -43,6 +45,16 @@ public Map<String, String> contextStore() {
4345
"ch.qos.logback.classic.spi.ILoggingEvent", AgentSpanContext.class.getName());
4446
}
4547

48+
@Override
49+
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
50+
return super.isApplicable(enabledSystems) || Config.get().isLogsCaptureEnabled();
51+
}
52+
53+
@Override
54+
public String[] helperClassNames() {
55+
return new String[] {LogsIntakeHelper.class.getName()};
56+
}
57+
4658
@Override
4759
public void methodAdvice(MethodTransformer transformer) {
4860
transformer.applyAdvice(
@@ -52,6 +64,15 @@ public void methodAdvice(MethodTransformer transformer) {
5264
.and(takesArguments(1))
5365
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
5466
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice");
67+
if (Config.get().isLogsCaptureEnabled()) {
68+
transformer.applyAdvice(
69+
isMethod()
70+
.and(isPublic())
71+
.and(named("callAppenders"))
72+
.and(takesArguments(1))
73+
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
74+
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice2");
75+
}
5576
}
5677

5778
public static class CallAppendersAdvice {
@@ -65,4 +86,11 @@ public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
6586
}
6687
}
6788
}
89+
90+
public static class CallAppendersAdvice2 {
91+
@Advice.OnMethodEnter
92+
public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
93+
LogsIntakeHelper.log(event);
94+
}
95+
}
6896
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package datadog.trace.instrumentation.logback;
2+
3+
import ch.qos.logback.classic.spi.ILoggingEvent;
4+
import ch.qos.logback.classic.spi.StackTraceElementProxy;
5+
import datadog.trace.api.CorrelationIdentifier;
6+
import datadog.trace.api.logging.intake.LogsIntake;
7+
import java.util.Arrays;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.stream.Collectors;
11+
12+
public class LogsIntakeHelper {
13+
14+
public static void log(ILoggingEvent event) {
15+
LogsIntake.log(map(event));
16+
}
17+
18+
private static Map<String, Object> map(ILoggingEvent event) {
19+
Map<String, Object> log = new HashMap<>();
20+
log.put("thread", event.getThreadName());
21+
log.put("level", event.getLevel().levelStr);
22+
log.put("loggerName", event.getLoggerName());
23+
log.put("message", event.getFormattedMessage());
24+
if (event.getThrowableProxy() != null) {
25+
Map<String, Object> thrownLog = new HashMap<>();
26+
thrownLog.put("message", event.getThrowableProxy().getMessage());
27+
thrownLog.put("name", event.getThrowableProxy().getClassName());
28+
String stackTraceString =
29+
Arrays.stream(event.getThrowableProxy().getStackTraceElementProxyArray())
30+
.map(StackTraceElementProxy::getSTEAsString)
31+
.collect(Collectors.joining(" "));
32+
thrownLog.put("extendedStackTrace", stackTraceString);
33+
log.put("thrown", thrownLog);
34+
}
35+
String traceId = CorrelationIdentifier.getTraceId();
36+
if (traceId != null && !traceId.equals("0")) {
37+
log.put("dd.trace_id", traceId);
38+
}
39+
String spanId = CorrelationIdentifier.getSpanId();
40+
if (spanId != null && !spanId.equals("0")) {
41+
log.put("dd.span_id", spanId);
42+
}
43+
return log;
44+
}
45+
}

0 commit comments

Comments
 (0)