Skip to content

Commit da391d0

Browse files
authored
Merge pull request #38230 from stankiewicz/drain_combiner
[Drain] Support extensible element metadata propagation in ReduceFnRunner
2 parents d6b98ba + 79f7db9 commit da391d0

13 files changed

Lines changed: 453 additions & 30 deletions

File tree

runners/core-java/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ dependencies {
4242
implementation project(path: ":model:pipeline", configuration: "shadow")
4343
implementation project(path: ":sdks:java:core", configuration: "shadow")
4444
implementation project(path: ":model:job-management", configuration: "shadow")
45+
implementation project(path: ":model:fn-execution", configuration: "shadow")
4546
implementation library.java.vendored_guava_32_1_2_jre
4647
implementation library.java.joda_time
4748
implementation library.java.vendored_grpc_1_69_0
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core;
19+
20+
import com.google.auto.value.AutoValue;
21+
import java.io.IOException;
22+
import java.io.InputStream;
23+
import java.io.OutputStream;
24+
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
25+
import org.apache.beam.sdk.coders.AtomicCoder;
26+
import org.apache.beam.sdk.values.CausedByDrain;
27+
28+
/**
29+
* Encapsulates metadata that propagates with elements in the pipeline.
30+
*
31+
* <p>This metadata is sent along with elements. It currently includes fields like {@link
32+
* CausedByDrain}, and is designed to be extensible to support future metadata fields such as
33+
* OpenTelemetry context or CDC (Change Data Capture) kind.
34+
*
35+
* <p>The purpose of this class is to group targeted metadata fields together. This makes it easier
36+
* to define combination strategies (e.g., when accumulating state in {@code ReduceFnRunner}) when
37+
* multiple elements are merged or grouped, without having to extend method signatures or state
38+
* handling for every new metadata field.
39+
*/
40+
@AutoValue
41+
public abstract class CombinedMetadata {
42+
public abstract CausedByDrain causedByDrain();
43+
44+
public static CombinedMetadata create(CausedByDrain causedByDrain) {
45+
return new AutoValue_CombinedMetadata(causedByDrain);
46+
}
47+
48+
public static CombinedMetadata createDefault() {
49+
return create(CausedByDrain.NORMAL);
50+
}
51+
52+
public static class Coder extends AtomicCoder<CombinedMetadata> {
53+
private static final Coder INSTANCE = new Coder();
54+
55+
public static Coder of() {
56+
return INSTANCE;
57+
}
58+
59+
@Override
60+
public void encode(CombinedMetadata value, OutputStream outStream) throws IOException {
61+
BeamFnApi.Elements.ElementMetadata proto =
62+
BeamFnApi.Elements.ElementMetadata.newBuilder()
63+
.setDrain(
64+
value.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
65+
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
66+
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
67+
.build();
68+
proto.writeDelimitedTo(outStream);
69+
}
70+
71+
@Override
72+
public CombinedMetadata decode(InputStream inStream) throws IOException {
73+
BeamFnApi.Elements.ElementMetadata proto =
74+
BeamFnApi.Elements.ElementMetadata.parseDelimitedFrom(inStream);
75+
if (proto == null) {
76+
return CombinedMetadata.createDefault();
77+
}
78+
79+
CausedByDrain causedByDrain =
80+
proto.getDrain() == BeamFnApi.Elements.DrainMode.Enum.DRAINING
81+
? CausedByDrain.CAUSED_BY_DRAIN
82+
: CausedByDrain.NORMAL;
83+
84+
return CombinedMetadata.create(causedByDrain);
85+
}
86+
}
87+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core;
19+
20+
import org.apache.beam.sdk.transforms.Combine.CombineFn;
21+
import org.apache.beam.sdk.values.CausedByDrain;
22+
23+
/** Combiner for CombinedMetadata. */
24+
class CombinedMetadataCombiner
25+
extends CombineFn<CombinedMetadata, CombinedMetadata, CombinedMetadata> {
26+
private static final CombinedMetadataCombiner INSTANCE = new CombinedMetadataCombiner();
27+
28+
public static CombinedMetadataCombiner of() {
29+
return INSTANCE;
30+
}
31+
32+
@Override
33+
public CombinedMetadata createAccumulator() {
34+
return CombinedMetadata.create(CausedByDrainCombiner.of().createAccumulator());
35+
}
36+
37+
@Override
38+
public CombinedMetadata addInput(CombinedMetadata accumulator, CombinedMetadata input) {
39+
return CombinedMetadata.create(
40+
CausedByDrainCombiner.of().addInput(accumulator.causedByDrain(), input.causedByDrain()));
41+
}
42+
43+
@Override
44+
public CombinedMetadata mergeAccumulators(Iterable<CombinedMetadata> accumulators) {
45+
CombinedMetadata result = createAccumulator();
46+
for (CombinedMetadata accum : accumulators) {
47+
result = addInput(result, accum);
48+
}
49+
return result;
50+
}
51+
52+
@Override
53+
public CombinedMetadata extractOutput(CombinedMetadata accumulator) {
54+
return accumulator;
55+
}
56+
57+
/** Combiner for CausedByDrain metadata. */
58+
static class CausedByDrainCombiner implements MetadataCombiner<CausedByDrain> {
59+
private static final CausedByDrainCombiner INSTANCE = new CausedByDrainCombiner();
60+
61+
public static CausedByDrainCombiner of() {
62+
return INSTANCE;
63+
}
64+
65+
@Override
66+
public CausedByDrain createAccumulator() {
67+
return CausedByDrain.NORMAL;
68+
}
69+
70+
@Override
71+
public CausedByDrain addInput(CausedByDrain current, CausedByDrain input) {
72+
if (current == CausedByDrain.CAUSED_BY_DRAIN || input == CausedByDrain.CAUSED_BY_DRAIN) {
73+
return CausedByDrain.CAUSED_BY_DRAIN;
74+
}
75+
return CausedByDrain.NORMAL;
76+
}
77+
}
78+
}

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,13 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
145145
} else {
146146
nonLateElements.add(
147147
WindowedValues.of(
148-
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
148+
element.getValue(),
149+
element.getTimestamp(),
150+
window,
151+
element.getPaneInfo(),
152+
element.getRecordId(),
153+
element.getRecordOffset(),
154+
element.causedByDrain()));
149155
}
150156
}
151157
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.runners.core;
19+
20+
/**
 * Strategy for combining values of a single kind of pipeline metadata.
 *
 * @param <T> the metadata type being combined; it serves as both accumulator and input type
 */
interface MetadataCombiner<T> {
  /**
   * Supplies the accumulator to start from before any input has been added.
   *
   * @return the initial accumulator
   */
  T createAccumulator();

  /**
   * Combines one more input into an accumulator.
   *
   * @param accumulator the value accumulated so far
   * @param input the value to combine into {@code accumulator}
   * @return the accumulator that results from the combination
   */
  T addInput(T accumulator, T input);
}

0 commit comments

Comments
 (0)