Skip to content

Commit cb00495

Browse files
committed
fix merge windows tests
1 parent 8052824 commit cb00495

4 files changed

Lines changed: 30 additions & 16 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,13 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
145145
} else {
146146
nonLateElements.add(
147147
WindowedValues.of(
148-
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
148+
element.getValue(),
149+
element.getTimestamp(),
150+
window,
151+
element.getPaneInfo(),
152+
element.getRecordId(),
153+
element.getRecordOffset(),
154+
element.causedByDrain()));
149155
}
150156
}
151157
}

runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java

Lines changed: 10 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -38,7 +38,7 @@
3838
import org.apache.beam.sdk.metrics.Counter;
3939
import org.apache.beam.sdk.metrics.Metrics;
4040
import org.apache.beam.sdk.options.PipelineOptions;
41-
import org.apache.beam.sdk.state.CombiningState;
41+
import org.apache.beam.sdk.state.BagState;
4242
import org.apache.beam.sdk.state.TimeDomain;
4343
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
4444
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
@@ -108,11 +108,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
108108
* <li>It uses discarding or accumulation mode according to the {@link WindowingStrategy}.
109109
* </ul>
110110
*/
111-
static final StateTag<CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata>>
112-
METADATA_TAG =
113-
StateTags.makeSystemTagInternal(
114-
StateTags.combiningValue(
115-
"combinedMetadata", CombinedMetadata.Coder.of(), CombinedMetadataCombiner.of()));
111+
static final StateTag<BagState<CombinedMetadata>> METADATA_TAG =
112+
StateTags.makeSystemTagInternal(
113+
StateTags.bag("combinedMetadata", CombinedMetadata.Coder.of()));
116114

117115
private final WindowingStrategy<Object, W> windowingStrategy;
118116

@@ -1010,6 +1008,7 @@ private boolean needToEmit(boolean isEmpty, boolean isFinished, PaneInfo.Timing
10101008
private void prefetchOnTrigger(
10111009
final ReduceFn<K, InputT, OutputT, W>.Context directContext,
10121010
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
1011+
directContext.state().access(METADATA_TAG).readLater();
10131012
paneInfoTracker.prefetchPaneInfo(directContext);
10141013
watermarkHold.prefetchExtract(renamedContext);
10151014
nonEmptyPanes.isEmpty(renamedContext.state()).readLater();
@@ -1028,12 +1027,11 @@ private void prefetchOnTrigger(
10281027
boolean isEndOfWindow,
10291028
CombinedMetadata metadata)
10301029
throws Exception {
1031-
CombiningState<CombinedMetadata, CombinedMetadata, CombinedMetadata> metadataState =
1032-
directContext.state().access(METADATA_TAG);
1033-
CombinedMetadata aggregatedMetadata = metadataState.read();
1034-
if (aggregatedMetadata == null) {
1035-
aggregatedMetadata = CombinedMetadata.createDefault();
1036-
}
1030+
BagState<CombinedMetadata> metadataState = directContext.state().access(METADATA_TAG);
1031+
Iterable<CombinedMetadata> allMetadata = metadataState.read();
1032+
CombinedMetadata aggregatedMetadata =
1033+
CombinedMetadataCombiner.of().mergeAccumulators(allMetadata);
1034+
10371035
CombinedMetadata fullyAggregatedMetadata =
10381036
CombinedMetadataCombiner.of().addInput(aggregatedMetadata, metadata);
10391037
final CausedByDrain aggregatedCausedByDrain = fullyAggregatedMetadata.causedByDrain();

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4040
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
4141
import org.apache.beam.sdk.util.ByteStringOutputStream;
42+
import org.apache.beam.sdk.values.CausedByDrain;
4243
import org.apache.beam.sdk.values.KV;
4344
import org.apache.beam.sdk.values.ValueWithRecordId;
4445
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
@@ -220,7 +221,12 @@ public long add(WindowedValue<T> data) throws IOException {
220221
ByteString id = ByteString.EMPTY;
221222
// todo #33176 specify additional metadata in the future
222223
BeamFnApi.Elements.ElementMetadata additionalMetadata =
223-
BeamFnApi.Elements.ElementMetadata.newBuilder().build();
224+
BeamFnApi.Elements.ElementMetadata.newBuilder()
225+
.setDrain(
226+
data.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
227+
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
228+
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
229+
.build();
224230
ByteString metadata =
225231
encodeMetadata(
226232
stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata);

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1922,6 +1922,11 @@ public void testMergeWindows() throws Exception {
19221922
.getValueBuilder()
19231923
.setTimestamp(0)
19241924
.setData(ByteString.EMPTY);
1925+
dataBuilder
1926+
.addBagsBuilder()
1927+
.setTag(combinedMetadataTag)
1928+
.setStateFamily(stateFamily)
1929+
.addValues(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x08, 0x01}));
19251930
server.whenGetDataCalled().thenReturn(dataResponse.build());
19261931

19271932
expectedBytesRead += dataBuilder.build().getSerializedSize();
@@ -2154,7 +2159,6 @@ public void testMergeWindowsCaching() throws Exception {
21542159
Windmill.TagBag.newBuilder()
21552160
.setTag(combinedMetadataTag)
21562161
.setStateFamily(stateFamily)
2157-
.setDeleteAll(true)
21582162
.addValues(ByteString.copyFrom(new byte[] {0x01, 0x02, 0x08, 0x01}))
21592163
.build())));
21602164

@@ -2308,7 +2312,7 @@ public void testMergeWindowsCaching() throws Exception {
23082312
CacheStats stats = worker.getStateCacheStats();
23092313
LOG.info("cache stats {}", stats);
23102314
assertEquals(1, stats.hitCount());
2311-
assertEquals(4, stats.missCount());
2315+
assertEquals(5, stats.missCount());
23122316
worker.stop();
23132317
}
23142318

0 commit comments

Comments
 (0)