Skip to content

Commit 39263ba

Browse files
committed
Fix propagation of metadata
1 parent 6a73d55 commit 39263ba

2 files changed

Lines changed: 14 additions & 2 deletions

File tree

runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,13 @@ public <K, InputT> Iterable<WindowedValue<InputT>> filter(
145145
} else {
146146
nonLateElements.add(
147147
WindowedValues.of(
148-
element.getValue(), element.getTimestamp(), window, element.getPaneInfo()));
148+
element.getValue(),
149+
element.getTimestamp(),
150+
window,
151+
element.getPaneInfo(),
152+
element.getRecordId(),
153+
element.getRecordOffset(),
154+
element.causedByDrain()));
149155
}
150156
}
151157
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillSink.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
4040
import org.apache.beam.sdk.transforms.windowing.PaneInfo.PaneInfoCoder;
4141
import org.apache.beam.sdk.util.ByteStringOutputStream;
42+
import org.apache.beam.sdk.values.CausedByDrain;
4243
import org.apache.beam.sdk.values.KV;
4344
import org.apache.beam.sdk.values.ValueWithRecordId;
4445
import org.apache.beam.sdk.values.ValueWithRecordId.ValueWithRecordIdCoder;
@@ -220,7 +221,12 @@ public long add(WindowedValue<T> data) throws IOException {
220221
ByteString id = ByteString.EMPTY;
221222
// todo #33176 specify additional metadata in the future
222223
BeamFnApi.Elements.ElementMetadata additionalMetadata =
223-
BeamFnApi.Elements.ElementMetadata.newBuilder().build();
224+
BeamFnApi.Elements.ElementMetadata.newBuilder()
225+
.setDrain(
226+
data.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN
227+
? BeamFnApi.Elements.DrainMode.Enum.DRAINING
228+
: BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
229+
.build();
224230
ByteString metadata =
225231
encodeMetadata(
226232
stream, windowsCoder, data.getWindows(), data.getPaneInfo(), additionalMetadata);

0 commit comments

Comments
 (0)