diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 103405a57931..378fb8b29c97 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -632,6 +632,12 @@ class BeamModulePlugin implements Plugin { def nemo_version = "0.1" // [bomupgrader] determined by: io.grpc:grpc-netty, consistent with: google_cloud_platform_libraries_bom def netty_version = "4.1.110.Final" + // [bomupgrader] determined by: io.opentelemetry:opentelemetry-sdk, consistent with: google_cloud_platform_libraries_bom + def opentelemetry_version = "1.47.0" + // [manual] determined by: https://github.com/open-telemetry/opentelemetry-java-contrib/releases, consistent with: opentelemetry_instrumentation_bom + def opentelemetry_contrib_version = "1.44.0" + // [manual] determined by: https://github.com/open-telemetry/opentelemetry-java-instrumentation/releases, consistent with: opentelemetry_bom + def opentelemetry_instrumentation_version = "2.13.3" def postgres_version = "42.2.16" // [bomupgrader] determined by: com.google.protobuf:protobuf-java, consistent with: google_cloud_platform_libraries_bom def protobuf_version = "4.29.4" @@ -851,6 +857,11 @@ class BeamModulePlugin implements Plugin { netty_tcnative_boringssl_static : "io.netty:netty-tcnative-boringssl-static:2.0.52.Final", netty_transport : "io.netty:netty-transport:$netty_version", netty_transport_native_epoll : "io.netty:netty-transport-native-epoll:$netty_version", + opentelemetry_api : "io.opentelemetry:opentelemetry-api", // google_cloud_platform_libraries_bom sets version + opentelemetry_bom : "io.opentelemetry:opentelemetry-bom-alpha:$opentelemetry_version-alpha", // opentelemetry_contrib_gcp_auth_extension requires alpha + opentelemetry_contrib_gcp_auth_extension : "io.opentelemetry.contrib:opentelemetry-gcp-auth-extension:$opentelemetry_contrib_version-alpha:shadow", + opentelemetry_instrumentation_bom : "io.opentelemetry.instrumentation:opentelemetry-instrumentation-bom-alpha:$opentelemetry_instrumentation_version-alpha", // opentelemetry_contrib_gcp_auth_extension requires alpha + opentelemetry_javaagent : "io.opentelemetry.javaagent:opentelemetry-javaagent", // opentelemetry_instrumentation_bom sets version postgres : "org.postgresql:postgresql:$postgres_version", protobuf_java : "com.google.protobuf:protobuf-java:$protobuf_version", protobuf_java_util : "com.google.protobuf:protobuf-java-util:$protobuf_version", diff --git a/scripts/tools/bomupgrader.py b/scripts/tools/bomupgrader.py index 23de807a4faf..a759bae827ba 100644 --- a/scripts/tools/bomupgrader.py +++ b/scripts/tools/bomupgrader.py @@ -52,6 +52,7 @@ class BeamModulePluginProcessor: "grpc": "io.grpc:grpc-netty", # use "grpc-netty" to pick up proper netty version "netty": "io.netty:netty-transport", + "opentelemetry": "io.opentelemetry:opentelemetry-sdk", "protobuf": "com.google.protobuf:protobuf-java" } # dependencies managed by GCP-BOM that used the dependencies in KNOWN_DEPS diff --git a/sdks/java/container/Dockerfile b/sdks/java/container/Dockerfile index c43eb0cb8c02..eb3ccba43432 100644 --- a/sdks/java/container/Dockerfile +++ b/sdks/java/container/Dockerfile @@ -30,6 +30,8 @@ ADD target/jcl-over-slf4j.jar /opt/apache/beam/jars/ ADD target/log4j-over-slf4j.jar /opt/apache/beam/jars/ ADD target/log4j-to-slf4j.jar /opt/apache/beam/jars/ ADD target/beam-sdks-java-harness.jar /opt/apache/beam/jars/ +ADD target/opentelemetry-gcp-auth-extension.jar /opt/opentelemetry/extensions/gcp-auth/ +ADD target/opentelemetry-javaagent.jar /opt/apache/beam/jars/ # Required to use jamm as a javaagent to get accurate object size measuring # COPY fails if file is not found, so use a wildcard for open-module-agent.jar diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 20283740ca0f..244e944e658d 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -52,9 +52,11 @@ const ( disableJammAgentOption = "disable_jamm_agent" enableGoogleCloudProfilerOption = "enable_google_cloud_profiler" enableGoogleCloudHeapSamplingOption = "enable_google_cloud_heap_sampling" + enableOpenTelemetryAgentOption = "enable_open_telemetry_agent" googleCloudProfilerAgentBaseArgs = "-agentpath:/opt/google_cloud_profiler/profiler_java_agent.so=-logtostderr,-cprof_service=%s,-cprof_service_version=%s" googleCloudProfilerAgentHeapArgs = googleCloudProfilerAgentBaseArgs + ",-cprof_enable_heap_sampling,-cprof_heap_sampling_interval=2097152" jammAgentArgs = "-javaagent:/opt/apache/beam/jars/jamm.jar" + openTelemetryAgentArgs = "-javaagent:/opt/apache/beam/jars/opentelemetry-javaagent.jar" ) func main() { @@ -184,6 +186,7 @@ func main() { lim = HeapSizeLimit(size) } + env := map[string]string{} args := []string{ "-Xmx" + strconv.FormatUint(lim, 10), // ParallelGC the most adequate for high throughput and lower CPU utilization @@ -223,6 +226,19 @@ func main() { args = append(args, jammAgentArgs) } + enableOpenTelemetryAgent := strings.Contains(options, enableOpenTelemetryAgentOption) + if enableOpenTelemetryAgent { + args = append(args, openTelemetryAgentArgs) + os.Setenv("OTEL_JAVAAGENT_EXTENSIONS", "/opt/opentelemetry/extensions") + os.Setenv("OTEL_JAVA_GLOBAL_AUTOCONFIGURE_ENABLED", "true") + if metadata := info.GetMetadata(); metadata != nil { + if jobName, ok := metadata["job_name"]; ok { + os.Setenv("OTEL_SERVICE_NAME", jobName) + } + } + logger.Printf(ctx, "Enabling OpenTelemetry agent.") + } + // If heap dumping is enabled, configure the JVM to dump it on oom events. if pipelineOptions, ok := info.GetPipelineOptions().GetFields()["options"]; ok { if heapDumpOption, ok := pipelineOptions.GetStructValue().GetFields()["enableHeapDumps"]; ok { @@ -278,6 +294,26 @@ func main() { args = append(args, "--add-modules="+module.GetStringValue()) } } + + // Add OpenTelemetry properties specified in pipeline options + if properties, ok := pipelineOptions.GetStructValue().GetFields()["openTelemetryProperties"]; ok { + for key, value := range properties.GetStructValue().GetFields() { + if strings.HasPrefix(key, "otel.") { + args = append(args, "-D"+key+"="+value.GetStringValue()) + } + } + } + + // TODO(sjvanrossum): Remove this section when Dataflow sets this environment variable + // Add GOOGLE_CLOUD_PROJECT environment variable if unset + if _, ok := os.LookupEnv("GOOGLE_CLOUD_PROJECT"); !ok { + if project, ok := pipelineOptions.GetStructValue().GetFields()["project"]; ok { + if p := project.GetStringValue(); len(p) > 0 { + env["GOOGLE_CLOUD_PROJECT"] = p + logger.Printf(ctx, "Setting GOOGLE_CLOUD_PROJECT environment variable.") + } + } + } } // Automatically open modules for Java 11+ openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar" @@ -287,7 +323,7 @@ func main() { args = append(args, "org.apache.beam.fn.harness.FnHarness") logger.Printf(ctx, "Executing: java %v", strings.Join(args, " ")) - logger.Fatalf(ctx, "Java exited: %v", execx.Execute("java", args...)) + logger.Fatalf(ctx, "Java exited: %v", execx.ExecuteEnv(env, "java", args...)) } // heapSizeLimit returns 80% of the runner limit, if provided. If not provided, diff --git a/sdks/java/container/build.gradle b/sdks/java/container/build.gradle index 711b34b38b82..db3b13f33763 100644 --- a/sdks/java/container/build.gradle +++ b/sdks/java/container/build.gradle @@ -42,6 +42,9 @@ dependencies { dockerDependency library.java.log4j2_to_slf4j dockerDependency project(path: ":sdks:java:harness", configuration: "shadow") dockerDependency library.java.jamm + dockerDependency platform(library.java.opentelemetry_instrumentation_bom) + dockerDependency library.java.opentelemetry_javaagent + dockerDependency library.java.opentelemetry_contrib_gcp_auth_extension } goBuild { diff --git a/sdks/java/container/common.gradle b/sdks/java/container/common.gradle index c81a33827bef..a3352fea5d01 100644 --- a/sdks/java/container/common.gradle +++ b/sdks/java/container/common.gradle @@ -55,6 +55,8 @@ task copyDockerfileDependencies(type: Copy) { rename 'beam-sdks-java-container-agent.*.jar', 'open-module-agent.jar' rename 'beam-sdks-java-harness-.*.jar', 'beam-sdks-java-harness.jar' rename 'jamm.*.jar', 'jamm.jar' + rename 'opentelemetry-gcp-auth-extension-.*\\.jar', 'opentelemetry-gcp-auth-extension.jar' + rename 'opentelemetry-javaagent-.*\\.jar', 'opentelemetry-javaagent.jar' setDuplicatesStrategy(DuplicatesStrategy.INCLUDE) into "build/target" diff --git a/sdks/java/container/license_scripts/dep_urls_java.yaml b/sdks/java/container/license_scripts/dep_urls_java.yaml index 93f5f6fa211f..c47d051c3469 100644 --- a/sdks/java/container/license_scripts/dep_urls_java.yaml +++ b/sdks/java/container/license_scripts/dep_urls_java.yaml @@ -65,6 +65,22 @@ org.eclipse.jgit: '4.4.1.201607150455-r': license: "https://www.eclipse.org/org/documents/edl-v10.html" type: "Eclipse Distribution License - v1.0" +opentelemetry-bom: + '1.47.0': + license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.47.0/LICENSE" + type: "Apache License 2.0" +opentelemetry-bom-alpha: + '1.47.0-alpha': + license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java/v1.47.0/LICENSE" + type: "Apache License 2.0" +opentelemetry-instrumentation-bom: + '2.13.3': + license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java-instrumentation/v2.13.3/LICENSE" + type: "Apache License 2.0" +opentelemetry-instrumentation-bom-alpha: + '2.13.3-alpha': + license: "https://raw.githubusercontent.com/open-telemetry/opentelemetry-java-instrumentation/v2.13.3/LICENSE" + type: "Apache License 2.0" zstd-jni: '1.5.2-5': license: "https://raw.githubusercontent.com/luben/zstd-jni/master/LICENSE" diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index e849ae597791..f5bdb342ab8c 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -96,6 +96,8 @@ dependencies { shadow library.java.jackson_core shadow library.java.jackson_annotations shadow library.java.jackson_databind + shadow platform(library.java.opentelemetry_instrumentation_bom) + shadow library.java.opentelemetry_api shadow library.java.slf4j_api shadow library.java.snappy_java shadow library.java.joda_time diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index ad5b1451075c..e99463deeedd 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -20,6 +20,9 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.api.OpenTelemetry; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -440,4 +443,29 @@ public Duration create(PipelineOptions options) { int getElementProcessingTimeoutMinutes(); void setElementProcessingTimeoutMinutes(int value); + + /** + * The OpenTelemetry properties that will be appended to the set of system properties for SDK + * harness instances. + */ + @Description( + "The OpenTelemetry properties that will be appended to the set of system properties for SDK " + + "harness instances.") + Map getOpenTelemetryProperties(); + + void setOpenTelemetryProperties(Map value); + + @JsonIgnore + @Hidden + @Default.InstanceFactory(GlobalOpenTelemetryFactory.class) + OpenTelemetry getOpenTelemetry(); + + void setOpenTelemetry(OpenTelemetry value); + + class GlobalOpenTelemetryFactory implements DefaultValueFactory { + @Override + public OpenTelemetry create(PipelineOptions options) { + return GlobalOpenTelemetry.get(); + } + } }