
Commit 6326167

Allow instrumented Spark trace to be linked to an OpenLineage-originated context (#7450)
Generate the Spark application's trace id and root span id from the OpenLineage parent context when one is present, so that the application trace can be linked to the upstream task via SpanLink.
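
For reference, the three SparkConf properties read by the new OpenlineageParentContext helper, and how a caller would resolve them into a parent context. This is a minimal standalone sketch: the class name OpenlineageContextExample is illustrative and the property values are taken from the test data further down; inside the agent the lookup happens in AbstractDatadogSparkListener rather than in user code.

import java.util.Optional;
import org.apache.spark.SparkConf;
import datadog.trace.instrumentation.spark.OpenlineageParentContext;

public class OpenlineageContextExample {
  public static void main(String[] args) {
    // Illustrative values: a job namespace/name and a UUID run id as an upstream
    // OpenLineage integration would set them on the Spark job.
    SparkConf conf = new SparkConf()
        .set("spark.openlineage.parentJobNamespace", "default")
        .set("spark.openlineage.parentJobName", "dag-push-to-s3-spark.upload_to_s3")
        .set("spark.openlineage.parentRunId", "ad3b6baa-8d88-3b38-8dbe-f06232249a84");

    // All three properties must be present and the run id must be a valid UUID,
    // otherwise from() returns an empty Optional and the listener keeps its default ids.
    Optional<OpenlineageParentContext> context = OpenlineageParentContext.from(conf);
    context.ifPresent(
        c -> System.out.println(c.getTraceId() + " / " + c.getChildRootSpanId()));
  }
}
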
1 parent 14a80e9 · commit 6326167

5 files changed: +272 -0 lines changed


dd-java-agent/instrumentation/spark/spark_2.12/src/main/java/datadog/trace/instrumentation/spark/Spark212Instrumentation.java

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".DatabricksParentContext",
+      packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark212Listener",
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",

dd-java-agent/instrumentation/spark/spark_2.13/src/main/java/datadog/trace/instrumentation/spark/Spark213Instrumentation.java

Lines changed: 1 addition & 0 deletions

@@ -15,6 +15,7 @@ public String[] helperClassNames() {
     return new String[] {
       packageName + ".AbstractDatadogSparkListener",
       packageName + ".DatabricksParentContext",
+      packageName + ".OpenlineageParentContext",
       packageName + ".DatadogSpark213Listener",
       packageName + ".RemoveEldestHashMap",
       packageName + ".SparkAggregatedTaskMetrics",

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/AbstractDatadogSparkListener.java

Lines changed: 26 additions & 0 deletions

@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.UUID;
 import org.apache.spark.ExceptionFailure;
@@ -187,12 +188,37 @@ private void initApplicationSpanIfNotInitialized() {
     }

     captureApplicationParameters(builder);
+    captureOpenlineageContextIfPresent(builder);

     applicationSpan = builder.start();
     setDataJobsSamplingPriority(applicationSpan);
     applicationSpan.setMeasured(true);
   }

+  private void captureOpenlineageContextIfPresent(AgentTracer.SpanBuilder builder) {
+    Optional<OpenlineageParentContext> openlineageParentContext =
+        OpenlineageParentContext.from(sparkConf);
+
+    if (openlineageParentContext.isPresent()) {
+      OpenlineageParentContext context = openlineageParentContext.get();
+      builder.asChildOf(context);
+
+      builder.withSpanId(context.getChildRootSpanId());
+
+      log.debug(
+          "Captured Openlineage context: {}, with child trace_id: {}, child root span id: {}",
+          context,
+          context.getTraceId(),
+          context.getChildRootSpanId());
+
+      builder.withTag("openlineage_parent_job_namespace", context.getParentJobNamespace());
+      builder.withTag("openlineage_parent_job_name", context.getParentJobName());
+      builder.withTag("openlineage_parent_run_id", context.getParentRunId());
+    } else {
+      log.debug("Openlineage context not found");
+    }
+  }
+
   @Override
   public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
     log.info(

dd-java-agent/instrumentation/spark/src/main/java/datadog/trace/instrumentation/spark/OpenlineageParentContext.java

Lines changed: 157 additions & 0 deletions (new file)

@@ -0,0 +1,157 @@
package datadog.trace.instrumentation.spark;

import datadog.trace.api.DDSpanId;
import datadog.trace.api.DDTraceId;
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTraceCollector;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.PathwayContext;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OpenlineageParentContext implements AgentSpan.Context {
  private static final Logger log = LoggerFactory.getLogger(OpenlineageParentContext.class);
  private static final Pattern UUID =
      Pattern.compile(
          "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$");

  private final DDTraceId traceId;
  private final long spanId;
  private final long childRootSpanId;

  private final String parentJobNamespace;
  private final String parentJobName;
  private final String parentRunId;

  public static final String OPENLINEAGE_PARENT_JOB_NAMESPACE =
      "spark.openlineage.parentJobNamespace";
  public static final String OPENLINEAGE_PARENT_JOB_NAME = "spark.openlineage.parentJobName";
  public static final String OPENLINEAGE_PARENT_RUN_ID = "spark.openlineage.parentRunId";

  public static Optional<OpenlineageParentContext> from(SparkConf sparkConf) {
    if (!sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAMESPACE)
        || !sparkConf.contains(OPENLINEAGE_PARENT_JOB_NAME)
        || !sparkConf.contains(OPENLINEAGE_PARENT_RUN_ID)) {
      return Optional.empty();
    }

    String parentJobNamespace = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAMESPACE);
    String parentJobName = sparkConf.get(OPENLINEAGE_PARENT_JOB_NAME);
    String parentRunId = sparkConf.get(OPENLINEAGE_PARENT_RUN_ID);

    if (!UUID.matcher(parentRunId).matches()) {
      return Optional.empty();
    }

    return Optional.of(
        new OpenlineageParentContext(parentJobNamespace, parentJobName, parentRunId));
  }

  OpenlineageParentContext(String parentJobNamespace, String parentJobName, String parentRunId) {
    log.debug(
        "Creating OpenlineageParentContext with parentJobNamespace: {}, parentJobName: {}, parentRunId: {}",
        parentJobNamespace,
        parentJobName,
        parentRunId);

    this.parentJobNamespace = parentJobNamespace;
    this.parentJobName = parentJobName;
    this.parentRunId = parentRunId;

    MessageDigest digest = null;
    try {
      digest = MessageDigest.getInstance("SHA-256");
    } catch (NoSuchAlgorithmException e) {
      log.debug("Unable to find SHA-256 algorithm", e);
    }

    if (digest != null && parentJobNamespace != null && parentRunId != null) {
      traceId = computeTraceId(digest, parentJobNamespace, parentJobName, parentRunId);
      spanId = DDSpanId.ZERO;

      childRootSpanId =
          computeChildRootSpanId(digest, parentJobNamespace, parentJobName, parentRunId);
    } else {
      traceId = DDTraceId.ZERO;
      spanId = DDSpanId.ZERO;

      childRootSpanId = DDSpanId.ZERO;
    }

    log.debug("Created OpenlineageParentContext with traceId: {}, spanId: {}", traceId, spanId);
  }

  private long computeChildRootSpanId(
      MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
    byte[] inputBytes =
        (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
    byte[] hash = digest.digest(inputBytes);

    return ByteBuffer.wrap(hash).getLong();
  }

  private DDTraceId computeTraceId(
      MessageDigest digest, String parentJobNamespace, String parentJobName, String parentRunId) {
    byte[] inputBytes =
        (parentJobNamespace + parentJobName + parentRunId).getBytes(StandardCharsets.UTF_8);
    byte[] hash = digest.digest(inputBytes);

    return DDTraceId.from(ByteBuffer.wrap(hash).getLong());
  }

  @Override
  public DDTraceId getTraceId() {
    return traceId;
  }

  @Override
  public long getSpanId() {
    return spanId;
  }

  public long getChildRootSpanId() {
    return childRootSpanId;
  }

  @Override
  public AgentTraceCollector getTraceCollector() {
    return AgentTracer.NoopAgentTraceCollector.INSTANCE;
  }

  @Override
  public int getSamplingPriority() {
    return PrioritySampling.USER_KEEP;
  }

  @Override
  public Iterable<Map.Entry<String, String>> baggageItems() {
    return Collections.<String, String>emptyMap().entrySet();
  }

  @Override
  public PathwayContext getPathwayContext() {
    return null;
  }

  public String getParentJobNamespace() {
    return parentJobNamespace;
  }

  public String getParentJobName() {
    return parentJobName;
  }

  public String getParentRunId() {
    return parentRunId;
  }
}

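Both computeTraceId and computeChildRootSpanId hash the same namespace + job name + run id string with SHA-256 and keep the first 8 bytes of the digest as a signed 64-bit id, so the application's trace id and child root span id carry the same value. A minimal sketch of that derivation, assuming the sample values used in the test below (the class name OpenlineageIdDerivation is illustrative):

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class OpenlineageIdDerivation {
  public static void main(String[] args) throws Exception {
    // Same concatenation used by OpenlineageParentContext: namespace + job name + run id.
    String input = "default"
        + "dag-push-to-s3-spark.upload_to_s3"
        + "ad3b6baa-8d88-3b38-8dbe-f06232249a84";

    byte[] hash = MessageDigest.getInstance("SHA-256")
        .digest(input.getBytes(StandardCharsets.UTF_8));

    // The first 8 bytes of the digest become the 64-bit id; the test below expects
    // 0xa475569dbce5e6cf for these inputs.
    long id = ByteBuffer.wrap(hash).getLong();
    System.out.printf("0x%x%n", id);
  }
}
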
OpenlineageParentContextTest.groovy

Lines changed: 87 additions & 0 deletions (new file)

@@ -0,0 +1,87 @@
package datadog.trace.instrumentation.spark

import datadog.trace.api.DDSpanId
import org.apache.spark.SparkConf
import spock.lang.Specification

class OpenlineageParentContextTest extends Specification {
  def "should create non-empty OpenLineageParentContext using SHA-256 for TraceID and root span SpanId if all required fields are present"() {
    given:
    SparkConf mockSparkConf = Mock(SparkConf)

    when:
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> parentRunId

    then:
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
    parentContext.isPresent()

    parentContext.get().getParentJobNamespace() == "default"
    parentContext.get().getParentJobName() == "dag-push-to-s3-spark.upload_to_s3"
    parentContext.get().getParentRunId() == expectedParentRunId

    parentContext.get().traceId.toLong() == expectedTraceId
    parentContext.get().spanId == DDSpanId.ZERO
    parentContext.get().childRootSpanId == expectedRootSpanId

    where:
    parentRunId                            | expectedParentRunId                    | expectedTraceId     | expectedRootSpanId
    "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | "ad3b6baa-8d88-3b38-8dbe-f06232249a84" | 0xa475569dbce5e6cfL | 0xa475569dbce5e6cfL
    "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | "ad3b6baa-8d88-3b38-8dbe-f06232249a85" | 0x31da6680bd14991bL | 0x31da6680bd14991bL
  }

  def "should create empty OpenLineageParentContext if any required field is missing"() {
    given:
    SparkConf mockSparkConf = Mock(SparkConf)

    when:
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> jobNamespacePresent
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> jobNamePresent
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runIdPresent

    then:
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
    parentContext.isPresent() == expected

    where:
    jobNamespacePresent | jobNamePresent | runIdPresent | expected
    true                | true           | false        | false
    true                | false          | true         | false
    false               | true           | true         | false
    true                | false          | false        | false
    false               | true           | false        | false
    false               | false          | true         | false
    false               | false          | false        | false
  }

  def "should only generate a non-empty OpenlineageParentContext if parentRunId is a valid UUID"() {
    given:
    SparkConf mockSparkConf = Mock(SparkConf)

    when:
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> true
    mockSparkConf.contains(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> true
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAMESPACE) >> "default"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_JOB_NAME) >> "dag-push-to-s3-spark.upload_to_s3"
    mockSparkConf.get(OpenlineageParentContext.OPENLINEAGE_PARENT_RUN_ID) >> runId

    then:
    Optional<OpenlineageParentContext> parentContext = OpenlineageParentContext.from(mockSparkConf)
    parentContext.isPresent() == expected

    where:
    runId                                  | expected
    "6afeb6ee-729d-37f7-ad73-b8e6f47ca694" | true
    " "                                    | false
    "invalid-uuid"                         | false
    "6afeb6ee-729d-37f7-b8e6f47ca694"      | false
    "6AFEB6EE-729D-37F7-AD73-B8E6F47CA694" | true
  }
}
