Skip to content

Commit 3afc30b

Browse files
committed
Model changes to allow OpenTelemetry context propagation
dsadasdsa dsd
1 parent 437cd3c commit 3afc30b

File tree

18 files changed

+339
-69
lines changed

18 files changed

+339
-69
lines changed

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -635,7 +635,8 @@ class BeamModulePlugin implements Plugin<Project> {
635635
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
636636
def netty_version = "4.1.124.Final"
637637
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom
638-
def opentelemetry_version = "1.52.0"
638+
def opentelemetry_sdk_version = "1.56.0"
639+
def opentelemetry_contrib_version = "1.52.0"
639640
def postgres_version = "42.2.16"
640641
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
641642
def protobuf_version = "4.33.0"
@@ -857,8 +858,13 @@ class BeamModulePlugin implements Plugin<Project> {
857858
netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final",
858859
netty_transport : "io.netty:netty-transport:$netty_version",
859860
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
860-
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
861-
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
861+
opentelemetry_api : "io.opentelemetry:opentelemetry-api:$opentelemetry_sdk_version",
862+
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_sdk_version-alpha", // alpha required by extensions
863+
opentelemetry_context : "io.opentelemetry:opentelemetry-context:$opentelemetry_sdk_version",
864+
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha",
865+
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk:$opentelemetry_sdk_version",
866+
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp:$opentelemetry_sdk_version",
867+
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:$opentelemetry_sdk_version",
862868
postgres : "org.postgresql:postgresql:$postgres_version",
863869
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
864870
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ dependencies {
112112
implementation library.java.google_http_client
113113
implementation library.java.google_http_client_gson
114114
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
115+
implementation library.java.opentelemetry_context
115116
implementation library.java.hamcrest
116117
implementation library.java.jackson_annotations
117118
implementation library.java.jackson_core

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import io.opentelemetry.context.Context;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -1403,6 +1404,11 @@ public PaneInfo getPaneInfo() {
14031404
return null;
14041405
}
14051406

1407+
@Override
1408+
public @Nullable Context getOpenTelemetryContext() {
1409+
return null;
1410+
}
1411+
14061412
@Override
14071413
public @Nullable Long getRecordOffset() {
14081414
return null;

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,18 +137,28 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
137137
@SuppressWarnings("unchecked")
138138
T result =
139139
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
140+
// todo #37030 parse context from previous stage
140141
return WindowedValues.of(
141-
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
142+
result,
143+
timestampMillis,
144+
windows,
145+
paneInfo,
146+
null,
147+
null,
148+
drainingValueFromUpstream,
149+
null);
142150
} else {
143151
notifyElementRead(data.available() + metadata.available());
152+
// todo #37030 parse context from previous stage
144153
return WindowedValues.of(
145154
decode(valueCoder, data),
146155
timestampMillis,
147156
windows,
148157
paneInfo,
149158
null,
150159
null,
151-
drainingValueFromUpstream);
160+
drainingValueFromUpstream,
161+
null);
152162
}
153163
}
154164

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,8 +129,16 @@ public Iterable<WindowedValue<ElemT>> elementsIterable() {
129129
}
130130
InputStream inputStream = message.getData().newInput();
131131
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
132+
// todo #37030 parse context from previous stage
132133
return WindowedValues.of(
133-
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
134+
value,
135+
timestamp,
136+
windows,
137+
paneInfo,
138+
null,
139+
null,
140+
drainingValueFromUpstream,
141+
null);
134142
} catch (IOException e) {
135143
throw new RuntimeException(e);
136144
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Objects;
@@ -64,6 +65,11 @@ public boolean causedByDrain() {
6465
return false;
6566
}
6667

68+
@Override
69+
public @Nullable Context getOpenTelemetryContext() {
70+
return null;
71+
}
72+
6773
@Override
6874
public Iterable<WindowedValue<T>> explodeWindows() {
6975
return Collections.emptyList();

runners/spark/spark_runner.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ dependencies {
167167
implementation library.java.jackson_annotations
168168
implementation library.java.slf4j_api
169169
implementation library.java.joda_time
170+
implementation library.java.opentelemetry_context
170171
implementation library.java.commons_lang3
171172
implementation library.java.args4j
172173
implementation project(path: ":model:fn-execution", configuration: "shadow")

runners/spark/src/main/java/org/apache/beam/runners/spark/util/TimerUtils.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.spark.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.io.Serializable;
2122
import java.util.Collection;
2223
import java.util.Collections;
@@ -115,6 +116,11 @@ public PaneInfo getPaneInfo() {
115116
return null;
116117
}
117118

119+
@Override
120+
public @Nullable Context getOpenTelemetryContext() {
121+
return null;
122+
}
123+
118124
@Override
119125
public boolean causedByDrain() {
120126
return false;

sdks/java/container/license_scripts/dep_urls_java.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,11 @@ org.eclipse.jgit:
6666
license: "https://www.eclipse.org/org/documents/edl-v10.html"
6767
type: "Eclipse Distribution License - v1.0"
6868
opentelemetry-bom:
69-
'1.52.0':
69+
'1.56.0':
7070
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE"
7171
type: "Apache License 2.0"
7272
opentelemetry-bom-alpha:
73-
'1.52.0-alpha':
73+
'1.56.0-alpha':
7474
license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.52.0/LICENSE"
7575
type: "Apache License 2.0"
7676
zstd-jni:

sdks/java/core/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ dependencies {
9898
shadow library.java.jackson_databind
9999
shadow platform(library.java.opentelemetry_bom)
100100
shadow library.java.opentelemetry_api
101+
shadow library.java.opentelemetry_context
101102
shadow library.java.slf4j_api
102103
shadow library.java.snappy_java
103104
shadow library.java.joda_time

0 commit comments

Comments
 (0)