Skip to content

Commit 3a70b4c

Browse files
authored
Add App Logs Collection feature (#10156)
Add application logs collection based on log framework instrumentation that hooks when log event are generated to be written. Log events are sent to Logs backend through Datadog agent's Event Platform (Evp) proxy. We are reusing a previous infrastructure used for uploading logs for CI Visibility (see #7082). We are adding support for Logback. This feature is opt-in through DD_APP_LOGS_COLLECTION_ENABLED config. Logs are sent as structured log layout and allow to add any additional attributes. For now we are adding directly trace correlation ids (traceId and spanId) and give the logs injection feature for free and without any setup on the log framework side. Smoke tests are based on Logs Injection tests.
1 parent c2f3bfb commit 3a70b4c

File tree

20 files changed

+395
-51
lines changed

20 files changed

+395
-51
lines changed

dd-java-agent/agent-bootstrap/src/main/java/datadog/trace/bootstrap/Agent.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import datadog.trace.api.gateway.SubscriptionService;
4848
import datadog.trace.api.git.EmbeddedGitInfoBuilder;
4949
import datadog.trace.api.git.GitInfoProvider;
50+
import datadog.trace.api.intake.Intake;
5051
import datadog.trace.api.profiling.ProfilingEnablement;
5152
import datadog.trace.api.scopemanager.ScopeListener;
5253
import datadog.trace.bootstrap.benchmark.StaticEventLogger;
@@ -129,6 +130,7 @@ private enum AgentFeature {
129130
CODE_ORIGIN(TraceInstrumentationConfig.CODE_ORIGIN_FOR_SPANS_ENABLED, false),
130131
DATA_JOBS(GeneralConfig.DATA_JOBS_ENABLED, false),
131132
AGENTLESS_LOG_SUBMISSION(GeneralConfig.AGENTLESS_LOG_SUBMISSION_ENABLED, false),
133+
APP_LOGS_COLLECTION(GeneralConfig.APP_LOGS_COLLECTION_ENABLED, false),
132134
LLMOBS(LlmObsConfig.LLMOBS_ENABLED, false),
133135
LLMOBS_AGENTLESS(LlmObsConfig.LLMOBS_AGENTLESS_ENABLED, false),
134136
FEATURE_FLAGGING(FeatureFlaggingConfig.FLAGGING_PROVIDER_ENABLED, false);
@@ -190,6 +192,7 @@ public boolean isEnabledByDefault() {
190192
private static boolean codeOriginEnabled = false;
191193
private static boolean distributedDebuggerEnabled = false;
192194
private static boolean agentlessLogSubmissionEnabled = false;
195+
private static boolean appLogsCollectionEnabled = false;
193196
private static boolean featureFlaggingEnabled = false;
194197

195198
private static void safelySetContextClassLoader(ClassLoader classLoader) {
@@ -275,6 +278,7 @@ public static void start(
275278
exceptionReplayEnabled = isFeatureEnabled(AgentFeature.EXCEPTION_REPLAY);
276279
codeOriginEnabled = isFeatureEnabled(AgentFeature.CODE_ORIGIN);
277280
agentlessLogSubmissionEnabled = isFeatureEnabled(AgentFeature.AGENTLESS_LOG_SUBMISSION);
281+
appLogsCollectionEnabled = isFeatureEnabled(AgentFeature.APP_LOGS_COLLECTION);
278282
llmObsEnabled = isFeatureEnabled(AgentFeature.LLMOBS);
279283
featureFlaggingEnabled = isFeatureEnabled(AgentFeature.FEATURE_FLAGGING);
280284

@@ -1131,15 +1135,16 @@ private static void maybeStartFeatureFlagging(final Class<?> scoClass, final Obj
11311135
}
11321136

11331137
private static void maybeInstallLogsIntake(Class<?> scoClass, Object sco) {
1134-
if (agentlessLogSubmissionEnabled) {
1138+
if (agentlessLogSubmissionEnabled || appLogsCollectionEnabled) {
11351139
StaticEventLogger.begin("Logs Intake");
11361140

11371141
try {
11381142
final Class<?> logsIntakeSystemClass =
11391143
AGENT_CLASSLOADER.loadClass("datadog.trace.logging.intake.LogsIntakeSystem");
11401144
final Method logsIntakeInstallerMethod =
1141-
logsIntakeSystemClass.getMethod("install", scoClass);
1142-
logsIntakeInstallerMethod.invoke(null, sco);
1145+
logsIntakeSystemClass.getMethod("install", scoClass, Intake.class);
1146+
logsIntakeInstallerMethod.invoke(
1147+
null, sco, agentlessLogSubmissionEnabled ? Intake.LOGS : Intake.EVENT_PLATFORM);
11431148
} catch (final Throwable e) {
11441149
log.warn("Not installing Logs Intake subsystem", e);
11451150
}

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsDispatcher.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,8 @@ public void dispatch(List<Map<String, Object>> messages) {
8585

8686
private void flush(StringBuilder batch) {
8787
try {
88-
RequestBody requestBody = RequestBody.create(JSON, batch.toString());
88+
String json = batch.toString();
89+
RequestBody requestBody = RequestBody.create(JSON, json);
8990
RequestBody gzippedRequestBody = OkHttpUtils.gzippedRequestBodyOf(requestBody);
9091
backendApi.post("logs", gzippedRequestBody, IGNORE_RESPONSE, null, true);
9192
} catch (IOException e) {

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsIntakeSystem.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.communication.BackendApiFactory;
44
import datadog.communication.ddagent.SharedCommunicationObjects;
55
import datadog.trace.api.Config;
6+
import datadog.trace.api.intake.Intake;
67
import datadog.trace.api.logging.intake.LogsIntake;
78
import org.slf4j.Logger;
89
import org.slf4j.LoggerFactory;
@@ -11,15 +12,15 @@ public class LogsIntakeSystem {
1112

1213
private static final Logger LOGGER = LoggerFactory.getLogger(LogsIntakeSystem.class);
1314

14-
public static void install(SharedCommunicationObjects sco) {
15+
public static void install(SharedCommunicationObjects sco, Intake intake) {
1516
Config config = Config.get();
16-
if (!config.isAgentlessLogSubmissionEnabled()) {
17-
LOGGER.debug("Agentless logs intake is disabled");
17+
if (!config.isAgentlessLogSubmissionEnabled() && !config.isAppLogsCollectionEnabled()) {
18+
LOGGER.debug("Agentless logs intake and logs capture are disabled");
1819
return;
1920
}
2021

2122
BackendApiFactory apiFactory = new BackendApiFactory(config, sco);
22-
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory);
23+
LogsWriterImpl writer = new LogsWriterImpl(config, apiFactory, intake);
2324
sco.whenReady(writer::start);
2425

2526
LogsIntake.registerWriter(writer);

dd-java-agent/agent-logs-intake/src/main/java/datadog/trace/logging/intake/LogsWriterImpl.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@ public class LogsWriterImpl implements LogsWriter {
2727

2828
private final Map<String, Object> commonTags;
2929
private final BackendApiFactory apiFactory;
30+
private final Intake intake;
3031
private final BlockingQueue<Map<String, Object>> messageQueue;
3132
private final Thread messagePollingThread;
3233

33-
public LogsWriterImpl(Config config, BackendApiFactory apiFactory) {
34+
public LogsWriterImpl(Config config, BackendApiFactory apiFactory, Intake intake) {
3435
this.apiFactory = apiFactory;
36+
this.intake = intake;
3537

3638
commonTags = new HashMap<>();
3739
commonTags.put("ddsource", "java");
@@ -87,7 +89,7 @@ public void log(Map<String, Object> message) {
8789
}
8890

8991
private void logPollingLoop() {
90-
BackendApi backendApi = apiFactory.createBackendApi(Intake.LOGS);
92+
BackendApi backendApi = apiFactory.createBackendApi(intake);
9193
LogsDispatcher logsDispatcher = new LogsDispatcher(backendApi);
9294

9395
while (!Thread.currentThread().isInterrupted()) {

dd-java-agent/instrumentation/log4j/log4j-2.0/src/main/java/datadog/trace/instrumentation/log4j2/DatadogAppender.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package datadog.trace.instrumentation.log4j2;
22

3+
import datadog.trace.api.Config;
4+
import datadog.trace.api.CorrelationIdentifier;
35
import datadog.trace.api.logging.intake.LogsIntake;
46
import java.io.PrintWriter;
57
import java.io.StringWriter;
@@ -13,8 +15,11 @@ public class DatadogAppender extends AbstractAppender {
1315

1416
private static final int MAX_STACKTRACE_STRING_LENGTH = 16 * 1_024;
1517

16-
public DatadogAppender(String name, Filter filter) {
18+
private final boolean appLogsCollectionEnabled;
19+
20+
public DatadogAppender(String name, Filter filter, Config config) {
1721
super(name, filter, null);
22+
this.appLogsCollectionEnabled = config.isAppLogsCollectionEnabled();
1823
}
1924

2025
@Override
@@ -40,7 +45,6 @@ private Map<String, Object> map(final LogEvent event) {
4045
Map<String, Object> thrownLog = new HashMap<>();
4146
thrownLog.put("message", thrown.getMessage());
4247
thrownLog.put("name", thrown.getClass().getCanonicalName());
43-
4448
// TODO consider using structured stack trace layout
4549
// (see
4650
// org.apache.logging.log4j.layout.template.json.resolver.ExceptionResolver#createStackTraceResolver)
@@ -55,19 +59,29 @@ private Map<String, Object> map(final LogEvent event) {
5559

5660
log.put("thrown", thrownLog);
5761
}
58-
59-
log.put("contextMap", event.getContextMap());
6062
log.put("endOfBatch", event.isEndOfBatch());
6163
log.put("loggerFqcn", event.getLoggerFqcn());
62-
63-
StackTraceElement source = event.getSource();
64-
Map<String, Object> sourceLog = new HashMap<>();
65-
sourceLog.put("class", source.getClassName());
66-
sourceLog.put("method", source.getMethodName());
67-
sourceLog.put("file", source.getFileName());
68-
sourceLog.put("line", source.getLineNumber());
69-
log.put("source", sourceLog);
70-
64+
if (appLogsCollectionEnabled) {
65+
// skip log source for now as this is expensive
66+
// will be later introduce with Log Origin and optimisations
67+
String traceId = CorrelationIdentifier.getTraceId();
68+
if (traceId != null && !traceId.equals("0")) {
69+
log.put("dd.trace_id", traceId);
70+
}
71+
String spanId = CorrelationIdentifier.getSpanId();
72+
if (spanId != null && !spanId.equals("0")) {
73+
log.put("dd.span_id", spanId);
74+
}
75+
} else {
76+
log.put("contextMap", event.getContextMap());
77+
StackTraceElement source = event.getSource();
78+
Map<String, Object> sourceLog = new HashMap<>();
79+
sourceLog.put("class", source.getClassName());
80+
sourceLog.put("method", source.getMethodName());
81+
sourceLog.put("file", source.getFileName());
82+
sourceLog.put("line", source.getLineNumber());
83+
log.put("source", sourceLog);
84+
}
7185
return log;
7286
}
7387
}

dd-java-agent/instrumentation/log4j/log4j-2.0/src/main/java/datadog/trace/instrumentation/log4j2/LoggerConfigInstrumentation.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@ public LoggerConfigInstrumentation() {
2424

2525
@Override
2626
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
27-
return super.isApplicable(enabledSystems)
28-
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled();
27+
return (super.isApplicable(enabledSystems)
28+
&& InstrumenterConfig.get().isAgentlessLogSubmissionEnabled())
29+
|| InstrumenterConfig.get().isAppLogsCollectionEnabled();
2930
}
3031

3132
@Override
@@ -62,10 +63,10 @@ public static void onExit(@Advice.This LoggerConfig loggerConfig) {
6263
}
6364
}
6465

65-
DatadogAppender appender = new DatadogAppender("datadog", null);
66+
Config config = Config.get();
67+
DatadogAppender appender = new DatadogAppender("datadog", null, config);
6668
appender.start();
6769

68-
Config config = Config.get();
6970
Level level = Level.valueOf(config.getAgentlessLogSubmissionLevel());
7071
loggerConfig.addAppender(appender, level, null);
7172
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import datadog.trace.agent.test.InstrumentationSpecification
2+
import datadog.trace.api.config.GeneralConfig
3+
import datadog.trace.api.logging.intake.LogsIntake
4+
import datadog.trace.api.logging.intake.LogsWriter
5+
import org.apache.logging.log4j.LogManager
6+
import org.apache.logging.log4j.core.appender.AbstractAppender
7+
import org.junit.jupiter.api.Assumptions
8+
import spock.util.environment.Jvm
9+
10+
class Log4jDatadogAppenderAppLogCollectionTest extends InstrumentationSpecification {
11+
12+
private static DummyLogsWriter logsWriter
13+
14+
def setupSpec() {
15+
logsWriter = new DummyLogsWriter()
16+
LogsIntake.registerWriter(logsWriter)
17+
}
18+
19+
@Override
20+
void configurePreAgent() {
21+
super.configurePreAgent()
22+
injectSysConfig(GeneralConfig.APP_LOGS_COLLECTION_ENABLED, "true")
23+
}
24+
25+
def "test datadog appender registration"() {
26+
setup:
27+
ensureLog4jVersionCompatibleWithCurrentJVM()
28+
29+
def logger = LogManager.getLogger(Log4jDatadogAppenderAppLogCollectionTest)
30+
31+
when:
32+
logger.error("A test message")
33+
34+
then:
35+
!logsWriter.messages.empty
36+
37+
def message = logsWriter.messages.poll()
38+
"A test message" == message.get("message")
39+
"ERROR" == message.get("level")
40+
"Log4jDatadogAppenderAppLogCollectionTest" == message.get("loggerName")
41+
}
42+
43+
private static ensureLog4jVersionCompatibleWithCurrentJVM() {
44+
try {
45+
// init class to see if UnsupportedClassVersionError gets thrown
46+
AbstractAppender.package
47+
} catch (UnsupportedClassVersionError e) {
48+
Assumptions.assumeTrue(false, "Latest Log4j2 release requires Java 17, current JVM: " + Jvm.current.javaVersion)
49+
}
50+
}
51+
52+
private static final class DummyLogsWriter implements LogsWriter {
53+
private final Queue<Map<String, Object>> messages = new ArrayDeque<>()
54+
55+
@Override
56+
void log(Map<String, Object> message) {
57+
messages.offer(message)
58+
}
59+
60+
@Override
61+
void start() {
62+
// no op
63+
}
64+
65+
@Override
66+
void shutdown() {
67+
// no op
68+
}
69+
}
70+
}

dd-java-agent/instrumentation/logback-1.0/src/main/java/datadog/trace/instrumentation/logback/LogbackLoggerInstrumentation.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,21 @@
1818
import com.google.auto.service.AutoService;
1919
import datadog.trace.agent.tooling.Instrumenter;
2020
import datadog.trace.agent.tooling.InstrumenterModule;
21+
import datadog.trace.api.Config;
22+
import datadog.trace.api.InstrumenterConfig;
2123
import datadog.trace.bootstrap.InstrumentationContext;
2224
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
2325
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2426
import java.util.Map;
27+
import java.util.Set;
2528
import net.bytebuddy.asm.Advice;
2629

2730
@AutoService(InstrumenterModule.class)
2831
public class LogbackLoggerInstrumentation extends InstrumenterModule.Tracing
2932
implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice {
3033

3134
public LogbackLoggerInstrumentation() {
32-
super("logback");
35+
super("logback", "logs-intake", "logs-intake-logback");
3336
}
3437

3538
@Override
@@ -43,6 +46,16 @@ public Map<String, String> contextStore() {
4346
"ch.qos.logback.classic.spi.ILoggingEvent", AgentSpanContext.class.getName());
4447
}
4548

49+
@Override
50+
public boolean isApplicable(Set<TargetSystem> enabledSystems) {
51+
return super.isApplicable(enabledSystems) || Config.get().isAppLogsCollectionEnabled();
52+
}
53+
54+
@Override
55+
public String[] helperClassNames() {
56+
return new String[] {LogsIntakeHelper.class.getName()};
57+
}
58+
4659
@Override
4760
public void methodAdvice(MethodTransformer transformer) {
4861
transformer.applyAdvice(
@@ -52,10 +65,19 @@ public void methodAdvice(MethodTransformer transformer) {
5265
.and(takesArguments(1))
5366
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
5467
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice");
68+
if (InstrumenterConfig.get().isAppLogsCollectionEnabled()) {
69+
transformer.applyAdvice(
70+
isMethod()
71+
.and(isPublic())
72+
.and(named("callAppenders"))
73+
.and(takesArguments(1))
74+
.and(takesArgument(0, named("ch.qos.logback.classic.spi.ILoggingEvent"))),
75+
LogbackLoggerInstrumentation.class.getName() + "$CallAppendersAdvice2");
76+
}
5577
}
5678

5779
public static class CallAppendersAdvice {
58-
@Advice.OnMethodEnter
80+
@Advice.OnMethodEnter(suppress = Throwable.class)
5981
public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
6082
AgentSpan span = activeSpan();
6183

@@ -65,4 +87,11 @@ public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
6587
}
6688
}
6789
}
90+
91+
public static class CallAppendersAdvice2 {
92+
@Advice.OnMethodEnter(suppress = Throwable.class)
93+
public static void onEnter(@Advice.Argument(0) ILoggingEvent event) {
94+
LogsIntakeHelper.log(event);
95+
}
96+
}
6897
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package datadog.trace.instrumentation.logback;
2+
3+
import ch.qos.logback.classic.spi.ILoggingEvent;
4+
import ch.qos.logback.classic.spi.StackTraceElementProxy;
5+
import datadog.trace.api.CorrelationIdentifier;
6+
import datadog.trace.api.logging.intake.LogsIntake;
7+
import java.util.Arrays;
8+
import java.util.HashMap;
9+
import java.util.Map;
10+
import java.util.stream.Collectors;
11+
12+
public class LogsIntakeHelper {
13+
14+
public static void log(ILoggingEvent event) {
15+
LogsIntake.log(map(event));
16+
}
17+
18+
private static Map<String, Object> map(ILoggingEvent event) {
19+
Map<String, Object> log = new HashMap<>();
20+
log.put("thread", event.getThreadName());
21+
log.put("level", event.getLevel().levelStr);
22+
log.put("loggerName", event.getLoggerName());
23+
log.put("message", event.getFormattedMessage());
24+
if (event.getThrowableProxy() != null) {
25+
Map<String, Object> thrownLog = new HashMap<>();
26+
thrownLog.put("message", event.getThrowableProxy().getMessage());
27+
thrownLog.put("name", event.getThrowableProxy().getClassName());
28+
String stackTraceString =
29+
Arrays.stream(event.getThrowableProxy().getStackTraceElementProxyArray())
30+
.map(StackTraceElementProxy::getSTEAsString)
31+
.collect(Collectors.joining(" "));
32+
thrownLog.put("extendedStackTrace", stackTraceString);
33+
log.put("thrown", thrownLog);
34+
}
35+
String traceId = CorrelationIdentifier.getTraceId();
36+
if (traceId != null && !traceId.equals("0")) {
37+
log.put("dd.trace_id", traceId);
38+
}
39+
String spanId = CorrelationIdentifier.getSpanId();
40+
if (spanId != null && !spanId.equals("0")) {
41+
log.put("dd.span_id", spanId);
42+
}
43+
return log;
44+
}
45+
}

0 commit comments

Comments
 (0)