diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java index c1b56a2b4458..8404f15842ee 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java @@ -1250,9 +1250,8 @@ public void processElement( // before we return from this processElement call. This allows us to perform the writes/closes // in parallel with the prior elements close calls and bounds the amount of data buffered to // limit the number of OOMs. - CompletionStage> pastCloseFutures = MoreFutures.allAsList(closeFutures); + CompletionStage pastCloseFutures = MoreFutures.allOf(closeFutures); closeFutures.clear(); - // Close all writers in the background for (Map.Entry> entry : writers.entrySet()) { int shard = c.element().getKey().getShardNumber(); @@ -1267,7 +1266,6 @@ public void processElement( new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()))); closeWriterInBackground(writer); } - // Block on completing the past closes before returning. We do so after starting the current // closes in the background so that they can happen in parallel. MoreFutures.get(pastCloseFutures); @@ -1293,7 +1291,7 @@ private void closeWriterInBackground(Writer writer) { @FinishBundle public void finishBundle(FinishBundleContext c) throws Exception { try { - MoreFutures.get(MoreFutures.allAsList(closeFutures)); + MoreFutures.get(MoreFutures.allOf(closeFutures)); // If all writers were closed without exception, output the results to the next stage. for (KV> result : deferredOutput) { c.output(result.getValue(), result.getKey(), result.getValue().getWindow()); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java index 0999f2ad0771..441e604af3a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/MoreFutures.java @@ -159,6 +159,13 @@ public static CompletionStage> allAsList( nothing -> Arrays.stream(f).map(CompletableFuture::join).collect(Collectors.toList())); } + public static CompletionStage allOf( + Collection> futures) { + // CompletableFuture.allOf completes exceptionally if any of the futures do. + CompletableFuture[] f = futuresToCompletableFutures(futures); + return CompletableFuture.allOf(f); + } + /** * An object that represents either a result or an exceptional termination. * diff --git a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java index c56f5be0000a..77670eafbb40 100644 --- a/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java +++ b/sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtil.java @@ -990,7 +990,7 @@ private static void executeBatches(List batches) throws IOExcept try { try { - MoreFutures.get(MoreFutures.allAsList(futures)); + MoreFutures.get(MoreFutures.allOf(futures)); } catch (ExecutionException e) { if (e.getCause() instanceof FileNotFoundException) { throw (FileNotFoundException) e.getCause();