From 5e86d7e0fb4a457380a2959b674794d141078004 Mon Sep 17 00:00:00 2001 From: Bruno Volpato Date: Thu, 2 Apr 2026 10:24:32 -0400 Subject: [PATCH 1/2] Fix flaky FileIOTest.testMatchWatchForNewFiles test under CI filesystems addresses #19480 The `FileIOTest.testMatchWatchForNewFiles` test occasionally flakes in the CI environment because the `updOptions` configuration in `CopyFilesFn` does not preserve file attributes when overwriting existing files. In some CI filesystems, this causes the copied file to register a `lastModifiedMillis` timestamp of `0`. Thus, when `ExtractFilenameAndLastUpdateFn` parses this file, it throws a `RuntimeException` at `FileIO.java:800`, failing the pipeline run. This PR adds `StandardCopyOption.COPY_ATTRIBUTES` to preserve the file's original timestamps, avoiding the exception. --- .../core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index dffc4943bfab..dc4457bb4a20 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -249,7 +249,9 @@ public void processElement(ProcessContext context, @StateId("count") ValueState< context.output(Objects.requireNonNull(context.element()).getValue()); CopyOption[] cpOptions = {StandardCopyOption.COPY_ATTRIBUTES}; - CopyOption[] updOptions = {StandardCopyOption.REPLACE_EXISTING}; + CopyOption[] updOptions = { + StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.COPY_ATTRIBUTES + }; final Path sourcePath = Paths.get(sourcePathStr); final Path watchPath = Paths.get(watchPathStr); From 9aa0988403676b9b5e34d1502c4741ce5373ca7b Mon Sep 17 00:00:00 2001 From: Bruno Volpato Date: Tue, 5 May 2026 01:47:28 -0400 Subject: [PATCH 2/2] Stabilize FileIOTest updated-file timestamp assertions --- .../src/test/java/org/apache/beam/sdk/io/FileIOTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java index dc4457bb4a20..dcfcb3224454 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java @@ -257,10 +257,17 @@ public void processElement(ProcessContext context, @StateId("count") ValueState< if (0 == current) { Thread.sleep(100); + // Ensure overwrite updates get a distinct mtime even when COPY_ATTRIBUTES is enabled. + Files.setLastModifiedTime( + sourcePath.resolve("first"), FileTime.fromMillis(System.currentTimeMillis() + 2000)); Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), cpOptions); } else if (1 == current) { Thread.sleep(100); + Files.setLastModifiedTime( + sourcePath.resolve("first"), FileTime.fromMillis(System.currentTimeMillis() + 4000)); + Files.setLastModifiedTime( + sourcePath.resolve("second"), FileTime.fromMillis(System.currentTimeMillis() + 4000)); Files.copy(sourcePath.resolve("first"), watchPath.resolve("first"), updOptions); Files.copy(sourcePath.resolve("second"), watchPath.resolve("second"), updOptions); Files.copy(sourcePath.resolve("third"), watchPath.resolve("third"), cpOptions);