Skip to content

Commit 65e3fdd

Browse files
committed
Model changes to allow OpenTelemtry context propagation
1 parent 80ba916 commit 65e3fdd

File tree

14 files changed

+286
-62
lines changed

14 files changed

+286
-62
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -855,8 +855,13 @@ class BeamModulePlugin implements Plugin<Project> {
855855
netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
856856
netty_transport : "io.netty:netty-transport:$netty_version",
857857
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
858-
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
858+
opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_version",
859859
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
860+
opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_version",
861+
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_version-alpha",
862+
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_version",
863+
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_version",
864+
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_version",
860865
postgres : "org.postgresql:postgresql:$postgres_version",
861866
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
862867
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",

model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,8 @@ message Elements {
761761
// extensible and backward compatible
762762
message ElementMetadata {
763763
optional DrainMode.Enum drain = 1;
764+
optional string traceparent = 2;
765+
optional string tracestate = 3;
764766
}
765767

766768
// Represent the encoded user timer for a given instruction, transform and

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ dependencies {
112112
implementation library.java.google_http_client
113113
implementation library.java.google_http_client_gson
114114
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
115+
implementation library.java.opentelemetry_context
115116
implementation library.java.hamcrest
116117
implementation library.java.jackson_annotations
117118
implementation library.java.jackson_core

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import io.opentelemetry.context.Context;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -1403,6 +1404,11 @@ public PaneInfo getPaneInfo() {
14031404
return null;
14041405
}
14051406

1407+
@Override
1408+
public @Nullable Context getContext() {
1409+
return null;
1410+
}
1411+
14061412
@Override
14071413
public @Nullable Long getRecordOffset() {
14081414
return null;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Objects;
@@ -64,6 +65,11 @@ public boolean causedByDrain() {
6465
return false;
6566
}
6667

68+
@Override
69+
public @Nullable Context getContext() {
70+
return null;
71+
}
72+
6773
@Override
6874
public Iterable<WindowedValue<T>> explodeWindows() {
6975
return Collections.emptyList();

runners/spark/spark_runner.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ dependencies {
167167
implementation library.java.jackson_annotations
168168
implementation library.java.slf4j_api
169169
implementation library.java.joda_time
170+
implementation library.java.opentelemetry_context
170171
implementation library.java.commons_lang3
171172
implementation library.java.args4j
172173
implementation project(path: ":model:fn-execution", configuration: "shadow")

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.spark.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.io.Serializable;
2122
import java.util.Collection;
2223
import java.util.Collections;
@@ -115,6 +116,11 @@ public PaneInfo getPaneInfo() {
115116
return null;
116117
}
117118

119+
@Override
120+
public @Nullable Context getContext() {
121+
return null;
122+
}
123+
118124
@Override
119125
public boolean causedByDrain() {
120126
return false;

sdks/java/core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ dependencies {
9898
shadow library.java.jackson_databind
9999
shadow platform(library.java.opentelemetry_bom)
100100
shadow library.java.opentelemetry_api
101+
shadow library.java.opentelemetry_context
101102
shadow library.java.slf4j_api
102103
shadow library.java.snappy_java
103104
shadow library.java.joda_time

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWind
490490
getMutableOutput(tag)
491491
.add(
492492
ValueInSingleWindow.of(
493-
output, timestamp, window, PaneInfo.NO_FIRING, null, null));
493+
output, timestamp, window, PaneInfo.NO_FIRING, null, null, null));
494494
}
495495
};
496496
}
@@ -623,7 +623,7 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
623623
getMutableOutput(tag)
624624
.add(
625625
ValueInSingleWindow.of(
626-
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null));
626+
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null, null));
627627
}
628628

629629
@Override
@@ -635,7 +635,7 @@ public <T> void outputWindowedValue(
635635
PaneInfo paneInfo) {
636636
for (BoundedWindow w : windows) {
637637
getMutableOutput(tag)
638-
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null));
638+
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null, null));
639639
}
640640
}
641641
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.values;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2223
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -50,5 +51,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {
5051

5152
OutputBuilder<T> setCausedByDrain(boolean causedByDrain);
5253

54+
OutputBuilder<T> setContext(@Nullable Context context);
55+
5356
void output();
5457
}

0 commit comments

Comments
 (0)