Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1250,9 +1250,8 @@ public void processElement(
// before we return from this processElement call. This allows us to perform the writes/closes
// in parallel with the prior elements close calls and bounds the amount of data buffered to
// limit the number of OOMs.
CompletionStage<List<Void>> pastCloseFutures = MoreFutures.allAsList(closeFutures);
CompletionStage<Void> pastCloseFutures = MoreFutures.allOf(closeFutures);
closeFutures.clear();

// Close all writers in the background
for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
int shard = c.element().getKey().getShardNumber();
Expand All @@ -1267,7 +1266,6 @@ public void processElement(
new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey())));
closeWriterInBackground(writer);
}

// Block on completing the past closes before returning. We do so after starting the current
// closes in the background so that they can happen in parallel.
MoreFutures.get(pastCloseFutures);
Expand All @@ -1293,7 +1291,7 @@ private void closeWriterInBackground(Writer<DestinationT, OutputT> writer) {
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
try {
MoreFutures.get(MoreFutures.allAsList(closeFutures));
MoreFutures.get(MoreFutures.allOf(closeFutures));
// If all writers were closed without exception, output the results to the next stage.
for (KV<Instant, FileResult<DestinationT>> result : deferredOutput) {
c.output(result.getValue(), result.getKey(), result.getValue().getWindow());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,13 @@ public static <T> CompletionStage<List<T>> allAsList(
nothing -> Arrays.stream(f).map(CompletableFuture::join).collect(Collectors.toList()));
}

public static <T> CompletionStage<Void> allOf(
Collection<? extends CompletionStage<? extends T>> futures) {
// CompletableFuture.allOf completes exceptionally if any of the futures do.
CompletableFuture<? extends T>[] f = futuresToCompletableFutures(futures);
return CompletableFuture.allOf(f);
}

/**
* An object that represents either a result or an exceptional termination.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,7 +990,7 @@ private static void executeBatches(List<BatchInterface> batches) throws IOExcept

try {
try {
MoreFutures.get(MoreFutures.allAsList(futures));
MoreFutures.get(MoreFutures.allOf(futures));
} catch (ExecutionException e) {
if (e.getCause() instanceof FileNotFoundException) {
throw (FileNotFoundException) e.getCause();
Expand Down
Loading