Skip to content

Commit b5a0495

Browse files
authored
CombinePerKey with gbek (Java) (#36408)
* First pass at Java GBEK (AI generated) * Compile * Compiletest * checkstyle * tests passing * Move secret code into utils * Use secret manager from bom * Docs * Better docs * Updates * [WIP] Add pipeline option to force GBEK (Java) * Trigger some postcommits * Update triggers * Tests * test fixes * Move tests to IT * Randomized secret postfix * Update encryption mode * checkstyle * explicitly add dep * spotbugs: only create generator once * Gemini nits * [WIP] CombinePerKey with gbek (Java) * Dont use urn when gbek set * Merge in changes * Fix casing * Syntax + format * Fix test naming
1 parent 702d73e commit b5a0495

6 files changed

Lines changed: 78 additions & 6 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 2
3+
"modification": 4
44
}

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
44
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
55
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 2,
6+
"modification": 4,
77
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
88
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"modification": 4,
2+
"modification": 6,
33
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
44
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@
4141
import org.apache.beam.sdk.coders.VarIntCoder;
4242
import org.apache.beam.sdk.coders.VoidCoder;
4343
import org.apache.beam.sdk.options.PipelineOptions;
44+
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
45+
import org.apache.beam.sdk.transforms.Combine.CombineFn;
46+
import org.apache.beam.sdk.transforms.Combine.Globally;
47+
import org.apache.beam.sdk.transforms.Combine.PerKey;
4448
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
4549
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
4650
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -1499,6 +1503,7 @@ public static class PerKey<K, InputT, OutputT>
14991503
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
15001504
private final boolean fewKeys;
15011505
private final List<PCollectionView<?>> sideInputs;
1506+
private boolean shouldSkipReplacement;
15021507

15031508
private PerKey(
15041509
GlobalCombineFn<? super InputT, ?, OutputT> fn,
@@ -1508,6 +1513,7 @@ private PerKey(
15081513
this.fnDisplayData = fnDisplayData;
15091514
this.fewKeys = fewKeys;
15101515
this.sideInputs = ImmutableList.of();
1516+
this.shouldSkipReplacement = false;
15111517
}
15121518

15131519
private PerKey(
@@ -1519,6 +1525,7 @@ private PerKey(
15191525
this.fnDisplayData = fnDisplayData;
15201526
this.fewKeys = fewKeys;
15211527
this.sideInputs = sideInputs;
1528+
this.shouldSkipReplacement = false;
15221529
}
15231530

15241531
@Override
@@ -1592,6 +1599,11 @@ public List<PCollectionView<?>> getSideInputs() {
15921599
return sideInputs;
15931600
}
15941601

1602+
/** Returns whether a runner should skip replacing this transform. For runner use only */
1603+
public boolean shouldSkipReplacement() {
1604+
return this.shouldSkipReplacement;
1605+
}
1606+
15951607
/**
15961608
* Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link
15971609
* PCollectionView}. The values of the returned map will be equal to the result of {@link
@@ -1604,6 +1616,13 @@ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
16041616

16051617
@Override
16061618
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
1619+
PipelineOptions options = input.getPipeline().getOptions();
1620+
String gbekOveride = options.getGbek();
1621+
if (gbekOveride != null && !gbekOveride.trim().isEmpty()) {
1622+
// Don't replace this transform if we're using GBEK since the runner may insert
1623+
// its own GBK which doesn't perform encryption.
1624+
this.shouldSkipReplacement = true;
1625+
}
16071626
return input
16081627
.apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create())
16091628
.apply(

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,25 @@ public String getUrn() {
6161
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
6262
}
6363

64+
@Override
65+
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
66+
if (transform.shouldSkipReplacement()) {
67+
return "beam:transform:combine_per_key_wrapper:v1";
68+
}
69+
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
70+
}
71+
6472
@Override
6573
public FunctionSpec translate(
6674
AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
6775
throws IOException {
68-
if (transform.getTransform().getSideInputs().isEmpty()) {
69-
GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
76+
Combine.PerKey underlyingCombine = transform.getTransform();
77+
if (underlyingCombine.shouldSkipReplacement()) {
78+
// Can use null for spec for generic composite.
79+
return null;
80+
}
81+
if (underlyingCombine.getSideInputs().isEmpty()) {
82+
GlobalCombineFn<?, ?, ?> combineFn = underlyingCombine.getFn();
7083
Coder<?> accumulatorCoder =
7184
extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
7285
return FunctionSpec.newBuilder()

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {
151151

152152
// Redistribute depends on GBK under the hood and can have runner-specific implementations
153153
@Test
154-
public void redistributeWithValidGcpSecretOption() throws Exception {
154+
public void testRedistributeWithValidGcpSecretOption() throws Exception {
155155
if (gcpSecretVersionName == null) {
156156
// Skip test if we couldn't set up secret manager
157157
return;
@@ -192,4 +192,44 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
192192
thrown.expect(RuntimeException.class);
193193
p.run();
194194
}
195+
196+
// Combine.PerKey depends on GBK under the hood, but can be overriden by a runner. This can
197+
// fail unless it is handled specially, so we should test it specifically
198+
@Test
199+
public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
200+
if (gcpSecretVersionName == null) {
201+
// Skip test if we couldn't set up secret manager
202+
return;
203+
}
204+
PipelineOptions options = TestPipeline.testingPipelineOptions();
205+
options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName));
206+
Pipeline p = Pipeline.create(options);
207+
208+
List<KV<String, Integer>> ungroupedPairs =
209+
Arrays.asList(
210+
KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
211+
List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0));
212+
PCollection<KV<String, Integer>> input =
213+
p.apply(
214+
Create.of(ungroupedPairs)
215+
.withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
216+
PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers()));
217+
PAssert.that(output).containsInAnyOrder(sums);
218+
219+
p.run();
220+
}
221+
222+
@Test
223+
public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
224+
if (gcpSecretVersionName == null) {
225+
// Skip test if we couldn't set up secret manager
226+
return;
227+
}
228+
PipelineOptions options = TestPipeline.testingPipelineOptions();
229+
options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
230+
Pipeline p = Pipeline.create(options);
231+
p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
232+
thrown.expect(RuntimeException.class);
233+
p.run();
234+
}
195235
}

0 commit comments

Comments
 (0)