Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
b4523d8
x-lang gbek tests
damccorm Oct 7, 2025
343189f
Add java test
damccorm Oct 7, 2025
d9d8af4
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Oct 7, 2025
8dc83ec
missing import
damccorm Oct 7, 2025
2c260ec
Move towards standardizing on base64
damccorm Oct 7, 2025
54814f6
url encoded
damccorm Oct 7, 2025
dfe121b
More doc
damccorm Oct 8, 2025
903891e
yapf
damccorm Oct 8, 2025
5258999
test cleanup
damccorm Oct 8, 2025
f8377c7
progress, kick presubmits
damccorm Oct 8, 2025
ee97593
use options
damccorm Oct 8, 2025
c88f0b8
Additional pieces
damccorm Oct 9, 2025
5819212
Add pipeline options piece
damccorm Oct 9, 2025
9d0e949
merge in master
damccorm Oct 9, 2025
d07ac31
Format
damccorm Oct 9, 2025
c27b0c9
Move gbek into own test class
damccorm Oct 9, 2025
fbecc62
Remove python -> java tests (see #36457)
damccorm Oct 9, 2025
ca2f0ec
Merge branch 'master' of https://github.com/apache/beam into users/da…
damccorm Oct 9, 2025
799e0bf
Simplify to get faster repro
damccorm Oct 10, 2025
89f9cc1
Get it working, need to figure out actual issue though
damccorm Oct 11, 2025
9efbf7c
Fix type hinting
damccorm Oct 13, 2025
b44ed88
Clean up
damccorm Oct 13, 2025
9288605
pipeline options tests
damccorm Oct 13, 2025
9aaa25e
simplify/lint
damccorm Oct 13, 2025
6c1245f
resolve gemini comments (minor)
damccorm Oct 13, 2025
4c45204
Do not merge: Validate gbek against a bunch of DF tests
damccorm Oct 13, 2025
7e1095c
Soften determinism constraints and fix build issues
damccorm Oct 13, 2025
749e4d4
Update util.py
damccorm Oct 13, 2025
7eabee7
Try non-empty check
damccorm Oct 14, 2025
e0d6f4d
Merge branch 'users/damccorm/validateGbek' of https://github.com/apac…
damccorm Oct 14, 2025
1479726
Use hashmap which can handle nullness
damccorm Oct 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV1.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 4,
"modification": 5,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Java_DataflowV2.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"modification": 6,
"modification": 7,
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
{}
{
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"revision": 1
"revision": 2
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
"https://github.com/apache/beam/pull/36138": "Cleanly separating v1 worker and v2 sdk harness container image handling",
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2,
"modification": 3,
"https://github.com/apache/beam/pull/34294": "noting that PR #34294 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,4 @@
{
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
"comment": "Modify this file in a trivial way to cause this test suite to run",
"https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test",
"https://github.com/apache/beam/pull/31268": "noting that PR #31268 should run this test",
"https://github.com/apache/beam/pull/31490": "noting that PR #31490 should run this test",
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 31
"modification": 32
}

4 changes: 4 additions & 0 deletions .github/trigger_files/beam_PostCommit_Python_Arm.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 3
}
1 change: 1 addition & 0 deletions examples/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ apply from: "$projectDir/common.gradle"

dependencies {
implementation enforcedPlatform(library.java.google_cloud_platform_libraries_bom)
implementation library.java.google_cloud_secret_manager
implementation library.java.vendored_guava_32_1_2_jre
if (project.findProperty('testJavaVersion') == '21' || JavaVersion.current().compareTo(JavaVersion.VERSION_21) >= 0) {
// this dependency is a provided dependency for kafka-avro-serializer. It is not needed to compile with Java<=17
Expand Down
10 changes: 8 additions & 2 deletions runners/google-cloud-dataflow-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def legacyPipelineOptions = [
"--region=${gcpRegion}",
"--tempRoot=${dataflowValidatesTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--experiments=enable_lineage"
"--experiments=enable_lineage",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest",
]

// For the following test tasks using legacy worker, set workerHarnessContainerImage to empty to
Expand All @@ -183,7 +184,8 @@ def runnerV2CommonPipelineOptions = [
"--tempRoot=${dataflowValidatesTempRoot}",
"--experiments=use_unified_worker,use_runner_v2",
"--firestoreDb=${firestoreDb}",
"--experiments=enable_lineage"
"--experiments=enable_lineage",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest"
]

def runnerV2PipelineOptions = runnerV2CommonPipelineOptions + [
Expand Down Expand Up @@ -486,6 +488,7 @@ createCrossLanguageValidatesRunnerTask(
"--tempRoot=${dataflowValidatesTempRoot}",
"--sdkContainerImage=${dockerJavaImageContainer}:${dockerTag}",
"--sdkHarnessContainerImageOverrides=.*python.*,${dockerPythonImageContainer}:${dockerTag}",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest"
],
pytestOptions: [
"--capture=no",
Expand Down Expand Up @@ -600,6 +603,7 @@ task googleCloudPlatformLegacyWorkerIntegrationTest(type: Test, dependsOn: copyG
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
"--firestoreDb=${firestoreDb}",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest",
])

include '**/*IT.class'
Expand Down Expand Up @@ -637,6 +641,7 @@ task googleCloudPlatformLegacyWorkerKmsIntegrationTest(type: Test) {
"--workerHarnessContainerImage=",
"--dataflowKmsKey=${dataflowKmsKey}",
"--firestoreDb=${firestoreDb}",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest",
])

include '**/*IT.class'
Expand Down Expand Up @@ -737,6 +742,7 @@ task coreSDKJavaLegacyWorkerIntegrationTest(type: Test) {
"--tempRoot=${dataflowPostCommitTempRoot}",
"--dataflowWorkerJar=${dataflowLegacyWorkerJar}",
"--workerHarnessContainerImage=",
"--gbek=type:GcpSecret;version_name:projects/apache-beam-testing/secrets/gbek_secret_tests_dannystest/versions/latest",
])

include '**/*IT.class'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByEncryptedKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*GroupByKeyIT.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*core.*ValidateRunnerXlangTest.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*extensions.*ml.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*gcp.*" />
<suppress id="ForbidNonVendoredGrpcProtobuf" files=".*sdk.*io.*googleads.*DummyRateLimitPolicy\.java" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,9 @@ public void setup() {
}

@ProcessElement
@SuppressWarnings("nullness")
public void processElement(ProcessContext c) throws Exception {
java.util.Map<K, java.util.List<V>> decryptedKvs = new java.util.HashMap<>();
java.util.HashMap<K, java.util.List<V>> decryptedKvs = new java.util.HashMap<>();
for (KV<byte[], byte[]> encryptedKv : c.element().getValue()) {
byte[] iv = Arrays.copyOfRange(encryptedKv.getKey(), 0, 12);
GCMParameterSpec gcmParameterSpec = new GCMParameterSpec(128, iv);
Expand All @@ -251,7 +252,8 @@ public void processElement(ProcessContext c) throws Exception {
byte[] decryptedKeyBytes = this.cipher.doFinal(encryptedKey);
K key = decode(this.keyCoder, decryptedKeyBytes);

if (key != null) {
// If somehow the key was decoded to null, but the byte string is non-empty, throw.
if (key != null || decryptedKeyBytes == null || decryptedKeyBytes.length == 0) {
if (!decryptedKvs.containsKey(key)) {
decryptedKvs.put(key, new java.util.ArrayList<>());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@
*/
package org.apache.beam.sdk.util.construction;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import com.google.cloud.secretmanager.v1.ProjectName;
import com.google.cloud.secretmanager.v1.SecretManagerServiceClient;
import com.google.cloud.secretmanager.v1.SecretName;
import com.google.cloud.secretmanager.v1.SecretPayload;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.security.SecureRandom;
import java.util.Arrays;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.UsesJavaExpansionService;
import org.apache.beam.sdk.testing.UsesPythonExpansionService;
import org.apache.beam.sdk.testing.ValidatesRunner;
Expand All @@ -42,8 +54,13 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand Down Expand Up @@ -286,6 +303,118 @@ public void test() {
}
}

/**
 * Motivation behind GroupByKeyWithGbekTest.
 *
 * <p>Target transform: GroupByKey
 * (https://beam.apache.org/documentation/programming-guide/#groupbykey)
 *
 * <p>Test scenario: grouping a collection of KV&lt;K, V&gt; into a collection of KV&lt;K,
 * Iterable&lt;V&gt;&gt; by key while the group-by-encrypted-key (gbek) pipeline option is set, so
 * the GroupByEncryptedKey overrides are exercised.
 *
 * <p>Boundary conditions checked: PCollection&lt;KV&lt;?, ?&gt;&gt; sent to external transforms,
 * and PCollection&lt;KV&lt;?, Iterable&lt;?&gt;&gt;&gt; received back from external transforms,
 * while using GroupByEncryptedKey overrides.
 */
@RunWith(JUnit4.class)
public static class GroupByKeyWithGbekTest extends ValidateRunnerXlangTestBase {
  @Rule public ExpectedException thrown = ExpectedException.none();
  // GCP project hosting the Secret Manager secret used as the gbek key source.
  private static final String PROJECT_ID = "apache-beam-testing";
  // Base name for the test secret; a random suffix is appended per run in setUpClass.
  private static final String SECRET_ID = "gbek-test";
  // Full resource name of the secret version ("projects/.../secrets/.../versions/latest"),
  // or null when Secret Manager setup failed — the tests then skip themselves.
  private static String gcpSecretVersionName;
  // Randomized secret id created (or reused) for this test run.
  private static String secretId;

  /**
   * Creates (or reuses, if the getSecret probe succeeds) a Secret Manager secret whose payload
   * is a random 32-byte key, base64url-encoded, and records its "latest" version name in
   * {@link #gcpSecretVersionName}. If the Secret Manager client cannot be created, leaves
   * {@code gcpSecretVersionName} null and returns without setting {@code expansionAddr}, which
   * makes both tests no-ops.
   */
  @BeforeClass
  public static void setUpClass() {
    secretId = String.format("%s-%d", SECRET_ID, new SecureRandom().nextInt(10000));
    try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
      ProjectName projectName = ProjectName.of(PROJECT_ID);
      SecretName secretName = SecretName.of(PROJECT_ID, secretId);

      try {
        // Probe for an existing secret; any failure is treated as "does not exist"
        // and we fall through to creating it.
        client.getSecret(secretName);
      } catch (Exception e) {
        com.google.cloud.secretmanager.v1.Secret secret =
            com.google.cloud.secretmanager.v1.Secret.newBuilder()
                .setReplication(
                    com.google.cloud.secretmanager.v1.Replication.newBuilder()
                        .setAutomatic(
                            com.google.cloud.secretmanager.v1.Replication.Automatic.newBuilder()
                                .build())
                        .build())
                .build();
        client.createSecret(projectName, secretId, secret);
        // 256-bit random key, stored base64url-encoded (the encoding this PR standardizes on).
        byte[] secretBytes = new byte[32];
        new SecureRandom().nextBytes(secretBytes);
        client.addSecretVersion(
            secretName,
            SecretPayload.newBuilder()
                .setData(
                    ByteString.copyFrom(java.util.Base64.getUrlEncoder().encode(secretBytes)))
                .build());
      }
      gcpSecretVersionName = secretName.toString() + "/versions/latest";
    } catch (IOException e) {
      // Could not construct the Secret Manager client; signal the tests to skip.
      gcpSecretVersionName = null;
      return;
    }
    expansionAddr =
        String.format("localhost:%s", Integer.valueOf(System.getProperty("expansionPort")));
  }

  /**
   * Best-effort cleanup of the secret created in setUpClass; IOExceptions from client creation
   * are deliberately ignored.
   *
   * <p>NOTE(review): if the secret already existed before setUpClass (getSecret succeeded), it
   * is still deleted here — confirm that reusing callers expect this.
   */
  @AfterClass
  public static void tearDownClass() {
    if (gcpSecretVersionName != null) {
      try (SecretManagerServiceClient client = SecretManagerServiceClient.create()) {
        SecretName secretName = SecretName.of(PROJECT_ID, secretId);
        client.deleteSecret(secretName);
      } catch (IOException e) {
        // Do nothing.
      }
    }
  }

  @After
  @Override
  public void tearDown() {
    // Override tearDown since we're doing our own assertion instead of relying on base class
    // assertions.
  }

  /**
   * Runs the cross-language GroupByKey scenario with the gbek option pointing at the secret
   * created in setUpClass and asserts the pipeline finishes in state DONE. Silently skips when
   * Secret Manager setup failed.
   */
  @Test
  @Category({
    ValidatesRunner.class,
    UsesJavaExpansionService.class,
    UsesPythonExpansionService.class
  })
  public void test() {
    if (gcpSecretVersionName == null) {
      // Skip test if we couldn't set up secret manager
      return;
    }
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    // NOTE(review): other configs in this change use "type:GcpSecret" (mixed case);
    // presumably the type tag is case-insensitive — confirm.
    options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName));
    Pipeline pipeline = Pipeline.create(options);
    groupByKeyTest(pipeline);
    PipelineResult pipelineResult = pipeline.run();
    pipelineResult.waitUntilFinish();
    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
  }

  /**
   * Expects a failure when the gbek spec is malformed/unresolvable ("version_name:fake_secret",
   * with no type and a nonexistent secret). The trailing run/assert statements are unreachable
   * when the expected exception is thrown during construction or run.
   */
  @Test
  @Category({
    ValidatesRunner.class,
    UsesJavaExpansionService.class,
    UsesPythonExpansionService.class
  })
  public void testFailure() {
    thrown.expect(Exception.class);
    PipelineOptions options = TestPipeline.testingPipelineOptions();
    options.setGbek("version_name:fake_secret");
    Pipeline pipeline = Pipeline.create(options);
    groupByKeyTest(pipeline);
    PipelineResult pipelineResult = pipeline.run();
    pipelineResult.waitUntilFinish();
    assertThat(pipelineResult.getState(), equalTo(PipelineResult.State.DONE));
  }
}

/**
* Motivation behind coGroupByKeyTest.
*
Expand Down
21 changes: 17 additions & 4 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@
# that have a destination(dest) in parser.add_argument() different
# from the flag name and whose default value is `None`.
_FLAG_THAT_SETS_FALSE_VALUE = {'use_public_ips': 'no_use_public_ips'}
# Set of options which should not be overridden when applying options from a
# different language. This is relevant when using x-lang transforms, where the
# expansion service is started up with some pipeline options, and will
# impact which options are passed in to expanded transforms' expand functions.
_NON_OVERIDABLE_XLANG_OPTIONS = ['runner', 'experiments']


def _static_value_provider_of(value_type):
Expand Down Expand Up @@ -287,6 +292,10 @@ def _smart_split(self, values):


class PipelineOptions(HasDisplayData):
# Set of options which should not be overridden when pipeline options are
# being merged (see from_runner_api). This primarily comes up when expanding
# transforms via the Python expansion service.

"""This class and subclasses are used as containers for command line options.

These classes are wrappers over the standard argparse Python module
Expand Down Expand Up @@ -592,15 +601,19 @@ def to_struct_value(o):
})

@classmethod
def from_runner_api(cls, proto_options):
def from_runner_api(cls, proto_options, original_options=None):
def from_urn(key):
assert key.startswith('beam:option:')
assert key.endswith(':v1')
return key[12:-3]

return cls(
**{from_urn(key): value
for (key, value) in proto_options.items()})
parsed = {from_urn(key): value for (key, value) in proto_options.items()}
if original_options is None:
return cls(**parsed)
for (key, value) in parsed.items():
if value and key not in _NON_OVERIDABLE_XLANG_OPTIONS:
original_options._all_options[key] = value
return original_options

def display_data(self):
return self.get_all_options(drop_default=True, retain_unknown_options=True)
Expand Down
Loading
Loading