Skip to content

Commit 5117734

Browse files
committed
Revert "CombinePerKey with gbek (Java) (#36408)"
This reverts commit b5a0495.
1 parent ac4b5ab commit 5117734

6 files changed

Lines changed: 6 additions & 78 deletions

File tree

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 4
3+
"modification": 2
44
}

.github/trigger_files/beam_PostCommit_Java_DataflowV1.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@
33
"https://github.com/apache/beam/pull/34902": "Introducing OutputBuilder",
44
"https://github.com/apache/beam/pull/35177": "Introducing WindowedValueReceiver to runners",
55
"comment": "Modify this file in a trivial way to cause this test suite to run",
6-
"modification": 4,
6+
"modification": 2,
77
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
88
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"modification": 6,
2+
"modification": 4,
33
"https://github.com/apache/beam/pull/35159": "moving WindowedValue and making an interface"
44
}

sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,6 @@
4141
import org.apache.beam.sdk.coders.VarIntCoder;
4242
import org.apache.beam.sdk.coders.VoidCoder;
4343
import org.apache.beam.sdk.options.PipelineOptions;
44-
import org.apache.beam.sdk.transforms.Combine.AccumulatingCombineFn;
45-
import org.apache.beam.sdk.transforms.Combine.CombineFn;
46-
import org.apache.beam.sdk.transforms.Combine.Globally;
47-
import org.apache.beam.sdk.transforms.Combine.PerKey;
4844
import org.apache.beam.sdk.transforms.CombineFnBase.AbstractGlobalCombineFn;
4945
import org.apache.beam.sdk.transforms.CombineFnBase.GlobalCombineFn;
5046
import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext;
@@ -1503,7 +1499,6 @@ public static class PerKey<K, InputT, OutputT>
15031499
private final DisplayData.ItemSpec<? extends Class<?>> fnDisplayData;
15041500
private final boolean fewKeys;
15051501
private final List<PCollectionView<?>> sideInputs;
1506-
private boolean shouldSkipReplacement;
15071502

15081503
private PerKey(
15091504
GlobalCombineFn<? super InputT, ?, OutputT> fn,
@@ -1513,7 +1508,6 @@ private PerKey(
15131508
this.fnDisplayData = fnDisplayData;
15141509
this.fewKeys = fewKeys;
15151510
this.sideInputs = ImmutableList.of();
1516-
this.shouldSkipReplacement = false;
15171511
}
15181512

15191513
private PerKey(
@@ -1525,7 +1519,6 @@ private PerKey(
15251519
this.fnDisplayData = fnDisplayData;
15261520
this.fewKeys = fewKeys;
15271521
this.sideInputs = sideInputs;
1528-
this.shouldSkipReplacement = false;
15291522
}
15301523

15311524
@Override
@@ -1599,11 +1592,6 @@ public List<PCollectionView<?>> getSideInputs() {
15991592
return sideInputs;
16001593
}
16011594

1602-
/** Returns whether a runner should skip replacing this transform. For runner use only */
1603-
public boolean shouldSkipReplacement() {
1604-
return this.shouldSkipReplacement;
1605-
}
1606-
16071595
/**
16081596
* Returns the side inputs of this {@link Combine}, tagged with the tag of the {@link
16091597
* PCollectionView}. The values of the returned map will be equal to the result of {@link
@@ -1616,13 +1604,6 @@ public Map<TupleTag<?>, PValue> getAdditionalInputs() {
16161604

16171605
@Override
16181606
public PCollection<KV<K, OutputT>> expand(PCollection<KV<K, InputT>> input) {
1619-
PipelineOptions options = input.getPipeline().getOptions();
1620-
String gbekOveride = options.getGbek();
1621-
if (gbekOveride != null && !gbekOveride.trim().isEmpty()) {
1622-
// Don't replace this transform if we're using GBEK since the runner may insert
1623-
// its own GBK which doesn't perform encryption.
1624-
this.shouldSkipReplacement = true;
1625-
}
16261607
return input
16271608
.apply(fewKeys ? GroupByKey.createWithFewKeys() : GroupByKey.create())
16281609
.apply(

sdks/java/core/src/main/java/org/apache/beam/sdk/util/construction/CombineTranslation.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -61,25 +61,12 @@ public String getUrn() {
6161
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
6262
}
6363

64-
@Override
65-
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
66-
if (transform.shouldSkipReplacement()) {
67-
return "beam:transform:combine_per_key_wrapper:v1";
68-
}
69-
return PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN;
70-
}
71-
7264
@Override
7365
public FunctionSpec translate(
7466
AppliedPTransform<?, ?, Combine.PerKey<?, ?, ?>> transform, SdkComponents components)
7567
throws IOException {
76-
Combine.PerKey underlyingCombine = transform.getTransform();
77-
if (underlyingCombine.shouldSkipReplacement()) {
78-
// Can use null for spec for generic composite.
79-
return null;
80-
}
81-
if (underlyingCombine.getSideInputs().isEmpty()) {
82-
GlobalCombineFn<?, ?, ?> combineFn = underlyingCombine.getFn();
68+
if (transform.getTransform().getSideInputs().isEmpty()) {
69+
GlobalCombineFn<?, ?, ?> combineFn = transform.getTransform().getFn();
8370
Coder<?> accumulatorCoder =
8471
extractAccumulatorCoder(combineFn, (AppliedPTransform) transform);
8572
return FunctionSpec.newBuilder()

sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyIT.java

Lines changed: 1 addition & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ public void testGroupByKeyWithInvalidGcpSecretOption() throws Exception {
151151

152152
// Redistribute depends on GBK under the hood and can have runner-specific implementations
153153
@Test
154-
public void testRedistributeWithValidGcpSecretOption() throws Exception {
154+
public void redistributeWithValidGcpSecretOption() throws Exception {
155155
if (gcpSecretVersionName == null) {
156156
// Skip test if we couldn't set up secret manager
157157
return;
@@ -192,44 +192,4 @@ public void testRedistributeWithInvalidGcpSecretOption() throws Exception {
192192
thrown.expect(RuntimeException.class);
193193
p.run();
194194
}
195-
196-
// Combine.PerKey depends on GBK under the hood, but can be overriden by a runner. This can
197-
// fail unless it is handled specially, so we should test it specifically
198-
@Test
199-
public void testCombinePerKeyWithValidGcpSecretOption() throws Exception {
200-
if (gcpSecretVersionName == null) {
201-
// Skip test if we couldn't set up secret manager
202-
return;
203-
}
204-
PipelineOptions options = TestPipeline.testingPipelineOptions();
205-
options.setGbek(String.format("type:gcpsecret;version_name:%s", gcpSecretVersionName));
206-
Pipeline p = Pipeline.create(options);
207-
208-
List<KV<String, Integer>> ungroupedPairs =
209-
Arrays.asList(
210-
KV.of("k1", 3), KV.of("k2", 66), KV.of("k1", 4), KV.of("k2", -33), KV.of("k3", 0));
211-
List<KV<String, Integer>> sums = Arrays.asList(KV.of("k1", 7), KV.of("k2", 33), KV.of("k3", 0));
212-
PCollection<KV<String, Integer>> input =
213-
p.apply(
214-
Create.of(ungroupedPairs)
215-
.withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
216-
PCollection<KV<String, Integer>> output = input.apply(Combine.perKey(Sum.ofIntegers()));
217-
PAssert.that(output).containsInAnyOrder(sums);
218-
219-
p.run();
220-
}
221-
222-
@Test
223-
public void testCombinePerKeyWithInvalidGcpSecretOption() throws Exception {
224-
if (gcpSecretVersionName == null) {
225-
// Skip test if we couldn't set up secret manager
226-
return;
227-
}
228-
PipelineOptions options = TestPipeline.testingPipelineOptions();
229-
options.setGbek("type:gcpsecret;version_name:bad_path/versions/latest");
230-
Pipeline p = Pipeline.create(options);
231-
p.apply(Create.of(KV.of("k1", 1))).apply(Combine.perKey(Sum.ofIntegers()));
232-
thrown.expect(RuntimeException.class);
233-
p.run();
234-
}
235195
}

0 commit comments

Comments
 (0)