What happened?
I'm attempting to use early triggering and PaneInfo to limit bundle sizes so that I stay under the Dataflow limit of 80MB, but I have found that PaneInfo does not appear to be populated correctly.
Runner: Dataflow
Beam Version: 2.55.1
Here's a test that I believe demonstrates the problem:
func init() {
	register.Function2x0(produceFn)
	register.Function4x0(getPanes)
	register.Emitter1[int]()
}

func produceFn(_ []byte, emit func(beam.EventTime, int)) {
	baseT := mtime.Now()
	for i := 0; i < 10; i++ {
		emit(baseT.Add(time.Minute), i)
	}
}

func Produce(s beam.Scope) beam.PCollection {
	return beam.ParDo(s, produceFn, beam.Impulse(s))
}

func getPanes(ctx context.Context, pi typex.PaneInfo, _ int, emit func(int)) {
	log.Output(ctx, log.SevWarn, 0, fmt.Sprintf("got pane %+v", pi))
	emit(int(pi.Index))
}

func TestPanes(t *testing.T) {
	p, scp := beam.NewPipelineWithRoot()
	c := Produce(scp)
	windowed := beam.WindowInto(
		scp,
		window.NewFixedWindows(5*time.Minute),
		c,
		beam.Trigger(trigger.AfterEndOfWindow().
			EarlyFiring(
				trigger.Repeat(
					trigger.AfterCount(2),
				),
			),
		),
		beam.PanesDiscard(),
	)
	panes := beam.ParDo(scp, getPanes, windowed)
	paneIdxs := beam.WindowInto(scp, window.NewGlobalWindows(), panes)
	passert.Count(scp, paneIdxs, "pane idxs", 10)
	passert.EqualsList(scp, paneIdxs, []int{0, 0, 1, 1, 2, 0, 0, 1, 1, 2})
	ptest.RunAndValidate(t, p)
}
The logs are all:
got pane {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}
Even if I don't have the indexes correct in the test (the test is failing on the EqualsList), I would expect these to be internally consistent. That is, I would expect there to be at least one IsFirst:true and IsLast:true each.
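To make that expectation concrete, here is a minimal standalone sketch of the consistency check I have in mind. It does not use the Beam SDK: `paneInfo` is a hypothetical local struct that only mirrors the fields printed in the log line above, and `hasFirstAndLast` is an illustrative helper name, not an existing API.

```go
package main

import "fmt"

// paneInfo is a hypothetical local mirror of the fields shown in the log
// output above. It is not the Beam SDK's typex.PaneInfo type.
type paneInfo struct {
	IsFirst bool
	IsLast  bool
	Index   int64
}

// hasFirstAndLast reports whether at least one observed pane is marked
// IsFirst and whether at least one is marked IsLast.
func hasFirstAndLast(panes []paneInfo) (first, last bool) {
	for _, p := range panes {
		first = first || p.IsFirst
		last = last || p.IsLast
	}
	return first, last
}

func main() {
	// Ten zero-valued panes, matching the ten identical log lines above.
	observed := make([]paneInfo, 10)
	first, last := hasFirstAndLast(observed)
	fmt.Printf("sawFirst=%v sawLast=%v\n", first, last)
	// Prints: sawFirst=false sawLast=false
}
```

With the values from the logs, both flags come back false, which is the inconsistency I am reporting.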
Issue Priority
Priority: 2 (default)
Issue Components