Skip to content

Commit 674873d

Browse files
authored
Merge pull request #36962 from stankiewicz/model
[OpenTelemetry] Model changes to allow OpenTelemetry context propagation
2 parents 268ae1a + 5223c23 commit 674873d

23 files changed

Lines changed: 338 additions & 75 deletions

File tree

buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,8 @@ class BeamModulePlugin implements Plugin<Project> {
633633
// [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom
634634
def netty_version = "4.1.130.Final"
635635
// [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom
636-
def opentelemetry_version = "1.51.0"
636+
def opentelemetry_version = "1.56.0"
637+
def opentelemetry_contrib_version = "1.52.0"
637638
def postgres_version = "42.6.2"
638639
// [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom
639640
def protobuf_version = "4.33.2"
@@ -860,6 +861,11 @@ class BeamModulePlugin implements Plugin<Project> {
860861
netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version",
861862
opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version
862863
opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // alpha required by extensions
864+
opentelemetry_context : "io.opentelemetry:opentelemetry-context", // google_cloud_platform_libraries_bom sets version
865+
opentelemetry_gcp_auth : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha",
866+
opentelemetry_sdk : "io.opentelemetry:opentelemetry-sdk", // google_cloud_platform_libraries_bom sets version
867+
opentelemetry_exporter_otlp : "io.opentelemetry:opentelemetry-exporter-otlp", // google_cloud_platform_libraries_bom sets version
868+
opentelemetry_extension_autoconfigure : "io.opentelemetry:opentelemetry-sdk-extension-autoconfigure", // google_cloud_platform_libraries_bom sets version
863869
postgres : "org.postgresql:postgresql:$postgres_version",
864870
protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version",
865871
protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version",

runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -451,7 +451,8 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp)
451451
element.getPaneInfo(),
452452
element.getRecordId(),
453453
element.getRecordOffset(),
454-
element.causedByDrain()));
454+
element.causedByDrain(),
455+
element.getOpenTelemetryContext()));
455456
}
456457

457458
@Override
@@ -474,7 +475,8 @@ public <T> void outputWindowedValue(
474475
paneInfo,
475476
element.getRecordId(),
476477
element.getRecordOffset(),
477-
element.causedByDrain()));
478+
element.causedByDrain(),
479+
element.getOpenTelemetryContext()));
478480
}
479481

480482
@Override

runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -472,7 +472,8 @@ public String getErrorContext() {
472472
read.getPaneInfo(),
473473
read.getRecordId(),
474474
read.getRecordOffset(),
475-
CausedByDrain.CAUSED_BY_DRAIN);
475+
CausedByDrain.CAUSED_BY_DRAIN,
476+
read.getOpenTelemetryContext());
476477
}
477478
elementAndRestriction = KV.of(read, restrictionState.read());
478479
watermarkEstimatorStateT = watermarkEstimatorState.read();

runners/google-cloud-dataflow-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ dependencies {
112112
permitUnusedDeclared library.java.google_http_client_gson // BEAM-11761
113113
implementation library.java.google_cloud_logging
114114
permitUnusedDeclared library.java.google_cloud_logging // BEAM-11761
115+
implementation library.java.opentelemetry_context
115116
implementation library.java.hamcrest
116117
implementation library.java.jackson_annotations
117118
implementation library.java.jackson_core

runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/BatchViewOverrides.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2121

2222
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
23+
import io.opentelemetry.context.Context;
2324
import java.io.IOException;
2425
import java.io.InputStream;
2526
import java.io.OutputStream;
@@ -1404,6 +1405,11 @@ public PaneInfo getPaneInfo() {
14041405
return null;
14051406
}
14061407

1408+
@Override
1409+
public @Nullable Context getOpenTelemetryContext() {
1410+
return null;
1411+
}
1412+
14071413
@Override
14081414
public @Nullable Long getRecordOffset() {
14091415
return null;

runners/google-cloud-dataflow-java/worker/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ dependencies {
212212
implementation library.java.jackson_core
213213
implementation library.java.jackson_databind
214214
implementation library.java.joda_time
215+
implementation library.java.opentelemetry_context
215216
implementation library.java.slf4j_api
216217
implementation library.java.vendored_grpc_1_69_0
217218
implementation library.java.error_prone_annotations

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,28 @@ protected WindowedValue<T> decodeMessage(Windmill.Message message) throws IOExce
155155
@SuppressWarnings("unchecked")
156156
T result =
157157
(T) KV.of(decode(kvCoder.getKeyCoder(), key), decode(kvCoder.getValueCoder(), data));
158+
// todo #37030 parse context from previous stage
158159
return WindowedValues.of(
159-
result, timestampMillis, windows, paneInfo, null, null, drainingValueFromUpstream);
160+
result,
161+
timestampMillis,
162+
windows,
163+
paneInfo,
164+
null,
165+
null,
166+
drainingValueFromUpstream,
167+
null);
160168
} else {
161169
notifyElementRead(data.available() + metadata.available());
170+
// todo #37030 parse context from previous stage
162171
return WindowedValues.of(
163172
decode(valueCoder, data),
164173
timestampMillis,
165174
windows,
166175
paneInfo,
167176
null,
168177
null,
169-
drainingValueFromUpstream);
178+
drainingValueFromUpstream,
179+
null);
170180
}
171181
}
172182

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ public Iterable<TimerData> timersIterable() {
159159
InputStream inputStream = message.getData().newInput();
160160
ElemT value = valueCoder.decode(inputStream, Coder.Context.OUTER);
161161
return WindowedValues.of(
162-
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream);
162+
value, timestamp, windows, paneInfo, null, null, drainingValueFromUpstream, null);
163163
} catch (RuntimeException | IOException e) {
164164
if (!skipUndecodableElements) {
165165
throw new RuntimeException(e);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/ValueInEmptyWindows.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.runners.dataflow.worker.util;
1919

20+
import io.opentelemetry.context.Context;
2021
import java.util.Collection;
2122
import java.util.Collections;
2223
import java.util.Objects;
@@ -65,6 +66,11 @@ public CausedByDrain causedByDrain() {
6566
return CausedByDrain.NORMAL;
6667
}
6768

69+
@Override
70+
public @Nullable Context getOpenTelemetryContext() {
71+
return null;
72+
}
73+
6874
@Override
6975
public Iterable<WindowedValue<T>> explodeWindows() {
7076
return Collections.emptyList();

runners/spark/spark_runner.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,7 @@ dependencies {
243243
implementation library.java.jackson_annotations
244244
implementation library.java.slf4j_api
245245
implementation library.java.joda_time
246+
implementation library.java.opentelemetry_context
246247
implementation library.java.commons_lang3
247248
implementation library.java.args4j
248249
implementation project(path: ":model:fn-execution", configuration: "shadow")

0 commit comments

Comments
 (0)