diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 613d87c127b7..b426b96cb5b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -72,9 +72,13 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol @Override public void clear() { + localAdditions = null; + if (cachedValue != null && !cachedValue.isPresent()) { + // No need to clear the backend as it is known empty. + return; + } cleared = true; cachedValue = Optional.absent(); - localAdditions = null; } @Override diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 3a25a671ca92..cff5ba4a2fc4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2374,9 +2374,6 @@ public void testMergeSessionWindows_singleLateWindow() throws Exception { new Action( buildSessionInput( 1, 40, 0, Collections.singletonList(1L), Collections.EMPTY_LIST)) - .withHolds( - buildHold("/gAAAAAAAAAsK/+uhold", -1, true), - buildHold("/gAAAAAAAAAsK/+uextra", -1, true)) .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)))); } @@ -2404,10 +2401,7 @@ public void testMergeSessionWindows() throws Exception { 0, Collections.EMPTY_LIST, Collections.singletonList(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 10)))) - .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)) - .withHolds( - buildHold("/gAAAAAAAAAsK/+uhold", -1, true), - buildHold("/gAAAAAAAAAsK/+uextra", -1, true)), + .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)), new Action( buildSessionInput( 3, 30, 0, Collections.singletonList(8L), Collections.EMPTY_LIST)) @@ -2436,8 +2430,8 @@ public void testMergeSessionWindows() throws Exception { .withHolds( buildHold("/gAAAAAAAACkK/+uhold", -1, true), buildHold("/gAAAAAAAACkK/+uextra", -1, true), - buildHold("/gAAAAAAAAAsK/+uhold", 40, true), - buildHold("/gAAAAAAAAAsK/+uextra", 3600040, true)), + buildHold("/gAAAAAAAAAsK/+uhold", 40, false), + buildHold("/gAAAAAAAAAsK/+uextra", 3600040, false)), new Action( buildSessionInput( 6, diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index de036214b958..5fbae493581b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3228,6 +3228,40 @@ public void testNewWatermarkNoFetch() throws Exception { Mockito.verifyNoInteractions(mockReader); } + @Test + public void testWatermarkClearNoOp() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.clear(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + assertBuildable(commitBuilder); + } + + @Test + public void testWatermarkClearWithLocalAdditionsNoop() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.add(new Instant(500)); + + hold.clear(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + assertBuildable(commitBuilder); + } + @Test public void testValueSetBeforeRead() throws Exception { StateTag> addr = StateTags.value("value", StringUtf8Coder.of());