@@ -234,7 +234,7 @@ type ElementManager struct {
234234 livePending atomic.Int64 // An accessible live pending count. DEBUG USE ONLY
235235 pendingElements sync.WaitGroup // pendingElements counts all unprocessed elements in a job. Jobs with no pending elements terminate successfully.
236236
237- processTimeEvents * stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time.
237+ processTimeEvents * stageRefreshQueue // Manages sequence of stage updates when interfacing with processing time. Callers must hold refreshCond.L lock.
238238 testStreamHandler * testStreamHandler // Optional test stream handler when a test stream is in the pipeline.
239239}
240240
@@ -398,7 +398,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
398398 for {
399399 em .refreshCond .L .Lock ()
400400 // Check if processing time has advanced before the wait loop.
401- emNow := em .ProcessingTimeNow ()
401+ emNow := em .processingTimeNow ()
402402 changedByProcessingTime := em .processTimeEvents .AdvanceTo (emNow )
403403 em .changedStages .merge (changedByProcessingTime )
404404
@@ -415,7 +415,7 @@ func (em *ElementManager) Bundles(ctx context.Context, upstreamCancelFn context.
415415 em .refreshCond .Wait () // until watermarks may have changed.
416416
417417 // Update if the processing time has advanced while we waited, and add refreshes here. (TODO waking on real time here for prod mode)
418- emNow = em .ProcessingTimeNow ()
418+ emNow = em .processingTimeNow ()
419419 changedByProcessingTime = em .processTimeEvents .AdvanceTo (emNow )
420420 em .changedStages .merge (changedByProcessingTime )
421421 }
@@ -521,7 +521,7 @@ func (em *ElementManager) DumpStages() string {
521521 stageState = append (stageState , fmt .Sprintf ("TestStreamHandler: completed %v, curIndex %v of %v events: %+v, processingTime %v, %v, ptEvents %v \n " ,
522522 em .testStreamHandler .completed , em .testStreamHandler .nextEventIndex , len (em .testStreamHandler .events ), em .testStreamHandler .events , em .testStreamHandler .processingTime , mtime .FromTime (em .testStreamHandler .processingTime ), em .processTimeEvents ))
523523 } else {
524- stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .ProcessingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
524+ stageState = append (stageState , fmt .Sprintf ("ElementManager Now: %v processingTimeEvents: %v injectedBundles: %v\n " , em .processingTimeNow (), em .processTimeEvents .events , em .injectedBundles ))
525525 }
526526 sort .Strings (ids )
527527 for _ , id := range ids {
@@ -880,8 +880,23 @@ func (em *ElementManager) PersistBundle(rb RunBundle, col2Coders map[string]PCol
880880 slog .Int ("newPending" , len (newPending )), "consumers" , consumers , "sideConsumers" , sideConsumers ,
881881 "pendingDelta" , len (newPending )* len (consumers ))
882882 for _ , sID := range consumers {
883+
883884 consumer := em .stages [sID ]
884- count := consumer .AddPending (em , newPending )
885+ var count int
886+ _ , isAggregateStage := consumer .kind .(* aggregateStageKind )
887+ if isAggregateStage {
888+ // While adding pending elements in aggregate stage, we may need to
889+ // access em.processTimeEvents to determine triggered bundles.
890+ // To avoid deadlocks, we acquire the em.refreshCond.L lock here before
891+ // AddPending is called.
892+ func () {
893+ em .refreshCond .L .Lock ()
894+ defer em .refreshCond .L .Unlock ()
895+ count = consumer .AddPending (em , newPending )
896+ }()
897+ } else {
898+ count = consumer .AddPending (em , newPending )
899+ }
885900 em .addPending (count )
886901 }
887902 for _ , link := range sideConsumers {
@@ -993,7 +1008,7 @@ func (em *ElementManager) triageTimers(d TentativeData, inputInfo PColInfo, stag
9931008 win typex.Window
9941009 }
9951010 em .refreshCond .L .Lock ()
996- emNow := em .ProcessingTimeNow ()
1011+ emNow := em .processingTimeNow ()
9971012 em .refreshCond .L .Unlock ()
9981013
9991014 var pendingEventTimers []element
@@ -1317,11 +1332,12 @@ func (ss *stageState) AddPending(em *ElementManager, newPending []element) int {
13171332
13181333func (ss * stageState ) injectTriggeredBundlesIfReady (em * ElementManager , window typex.Window , key string ) int {
13191334 // Check on triggers for this key.
1320- // We use an empty linkID as the key into state for aggregations.
1335+ // Callers must hold em.refreshCond.L
13211336 count := 0
13221337 if ss .state == nil {
13231338 ss .state = make (map [LinkID ]map [typex.Window ]map [string ]StateData )
13241339 }
1340+ // We use an empty linkID as the key into state for aggregations.
13251341 lv , ok := ss .state [LinkID {}]
13261342 if ! ok {
13271343 lv = make (map [typex.Window ]map [string ]StateData )
@@ -1337,7 +1353,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
13371353 ready := ss .strat .IsTriggerReady (triggerInput {
13381354 newElementCount : 1 ,
13391355 endOfWindowReached : endOfWindowReached ,
1340- emNow : em .ProcessingTimeNow (),
1356+ emNow : em .processingTimeNow (),
13411357 }, & state )
13421358
13431359 if ready {
@@ -1374,9 +1390,7 @@ func (ss *stageState) injectTriggeredBundlesIfReady(em *ElementManager, window t
13741390 // TODO: how to deal with watermark holds for this implicit processing time timer
13751391 // ss.watermarkHolds.Add(timer.holdTimestamp, 1)
13761392 ss .processingTimeTimers .Persist (firingTime , timer , notYetHolds )
1377- em .refreshCond .L .Lock ()
13781393 em .processTimeEvents .Schedule (firingTime , ss .ID )
1379- em .refreshCond .L .Unlock ()
13801394 em .wakeUpAt (firingTime )
13811395 }
13821396 }
@@ -1618,7 +1632,7 @@ func (ss *stageState) buildTriggeredBundle(em *ElementManager, key string, win t
16181632 return toProcess , accumulationDiff
16191633}
16201634
1621- // startTriggeredBundle must be called with the stage.mu lock held.
1635+ // startTriggeredBundle must be called with the stage.mu lock and em.refreshCond.L lock held.
16221636// Returns the accumulation diff that the pending work needs to be adjusted by, as completed work is subtracted from the pending count.
16231637// When in discarding mode, returns 0, as the pending work already includes these elements.
16241638// When in accumulating mode, returns the number of fired elements, since those elements remain pending even after this bundle is fired.
@@ -1653,10 +1667,8 @@ func (ss *stageState) startTriggeredBundle(em *ElementManager, key string, win t
16531667 // TODO: Use ss.bundlesToInject rather than em.injectedBundles
16541668 // ss.bundlesToInject = append(ss.bundlesToInject, rb)
16551669 // Bundle is marked in progress here to prevent a race condition.
1656- em .refreshCond .L .Lock ()
16571670 em .injectedBundles = append (em .injectedBundles , rb )
16581671 em .inprogressBundles .insert (rb .BundleID )
1659- em .refreshCond .L .Unlock ()
16601672 return accumulationDiff
16611673}
16621674
@@ -2006,6 +2018,7 @@ func (ss *stageState) startProcessingTimeBundle(em *ElementManager, emNow mtime.
20062018}
20072019
20082020// handleProcessingTimeTimer contains the common code for handling processing-time timers for aggregation stages and stateful stages.
2021+ // Callers must hold em.refreshCond.L lock.
20092022func handleProcessingTimeTimer (ss * stageState , em * ElementManager , emNow mtime.Time ,
20102023 processTimerFn func (e element , toProcess []element , holdsInBundle map [mtime.Time ]int , panesInBundle []bundlePane ) ([]element , []bundlePane , int )) (elementHeap , mtime.Time , set [string ], map [mtime.Time ]int , []bundlePane , bool , int ) {
20112024 // TODO: Determine if it's possible and a good idea to treat all EventTime processing as a MinTime
@@ -2440,8 +2453,8 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
24402453 return upstreamW , ready , ptimeEventsReady , injectedReady
24412454}
24422455
2443- // ProcessingTimeNow gives the current processing time for the runner.
2444- func (em * ElementManager ) ProcessingTimeNow () (ret mtime.Time ) {
2456+ // processingTimeNow gives the current processing time for the runner.
2457+ func (em * ElementManager ) processingTimeNow () (ret mtime.Time ) {
24452458 if em .testStreamHandler != nil && ! em .testStreamHandler .completed {
24462459 return em .testStreamHandler .Now ()
24472460 }
0 commit comments