Skip to content

Commit 69f9591

Browse files
committed
Model changes to allow OpenTelemtry context propagation
1 parent 437cd3c commit 69f9591

File tree

13 files changed

+276
-62
lines changed

13 files changed

+276
-62
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -857,8 +857,13 @@ class BeamModulePlugin implements Plugin<Project> {
857857
netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
858858
netty_transport : "io.netty:netty-transport:$netty_version",
859859
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
860-
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
860+
opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_version",
861861
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
862+
opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_version",
863+
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_version-alpha",
864+
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_version",
865+
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_version",
866+
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_version",
862867
postgres : "org.postgresql:postgresql:$postgres_version",
863868
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
864869
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ dependencies {
112112
implementation library.java.google_http_client
113113
implementation library.java.google_http_client_gson
114114
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
115+
implementation library.java.opentelemetry_context
115116
implementation library.java.hamcrest
116117
implementation library.java.jackson_annotations
117118
implementation library.java.jackson_core

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import io.opentelemetry.context.Context;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -1403,6 +1404,11 @@ public PaneInfo getPaneInfo() {
14031404
return null;
14041405
}
14051406

1407+
@Override
1408+
public @Nullable Context getContext() {
1409+
return null;
1410+
}
1411+
14061412
@Override
14071413
public @Nullable Long getRecordOffset() {
14081414
return null;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Objects;
@@ -64,6 +65,11 @@ public boolean causedByDrain() {
6465
return false;
6566
}
6667

68+
@Override
69+
public @Nullable Context getContext() {
70+
return null;
71+
}
72+
6773
@Override
6874
public Iterable<WindowedValue<T>> explodeWindows() {
6975
return Collections.emptyList();

runners/spark/spark_runner.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ dependencies {
167167
implementation library.java.jackson_annotations
168168
implementation library.java.slf4j_api
169169
implementation library.java.joda_time
170+
implementation library.java.opentelemetry_context
170171
implementation library.java.commons_lang3
171172
implementation library.java.args4j
172173
implementation project(path: ":model:fn-execution", configuration: "shadow")

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.spark.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.io.Serializable;
2122
import java.util.Collection;
2223
import java.util.Collections;
@@ -115,6 +116,11 @@ public PaneInfo getPaneInfo() {
115116
return null;
116117
}
117118

119+
@Override
120+
public @Nullable Context getContext() {
121+
return null;
122+
}
123+
118124
@Override
119125
public boolean causedByDrain() {
120126
return false;

sdks/java/core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ dependencies {
9898
shadow library.java.jackson_databind
9999
shadow platform(library.java.opentelemetry_bom)
100100
shadow library.java.opentelemetry_api
101+
shadow library.java.opentelemetry_context
101102
shadow library.java.slf4j_api
102103
shadow library.java.snappy_java
103104
shadow library.java.joda_time

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWind
490490
getMutableOutput(tag)
491491
.add(
492492
ValueInSingleWindow.of(
493-
output, timestamp, window, PaneInfo.NO_FIRING, null, null));
493+
output, timestamp, window, PaneInfo.NO_FIRING, null, null, null));
494494
}
495495
};
496496
}
@@ -623,7 +623,7 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
623623
getMutableOutput(tag)
624624
.add(
625625
ValueInSingleWindow.of(
626-
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null));
626+
output, timestamp, element.getWindow(), element.getPaneInfo(), null, null, null));
627627
}
628628

629629
@Override
@@ -635,7 +635,7 @@ public <T> void outputWindowedValue(
635635
PaneInfo paneInfo) {
636636
for (BoundedWindow w : windows) {
637637
getMutableOutput(tag)
638-
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null));
638+
.add(ValueInSingleWindow.of(output, timestamp, w, paneInfo, null, null, null));
639639
}
640640
}
641641
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/OutputBuilder.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.values;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
2223
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -50,5 +51,7 @@ public interface OutputBuilder<T> extends WindowedValue<T> {
5051

5152
OutputBuilder<T> setCausedByDrain(boolean causedByDrain);
5253

54+
OutputBuilder<T> setContext(@Nullable Context context);
55+
5356
void output();
5457
}

sdks/java/core/src/main/java/org/apache/beam/sdk/values/ValueInSingleWindow.java

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.values;
1919

2020
import com.google.auto.value.AutoValue;
21+
import io.opentelemetry.context.Context;
2122
import java.io.IOException;
2223
import java.io.InputStream;
2324
import java.io.OutputStream;
@@ -66,21 +67,24 @@ public T getValue() {
6667

6768
public abstract @Nullable Long getCurrentRecordOffset();
6869

70+
public abstract @Nullable Context getContext();
71+
6972
// todo #33176 specify additional metadata in the future
7073
public static <T> ValueInSingleWindow<T> of(
7174
T value,
7275
Instant timestamp,
7376
BoundedWindow window,
7477
PaneInfo paneInfo,
7578
@Nullable String currentRecordId,
76-
@Nullable Long currentRecordOffset) {
79+
@Nullable Long currentRecordOffset,
80+
@Nullable Context context) {
7781
return new AutoValue_ValueInSingleWindow<>(
78-
value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset);
82+
value, timestamp, window, paneInfo, currentRecordId, currentRecordOffset, context);
7983
}
8084

8185
public static <T> ValueInSingleWindow<T> of(
8286
T value, Instant timestamp, BoundedWindow window, PaneInfo paneInfo) {
83-
return of(value, timestamp, window, paneInfo, null, null);
87+
return of(value, timestamp, window, paneInfo, null, null, null);
8488
}
8589

8690
/** A coder for {@link ValueInSingleWindow}. */
@@ -105,11 +109,14 @@ public static <T> Coder<T> of(
105109
@Override
106110
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream)
107111
throws IOException {
108-
encode(windowedElem, outStream, Context.NESTED);
112+
encode(windowedElem, outStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
109113
}
110114

111115
@Override
112-
public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream, Context context)
116+
public void encode(
117+
ValueInSingleWindow<T> windowedElem,
118+
OutputStream outStream,
119+
org.apache.beam.sdk.coders.Coder.Context context)
113120
throws IOException {
114121
InstantCoder.of().encode(windowedElem.getTimestamp(), outStream);
115122
windowCoder.encode(windowedElem.getWindow(), outStream);
@@ -120,6 +127,10 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
120127
BeamFnApi.Elements.ElementMetadata.Builder builder =
121128
BeamFnApi.Elements.ElementMetadata.newBuilder();
122129
// todo #33176 specify additional metadata in the future
130+
io.opentelemetry.context.Context context1 = windowedElem.getContext();
131+
if (context1 != null) {
132+
WindowedValues.OpenTelemetryContextSerializer.write(context1, builder);
133+
}
123134
BeamFnApi.Elements.ElementMetadata metadata = builder.build();
124135
ByteArrayCoder.of().encode(metadata.toByteArray(), outStream);
125136
}
@@ -129,22 +140,27 @@ public void encode(ValueInSingleWindow<T> windowedElem, OutputStream outStream,
129140

130141
@Override
131142
public ValueInSingleWindow<T> decode(InputStream inStream) throws IOException {
132-
return decode(inStream, Context.NESTED);
143+
return decode(inStream, org.apache.beam.sdk.coders.Coder.Context.NESTED);
133144
}
134145

135146
@Override
136147
@SuppressWarnings("IgnoredPureGetter")
137-
public ValueInSingleWindow<T> decode(InputStream inStream, Context context) throws IOException {
148+
public ValueInSingleWindow<T> decode(
149+
InputStream inStream, org.apache.beam.sdk.coders.Coder.Context context) throws IOException {
138150
Instant timestamp = InstantCoder.of().decode(inStream);
139151
BoundedWindow window = windowCoder.decode(inStream);
140152
PaneInfo paneInfo = PaneInfo.PaneInfoCoder.INSTANCE.decode(inStream);
153+
io.opentelemetry.context.Context openTelemetryContext = null;
141154
if (WindowedValues.WindowedValueCoder.isMetadataSupported() && paneInfo.isElementMetadata()) {
142-
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
155+
BeamFnApi.Elements.ElementMetadata elementMetadata =
156+
BeamFnApi.Elements.ElementMetadata.parseFrom(ByteArrayCoder.of().decode(inStream));
157+
openTelemetryContext = WindowedValues.OpenTelemetryContextSerializer.read(elementMetadata);
143158
}
144159

145160
T value = valueCoder.decode(inStream, context);
146161
// todo #33176 specify additional metadata in the future
147-
return new AutoValue_ValueInSingleWindow<>(value, timestamp, window, paneInfo, null, null);
162+
return new AutoValue_ValueInSingleWindow<>(
163+
value, timestamp, window, paneInfo, null, null, openTelemetryContext);
148164
}
149165

150166
@Override

0 commit comments

Comments
 (0)