From 1925b3becee7e3234b8f93b7366310b88ab2d45e Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Mon, 27 Apr 2026 23:28:53 +0000 Subject: [PATCH 1/4] Optimze away WindmillWatermarkHold::clear when the cached hold is empty --- .../windmill/state/WindmillWatermarkHold.java | 3 +++ .../state/WindmillStateInternalsTest.java | 16 ++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 613d87c127b7..9353c0e92a8d 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -72,6 +72,9 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol @Override public void clear() { + if (cachedValue != null && !cachedValue.isPresent() && localAdditions == null) { + return; + } cleared = true; cachedValue = Optional.absent(); localAdditions = null; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index de036214b958..2810ab630c58 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3228,6 +3228,22 @@ public void testNewWatermarkNoFetch() throws Exception { Mockito.verifyNoInteractions(mockReader); } + @Test + public void testWatermarkClearNoOp() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.clear(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + assertBuildable(commitBuilder); + } + @Test public void testValueSetBeforeRead() throws Exception { StateTag> addr = StateTags.value("value", StringUtf8Coder.of()); From f3d5461ff1ece48348ec0a774be723418cc62438 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 1 May 2026 22:28:07 +0000 Subject: [PATCH 2/4] review comments --- .../windmill/state/WindmillWatermarkHold.java | 3 ++- .../state/WindmillStateInternalsTest.java | 18 ++++++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 9353c0e92a8d..67c090bcd910 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -72,7 +72,8 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol @Override public void clear() { - if (cachedValue != null && !cachedValue.isPresent() && localAdditions == null) { + localAdditions = null; + if (cachedValue != null && !cachedValue.isPresent()) { return; } cleared = true; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 2810ab630c58..5fbae493581b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -3244,6 +3244,24 @@ public void testWatermarkClearNoOp() throws Exception { assertBuildable(commitBuilder); } + @Test + public void testWatermarkClearWithLocalAdditionsNoop() throws Exception { + StateTag addr = + StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST); + WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr); + + hold.add(new Instant(500)); + + hold.clear(); + + Windmill.WorkItemCommitRequest.Builder commitBuilder = + Windmill.WorkItemCommitRequest.newBuilder(); + underTestNewKey.persist(commitBuilder); + + assertEquals(0, commitBuilder.getWatermarkHoldsCount()); + assertBuildable(commitBuilder); + } + @Test public void testValueSetBeforeRead() throws Exception { StateTag> addr = StateTags.value("value", StringUtf8Coder.of()); From d3a1ea83292077ab75ebf199a1de8d2c25582f71 Mon Sep 17 00:00:00 2001 From: Arun Pandian Date: Fri, 1 May 2026 23:59:50 +0000 Subject: [PATCH 3/4] fix test --- .../dataflow/worker/StreamingDataflowWorkerTest.java | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index 3a25a671ca92..cff5ba4a2fc4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -2374,9 +2374,6 @@ public void testMergeSessionWindows_singleLateWindow() throws Exception { new Action( buildSessionInput( 1, 40, 0, Collections.singletonList(1L), Collections.EMPTY_LIST)) - .withHolds( - buildHold("/gAAAAAAAAAsK/+uhold", -1, true), - buildHold("/gAAAAAAAAAsK/+uextra", -1, true)) .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)))); } @@ -2404,10 +2401,7 @@ public void testMergeSessionWindows() throws Exception { 0, Collections.EMPTY_LIST, Collections.singletonList(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 10)))) - .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)) - .withHolds( - buildHold("/gAAAAAAAAAsK/+uhold", -1, true), - buildHold("/gAAAAAAAAAsK/+uextra", -1, true)), + .withTimers(buildWatermarkTimer("/s/gAAAAAAAAAsK/+0", 3600010)), new Action( buildSessionInput( 3, 30, 0, Collections.singletonList(8L), Collections.EMPTY_LIST)) @@ -2436,8 +2430,8 @@ public void testMergeSessionWindows() throws Exception { .withHolds( buildHold("/gAAAAAAAACkK/+uhold", -1, true), buildHold("/gAAAAAAAACkK/+uextra", -1, true), - buildHold("/gAAAAAAAAAsK/+uhold", 40, true), - buildHold("/gAAAAAAAAAsK/+uextra", 3600040, true)), + buildHold("/gAAAAAAAAAsK/+uhold", 40, false), + buildHold("/gAAAAAAAAAsK/+uextra", 3600040, false)), new Action( buildSessionInput( 6, From 0fd40f51cc3c1378becb7c4286a526cdf0900ada Mon Sep 17 00:00:00 2001 From: scwhittle Date: Mon, 4 May 2026 14:52:57 +0200 Subject: [PATCH 4/4] Update WindmillWatermarkHold.java --- .../dataflow/worker/windmill/state/WindmillWatermarkHold.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java index 67c090bcd910..b426b96cb5b9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java @@ -74,11 +74,11 @@ public class WindmillWatermarkHold extends WindmillState implements WatermarkHol public void clear() { localAdditions = null; if (cachedValue != null && !cachedValue.isPresent()) { + // No need to clear the backend as it is known empty. return; } cleared = true; cachedValue = Optional.absent(); - localAdditions = null; } @Override