diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle index e75fda999e14..fd3631fd4a70 100644 --- a/runners/prism/java/build.gradle +++ b/runners/prism/java/build.gradle @@ -94,6 +94,16 @@ def sickbayTests = [ 'org.apache.beam.sdk.testing.TestStreamTest.testProcessingTimeTrigger', 'org.apache.beam.sdk.testing.TestStreamTest.testLateDataAccumulating', // Uses processing time trigger for early firings. + // A regression introduced when we use number of pending elements rather than watermark to determine + // the bundle readiness of a stateless stage. + // Currently, Prism processes a bundle of [100, ..., 1000] when watermark is set to 100, + // and then a second bundle of [1, ... 99] when the watermark is set to +inf. + // As a result, it yields an output of [-999, 1, 1...], where -999 comes from the difference between 1000 and 1. + // According to https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/DoFn.RequiresTimeSortedInput.html, + // the stateful dofn with `RequiresTimeSortedInput` annotation should buffer an element until the element's timestamp + allowed_lateness. + // This stateful dofn feature is not yet supported in Prism. + 'org.apache.beam.sdk.transforms.ParDoTest$StateTests.testRequiresTimeSortedInputWithLateDataAndAllowedLateness', + // Triggered Side Inputs not yet implemented in Prism. // https://github.com/apache/beam/issues/31438 'org.apache.beam.sdk.transforms.ViewTest.testTriggeredLatestSingleton', diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go index 6af030f36228..d03d906e47de 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go @@ -1215,7 +1215,9 @@ type stageKind interface { // buildEventTimeBundle handles building bundles for the stage per it's kind. 
buildEventTimeBundle(ss *stageState, watermark mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool, pendingAdjustment int) - + // buildProcessingTimeBundle handles building processing-time bundles for the stage per its kind. + buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (toProcess elementHeap, minTs mtime.Time, newKeys set[string], + holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane, schedulable bool) // getPaneOrDefault based on the stage state, element metadata, and bundle id. getPaneOrDefault(ss *stageState, defaultPane typex.PaneInfo, w typex.Window, keyBytes []byte, bundID string) typex.PaneInfo } @@ -1327,17 +1329,54 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t ready := ss.strat.IsTriggerReady(triggerInput{ newElementCount: 1, endOfWindowReached: endOfWindowReached, + emNow: em.ProcessingTimeNow(), }, &state) if ready { state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) + } else { + if pts := ss.strat.GetAfterProcessingTimeTriggers(); pts != nil { + for _, t := range pts { + ts := (&state).getTriggerState(t) + if ts.extra == nil || t.shouldFire((&state)) { + // Skipping inserting a processing time timer if the firing time + // is not set or it already should fire. + // When the after processing time triggers should fire, there are + // two scenarios: + // (1) the entire trigger of this window is ready to fire. In this + // case, `ready` should be true and we won't reach here. + // (2) we are still waiting for other triggers (subtriggers) to + // fire (e.g. AfterAll). 
+ continue + } + firingTime := ts.extra.(afterProcessingTimeState).firingTime + notYetHolds := map[mtime.Time]int{} + timer := element{ + window: window, + timestamp: firingTime, + holdTimestamp: window.MaxTimestamp(), + pane: typex.NoFiringPane(), + transform: ss.ID, // Use stage id to fake transform id + family: "AfterProcessingTime", + tag: "", + sequence: 1, + elmBytes: nil, + keyBytes: []byte(key), + } + // TODO: how to deal with watermark holds for this implicit processing time timer + // ss.watermarkHolds.Add(timer.holdTimestamp, 1) + ss.processingTimeTimers.Persist(firingTime, timer, notYetHolds) + em.processTimeEvents.Schedule(firingTime, ss.ID) + em.wakeUpAt(firingTime) + } + } } // Store the state as triggers may have changed it. ss.state[LinkID{}][window][key] = state // If we're ready, it's time to fire! if ready { - count += ss.buildTriggeredBundle(em, key, window) + count += ss.startTriggeredBundle(em, key, window) } return count } @@ -1524,16 +1563,11 @@ func (ss *stageState) savePanes(bundID string, panesInBundle []bundlePane) { } } -// buildTriggeredBundle must be called with the stage.mu lock held. -// When in discarding mode, returns 0. -// When in accumulating mode, returns the number of fired elements to maintain a correct pending count. -func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) int { +func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win typex.Window) ([]element, int) { var toProcess []element dnt := ss.pendingByKeys[key] var notYet []element - rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} - // Look at all elements for this key, and only for this window. 
for dnt.elements.Len() > 0 { e := heap.Pop(&dnt.elements).(element) @@ -1564,6 +1598,19 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t heap.Init(&dnt.elements) } + return toProcess, accumulationDiff +} + +// startTriggeredBundle must be called with the stage.mu lock held. +// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count. +// When in discarding mode, returns 0, as the pending work already includes these elements. +// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired. +func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win typex.Window) int { + toProcess, accumulationDiff := ss.buildTriggeredBundle(em, key, win) + if len(toProcess) == 0 { + return accumulationDiff + } + if ss.inprogressKeys == nil { ss.inprogressKeys = set[string]{} } @@ -1575,6 +1622,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t }, } + rb := RunBundle{StageID: ss.ID, BundleID: "agg-" + em.nextBundID(), Watermark: ss.input} ss.makeInProgressBundle( func() string { return rb.BundleID }, toProcess, @@ -1585,9 +1633,11 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t ) slog.Debug("started a triggered bundle", "stageID", ss.ID, "bundleID", rb.BundleID, "size", len(toProcess)) - ss.bundlesToInject = append(ss.bundlesToInject, rb) + // TODO: Use ss.bundlesToInject rather than em.injectedBundles + // ss.bundlesToInject = append(ss.bundlesToInject, rb) // Bundle is marked in progress here to prevent a race condition. em.refreshCond.L.Lock() + em.injectedBundles = append(em.injectedBundles, rb) em.inprogressBundles.insert(rb.BundleID) em.refreshCond.L.Unlock() return accumulationDiff @@ -1927,6 +1977,20 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. 
ss.mu.Lock() defer ss.mu.Unlock() + toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable := ss.kind.buildProcessingTimeBundle(ss, em, emNow) + + if len(toProcess) == 0 { + // If we have nothing to process, don't start a bundle. + return "", false, stillSchedulable + } + bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, panesInBundle) + slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow) + return bundID, true, stillSchedulable +} + +// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages. +func handleProcessingTimeTimer(ss *stageState, em *ElementManager, emNow mtime.Time, + processTimerFn func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane)) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime // Special Case for ProcessingTime handling. // Eg. Always queue EventTime elements at minTime. @@ -1935,6 +1999,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. // Potentially puts too much work on the scheduling thread though. var toProcess []element + var panesInBundle []bundlePane minTs := mtime.MaxTimestamp holdsInBundle := map[mtime.Time]int{} @@ -1968,10 +2033,8 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. if e.timestamp < minTs { minTs = e.timestamp } - holdsInBundle[e.holdTimestamp]++ - // We're going to process this timer! - toProcess = append(toProcess, e) + toProcess, panesInBundle = processTimerFn(e, toProcess, holdsInBundle, panesInBundle) } nextTime = ss.processingTimeTimers.Peek() @@ -1986,19 +2049,58 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime. 
for _, v := range notYet { ss.processingTimeTimers.Persist(v.firing, v.timer, notYetHolds) em.processTimeEvents.Schedule(v.firing, ss.ID) + em.wakeUpAt(v.firing) } // Add a refresh if there are still processing time events to process. stillSchedulable := (nextTime < emNow && nextTime != mtime.MaxTimestamp || len(notYet) > 0) - if len(toProcess) == 0 { - // If we have nothing - return "", false, stillSchedulable - } - bundID := ss.makeInProgressBundle(genBundID, toProcess, minTs, newKeys, holdsInBundle, nil) + return toProcess, minTs, newKeys, holdsInBundle, panesInBundle, stillSchedulable +} - slog.Debug("started a processing time bundle", "stageID", ss.ID, "bundleID", bundID, "size", len(toProcess), "emNow", emNow) - return bundID, true, stillSchedulable +// buildProcessingTimeBundle for stateful stages prepares bundles for processing-time timers +func (*statefulStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { + return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) { + holdsInBundle[e.holdTimestamp]++ + // We're going to process this timer! + toProcess = append(toProcess, e) + return toProcess, nil + }) +} + +// buildProcessingTimeBundle for aggregation stages prepares bundles for after-processing-time triggers +func (*aggregateStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { + return handleProcessingTimeTimer(ss, em, emNow, func(e element, toProcess []element, holdsInBundle map[mtime.Time]int, panesInBundle []bundlePane) ([]element, []bundlePane) { + // Different from `buildProcessingTimeBundle` for stateful stage, + // triggers don't hold back the watermark, so no holds are in the triggered bundle. 
+ state := ss.state[LinkID{}][e.window][string(e.keyBytes)] + endOfWindowReached := e.window.MaxTimestamp() < ss.input + ready := ss.strat.IsTriggerReady(triggerInput{ + newElementCount: 0, + endOfWindowReached: endOfWindowReached, + emNow: emNow, + }, &state) + + if ready { + state.Pane = computeNextTriggeredPane(state.Pane, endOfWindowReached) + + // We're going to process this trigger! + elems, _ := ss.buildTriggeredBundle(em, string(e.keyBytes), e.window) + toProcess = append(toProcess, elems...) + + ss.state[LinkID{}][e.window][string(e.keyBytes)] = state + + panesInBundle = append(panesInBundle, bundlePane{}) + } + + return toProcess, panesInBundle + }) +} + +// buildProcessingTimeBundle for stateless stages is not supposed to be called currently +func (*ordinaryStageKind) buildProcessingTimeBundle(ss *stageState, em *ElementManager, emNow mtime.Time) (elementHeap, mtime.Time, set[string], map[mtime.Time]int, []bundlePane, bool) { + slog.Error("ordinary stages can't have processing time elements") + return nil, mtime.MinTimestamp, nil, nil, nil, false } // makeInProgressBundle is common code to store a set of elements as a bundle in progress. @@ -2281,13 +2383,23 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T inputW := ss.input _, upstreamW := ss.UpstreamWatermark() previousInputW := ss.previousInput - if inputW == upstreamW && previousInputW == inputW { + + _, isOrdinaryStage := ss.kind.(*ordinaryStageKind) + if isOrdinaryStage && len(ss.sides) == 0 { + // For ordinary stage with no side inputs, we use whether there are pending elements to determine + // whether a bundle is ready or not. + if len(ss.pending) == 0 { + return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady + } + } else if inputW == upstreamW && previousInputW == inputW { + // Otherwise, use the progression of watermark to determine the bundle readiness. 
slog.Debug("bundleReady: unchanged upstream watermark", slog.String("stage", ss.ID), slog.Group("watermark", slog.Any("upstream == input == previousInput", inputW))) return mtime.MinTimestamp, false, ptimeEventsReady, injectedReady } + ready := true for _, side := range ss.sides { pID, ok := em.pcolParents[side.Global] @@ -2329,3 +2441,17 @@ func (em *ElementManager) ProcessingTimeNow() (ret mtime.Time) { func rebaseProcessingTime(localNow, scheduled mtime.Time) mtime.Time { return localNow + (scheduled - mtime.Now()) } + +// wakeUpAt schedules a wakeup signal for the bundle processing loop. +// This is used for processing time timers to ensure the loop re-evaluates +// stages when a processing time timer is expected to fire. +func (em *ElementManager) wakeUpAt(t mtime.Time) { + if em.testStreamHandler == nil && em.config.EnableRTC { + // only create this goroutine if we have real-time clock enabled and the pipeline does not have TestStream. + go func(fireAt time.Time) { + time.AfterFunc(time.Until(fireAt), func() { + em.refreshCond.Broadcast() + }) + }(t.ToTime()) + } +} diff --git a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go index 044b9806c1b1..2aef5fcf332f 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go +++ b/sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go @@ -73,6 +73,49 @@ func (ws WinStrat) IsNeverTrigger() bool { return ok } +func getAfterProcessingTimeTriggers(t Trigger) []*TriggerAfterProcessingTime { + if t == nil { + return nil + } + var triggers []*TriggerAfterProcessingTime + switch at := t.(type) { + case *TriggerAfterProcessingTime: + return []*TriggerAfterProcessingTime{at} + case *TriggerAfterAll: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) 
+ } + return triggers + case *TriggerAfterAny: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEach: + for _, st := range at.SubTriggers { + triggers = append(triggers, getAfterProcessingTimeTriggers(st)...) + } + return triggers + case *TriggerAfterEndOfWindow: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Early)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Late)...) + return triggers + case *TriggerOrFinally: + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Main)...) + triggers = append(triggers, getAfterProcessingTimeTriggers(at.Finally)...) + return triggers + case *TriggerRepeatedly: + return getAfterProcessingTimeTriggers(at.Repeated) + default: + return nil + } +} + +// GetAfterProcessingTimeTriggers returns all AfterProcessingTime triggers within the trigger. +func (ws WinStrat) GetAfterProcessingTimeTriggers() []*TriggerAfterProcessingTime { + return getAfterProcessingTimeTriggers(ws.Trigger) +} + func (ws WinStrat) String() string { return fmt.Sprintf("WinStrat[AllowedLateness:%v Trigger:%v]", ws.AllowedLateness, ws.Trigger) } diff --git a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go index f00838152111..12c3c42c2e92 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go +++ b/sdks/go/pkg/beam/runners/prism/internal/jobservices/management.go @@ -316,7 +316,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ * func hasUnsupportedTriggers(tpb *pipepb.Trigger) bool { unsupported := false switch at := tpb.GetTrigger().(type) { - case *pipepb.Trigger_AfterProcessingTime_, *pipepb.Trigger_AfterSynchronizedProcessingTime_: + case *pipepb.Trigger_AfterSynchronizedProcessingTime_: return true case *pipepb.Trigger_AfterAll_: for _, st := range 
at.AfterAll.GetSubtriggers() { diff --git a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go index 7a742c22d0fb..d54955f43d46 100644 --- a/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go +++ b/sdks/go/pkg/beam/runners/prism/internal/unimplemented_test.go @@ -53,7 +53,6 @@ func TestUnimplemented(t *testing.T) { // Currently unimplemented triggers. // https://github.com/apache/beam/issues/31438 {pipeline: primitives.TriggerAfterSynchronizedProcessingTime}, - {pipeline: primitives.TriggerAfterProcessingTime}, } for _, test := range tests { @@ -93,6 +92,8 @@ func TestImplemented(t *testing.T) { {pipeline: primitives.TriggerAfterEach}, {pipeline: primitives.TriggerAfterEndOfWindow}, {pipeline: primitives.TriggerRepeat}, + {pipeline: primitives.TriggerAfterProcessingTime}, + {pipeline: primitives.TriggerAfterProcessingTimeNotTriggered}, } for _, test := range tests { diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 8d951fe8ce96..eae64dcb2053 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -171,6 +171,7 @@ var flinkFilters = []string{ "TestBigQueryIO.*", "TestBigtableIO.*", "TestSpannerIO.*", + "TestTriggerAfterProcessingTime", // The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs. "TestTriggerAfterSynchronizedProcessingTime", // The flink runner does not support pipeline drain for SDF. 
diff --git a/sdks/go/test/integration/primitives/windowinto.go b/sdks/go/test/integration/primitives/windowinto.go index d33e464b76f0..f5d01bdfbba5 100644 --- a/sdks/go/test/integration/primitives/windowinto.go +++ b/sdks/go/test/integration/primitives/windowinto.go @@ -217,14 +217,32 @@ func TriggerElementCount(s beam.Scope) { }, 2) } -// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger, it fires output panes once 't' processing time has passed +// TriggerAfterProcessingTimeNotTriggered tests the AfterProcessingTime Trigger. It won't fire because 't' processing time is not reached +// Not yet supported by the flink runner: +// java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner. +func TriggerAfterProcessingTimeNotTriggered(s beam.Scope) { + con := teststream.NewConfig() + con.AdvanceProcessingTime(100) + con.AddElements(1000, 1.0, 2.0, 3.0) + con.AdvanceProcessingTime(4999) // advance processing time but not enough to fire the trigger + con.AddElements(22000, 4.0) + + col := teststream.Create(s, con) + + validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5 * time.Second)), + }, 10.0) +} + +// TriggerAfterProcessingTime tests the AfterProcessingTime Trigger. It fires output panes once 't' processing time has passed // Not yet supported by the flink runner: // java.lang.UnsupportedOperationException: Advancing Processing time is not supported by the Flink Runner. 
func TriggerAfterProcessingTime(s beam.Scope) { con := teststream.NewConfig() con.AdvanceProcessingTime(100) con.AddElements(1000, 1.0, 2.0, 3.0) - con.AdvanceProcessingTime(2000) + con.AdvanceProcessingTime(5000) // advance processing time to fire the trigger con.AddElements(22000, 4.0) col := teststream.Create(s, con) @@ -232,7 +250,7 @@ func TriggerAfterProcessingTime(s beam.Scope) { validateEquals(s.Scope("Global"), window.NewGlobalWindows(), col, []beam.WindowIntoOption{ beam.Trigger(trigger.AfterProcessingTime().PlusDelay(5 * time.Second)), - }, 6.0) + }, 6.0, 4.0) } // TriggerRepeat tests the repeat trigger. As of now is it is configure to take only one trigger as a subtrigger. diff --git a/sdks/go/test/integration/primitives/windowinto_test.go b/sdks/go/test/integration/primitives/windowinto_test.go index 0f2cff5d8f24..39a1df6e9e74 100644 --- a/sdks/go/test/integration/primitives/windowinto_test.go +++ b/sdks/go/test/integration/primitives/windowinto_test.go @@ -77,6 +77,12 @@ func TestTriggerAfterAny(t *testing.T) { ptest.BuildAndRun(t, TriggerAfterAny) } +func TestTriggerAfterProcessingTime(t *testing.T) { + integration.CheckFilters(t) + ptest.BuildAndRun(t, TriggerAfterProcessingTime) + ptest.BuildAndRun(t, TriggerAfterProcessingTimeNotTriggered) +} + func TestTriggerAfterSynchronizedProcessingTime(t *testing.T) { integration.CheckFilters(t) ptest.BuildAndRun(t, TriggerAfterSynchronizedProcessingTime)