Skip to content

Commit 4528d1f

Browse files
committed
fix(window): 修复事件时间窗口未触发问题并添加调试日志
1 parent 6ce76d5 commit 4528d1f

4 files changed

Lines changed: 157 additions & 13 deletions

File tree

streamsql_sliding_window_test.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1066,9 +1066,20 @@ func TestSQLSlidingWindow_MaxOutOfOrderness(t *testing.T) {
10661066
}
10671067

10681068
// 第三阶段:发送更多正常数据,推进 watermark
1069+
// 关键:要触发窗口,需要 watermark >= windowEnd
1070+
// watermark = maxEventTime - maxOutOfOrderness
1071+
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
1072+
windowSizeMs := int64(2000) // 2秒
1073+
maxOutOfOrdernessMs := int64(1000) // 1秒
1074+
firstWindowEnd := baseTime + windowSizeMs
1075+
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
10691076
t.Log("第三阶段:继续发送正常数据,推进 watermark")
10701077
for i := 10; i < 15; i++ {
10711078
eventTime := baseTime + int64(i*200)
1079+
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
1080+
if i == 10 && eventTime < requiredEventTimeForTrigger {
1081+
eventTime = requiredEventTimeForTrigger
1082+
}
10721083
ssql.Emit(map[string]interface{}{
10731084
"deviceId": "sensor001",
10741085
"eventTime": eventTime,
@@ -1192,11 +1203,17 @@ func TestSQLSlidingWindow_AllowedLateness(t *testing.T) {
11921203
}
11931204

11941205
// 推进watermark,触发第一个窗口
1195-
// 发送事件时间超过第一个窗口结束时间的数据
1196-
firstWindowEnd := baseTime + int64(2000) // 第一个窗口结束时间
1206+
// 关键:要触发窗口,需要 watermark >= windowEnd
1207+
// watermark = maxEventTime - maxOutOfOrderness
1208+
// 所以需要:maxEventTime >= windowEnd + maxOutOfOrderness
1209+
windowSizeMs := int64(2000) // 2秒
1210+
maxOutOfOrdernessMs := int64(1000) // 1秒
1211+
firstWindowEnd := baseTime + windowSizeMs
1212+
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
1213+
// 发送事件时间 >= requiredEventTimeForTrigger 的数据,确保 watermark >= windowEnd
11971214
ssql.Emit(map[string]interface{}{
11981215
"deviceId": "sensor001",
1199-
"eventTime": firstWindowEnd + int64(2000),
1216+
"eventTime": requiredEventTimeForTrigger,
12001217
"temperature": 100.0,
12011218
})
12021219

streamsql_tumbling_window_test.go

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,17 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
448448
})
449449

450450
// 模拟完整的延迟数据处理场景
451-
baseTime := time.Now().UnixMilli() - 10000
451+
// 关键:确保 baseTime 对齐到窗口边界,以便窗口对齐行为可预测
452+
windowSizeMs := int64(2000) // 2秒
453+
baseTimeRaw := time.Now().UnixMilli() - 10000
454+
baseTime := (baseTimeRaw / windowSizeMs) * windowSizeMs // 对齐到窗口边界
455+
maxOutOfOrdernessMs := int64(1000) // 1秒
456+
firstWindowEnd := baseTime + windowSizeMs
457+
// 关键:要触发窗口,需要 watermark >= windowEnd
458+
// watermark = maxEventTime - maxOutOfOrderness
459+
// 所以需要:maxEventTime - maxOutOfOrderness >= windowEnd
460+
// 即:maxEventTime >= windowEnd + maxOutOfOrderness
461+
requiredEventTimeForTrigger := firstWindowEnd + maxOutOfOrdernessMs
452462

453463
// 第一阶段:发送正常顺序的数据
454464
t.Log("第一阶段:发送正常顺序的数据")
@@ -482,9 +492,14 @@ func TestSQLTumblingWindow_BothConfigs(t *testing.T) {
482492
}
483493

484494
// 第三阶段:继续发送正常数据,推进 watermark
495+
// 关键:必须发送事件时间 >= requiredEventTimeForTrigger 的数据,才能让 watermark >= windowEnd
485496
t.Log("第三阶段:继续发送正常数据,推进 watermark")
486497
for i := 10; i < 15; i++ {
487498
eventTime := baseTime + int64(i*200)
499+
// 确保至少有一个数据的事件时间 >= requiredEventTimeForTrigger
500+
if i == 10 && eventTime < requiredEventTimeForTrigger {
501+
eventTime = requiredEventTimeForTrigger
502+
}
488503
ssql.Emit(map[string]interface{}{
489504
"deviceId": "sensor001",
490505
"eventTime": eventTime,

window/sliding_window.go

Lines changed: 63 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package window
1919
import (
2020
"context"
2121
"fmt"
22+
"log"
2223
"sync"
2324
"time"
2425

@@ -27,6 +28,17 @@ import (
2728
"github.com/rulego/streamsql/types"
2829
)
2930

31+
// debugLogSliding logs debug information only when EnableDebug is true
32+
// This function is optimized to avoid unnecessary string formatting when debug is disabled
33+
func debugLogSliding(format string, args ...interface{}) {
34+
// Fast path: if debug is disabled, return before any formatting or output (note: callers still evaluate and box the args before this call)
35+
// The compiler should optimize this check away when EnableDebug is a compile-time constant false
36+
if !EnableDebug {
37+
return
38+
}
39+
log.Printf("[SlidingWindow] "+format, args...)
40+
}
41+
3042
// Ensure SlidingWindow implements the Window interface
3143
var _ Window = (*SlidingWindow)(nil)
3244

@@ -171,11 +183,17 @@ func (sw *SlidingWindow) Add(data interface{}) {
171183
// For event time, align window start to window boundaries
172184
alignedStart := alignWindowStart(eventTime, sw.slide)
173185
sw.currentSlot = sw.createSlotFromStart(alignedStart)
186+
debugLogSliding("Add: initialized with EventTime, eventTime=%v, alignedStart=%v, window=[%v, %v)",
187+
eventTime.UnixMilli(), alignedStart.UnixMilli(),
188+
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
174189
} else {
175190
// For processing time, use current time or event time as-is
176191
sw.currentSlot = sw.createSlot(eventTime)
177192
// Record when first window started (processing time)
178193
sw.firstWindowStartTime = time.Now()
194+
debugLogSliding("Add: initialized with ProcessingTime, eventTime=%v, window=[%v, %v)",
195+
eventTime.UnixMilli(),
196+
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
179197
}
180198
// Don't start timer here, wait for first window to end
181199
// Send initialization complete signal
@@ -193,6 +211,10 @@ func (sw *SlidingWindow) Add(data interface{}) {
193211
Timestamp: eventTime,
194212
}
195213
sw.data = append(sw.data, row)
214+
debugLogSliding("Add: added data, eventTime=%v, totalData=%d, currentSlot=[%v, %v), inWindow=%v",
215+
eventTime.UnixMilli(), len(sw.data),
216+
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli(),
217+
sw.currentSlot.Contains(eventTime))
196218

197219
// Check if data is late and handle allowedLateness (after data is added)
198220
if timeChar == types.EventTime && sw.watermark != nil {
@@ -211,7 +233,7 @@ func (sw *SlidingWindow) Add(data interface{}) {
211233
break
212234
}
213235
}
214-
236+
215237
// If not belonging to triggered window, check if it belongs to currentSlot
216238
// This handles the case where watermark has advanced but window hasn't triggered yet
217239
if !belongsToTriggeredWindow && sw.initialized && sw.currentSlot != nil && sw.currentSlot.Contains(eventTime) {
@@ -379,6 +401,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
379401
defer sw.mu.Unlock()
380402

381403
if !sw.initialized || sw.currentSlot == nil {
404+
debugLogSliding("checkAndTriggerWindows: not initialized or currentSlot is nil")
382405
return
383406
}
384407

@@ -391,27 +414,43 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
391414
// Which means: maxEventTime >= windowEnd + maxOutOfOrderness
392415
// This ensures all data for the window has arrived (within maxOutOfOrderness tolerance)
393416
// No tolerance threshold is applied: the check below is an exact time.Time comparison (watermark >= windowEnd)
417+
totalDataCount := len(sw.data)
418+
debugLogSliding("checkAndTriggerWindows: watermark=%v, totalData=%d, currentSlot=[%v, %v)",
419+
watermarkTime.UnixMilli(), totalDataCount,
420+
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
421+
394422
for sw.currentSlot != nil {
395423
windowEnd := sw.currentSlot.End
396-
424+
windowStart := sw.currentSlot.Start
425+
397426
// Check if watermark >= windowEnd
398427
// Use !Before() instead of After() to include equality case
399428
// This is equivalent to watermarkTime >= windowEnd
400429
shouldTrigger := !watermarkTime.Before(*windowEnd)
401-
430+
431+
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), watermark=%v, shouldTrigger=%v",
432+
windowStart.UnixMilli(), windowEnd.UnixMilli(), watermarkTime.UnixMilli(), shouldTrigger)
433+
402434
if !shouldTrigger {
403435
// Watermark hasn't reached windowEnd yet, stop checking
436+
debugLogSliding("checkAndTriggerWindows: watermark hasn't reached windowEnd, stopping")
404437
break
405438
}
406439
// Check if window has data before triggering
407440
hasData := false
441+
dataInWindow := 0
442+
var dataTimestamps []int64
408443
for _, item := range sw.data {
409444
if sw.currentSlot.Contains(item.Timestamp) {
410445
hasData = true
411-
break
446+
dataInWindow++
447+
dataTimestamps = append(dataTimestamps, item.Timestamp.UnixMilli())
412448
}
413449
}
414450

451+
debugLogSliding("checkAndTriggerWindows: window=[%v, %v), hasData=%v, dataInWindow=%d, dataTimestamps=%v",
452+
windowStart.UnixMilli(), windowEnd.UnixMilli(), hasData, dataInWindow, dataTimestamps)
453+
415454
// Trigger current window only if it has data
416455
if hasData {
417456
// Count data in window before triggering
@@ -421,7 +460,7 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
421460
dataInWindow++
422461
}
423462
}
424-
463+
425464
// Save snapshot data before triggering (for Flink-like late update behavior)
426465
var snapshotData []types.Row
427466
if allowedLateness > 0 {
@@ -438,8 +477,11 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
438477
}
439478
}
440479
}
441-
480+
481+
debugLogSliding("checkAndTriggerWindows: triggering window [%v, %v) with %d data items",
482+
windowStart.UnixMilli(), windowEnd.UnixMilli(), dataInWindow)
442483
sw.triggerWindowLocked()
484+
debugLogSliding("checkAndTriggerWindows: window triggered successfully")
443485

444486
// If allowedLateness > 0, keep window open for late data
445487
if allowedLateness > 0 {
@@ -450,11 +492,23 @@ func (sw *SlidingWindow) checkAndTriggerWindows(watermarkTime time.Time) {
450492
closeTime: closeTime,
451493
snapshotData: snapshotData, // Save snapshot for late updates
452494
}
495+
debugLogSliding("checkAndTriggerWindows: window [%v, %v) kept open for late data until %v",
496+
windowStart.UnixMilli(), windowEnd.UnixMilli(), closeTime.UnixMilli())
453497
}
498+
} else {
499+
debugLogSliding("checkAndTriggerWindows: window [%v, %v) has no data, skipping trigger",
500+
windowStart.UnixMilli(), windowEnd.UnixMilli())
454501
}
455502

456503
// Move to next window (even if current window was empty)
457504
sw.currentSlot = sw.NextSlot()
505+
if sw.currentSlot != nil {
506+
debugLogSliding("checkAndTriggerWindows: moved to next window [%v, %v)",
507+
sw.currentSlot.Start.UnixMilli(), sw.currentSlot.End.UnixMilli())
508+
} else {
509+
debugLogSliding("checkAndTriggerWindows: NextSlot returned nil, stopping")
510+
break
511+
}
458512
}
459513

460514
// Close windows that have exceeded allowedLateness
@@ -804,7 +858,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
804858

805859
// Collect all data for this window: original snapshot + late data from sw.data
806860
resultData := make([]types.Row, 0)
807-
861+
808862
// First, add original snapshot data (if exists)
809863
if windowInfo != nil && len(windowInfo.snapshotData) > 0 {
810864
// Create copies of snapshot data
@@ -816,7 +870,7 @@ func (sw *SlidingWindow) triggerLateUpdateLocked(slot *types.TimeSlot) {
816870
})
817871
}
818872
}
819-
873+
820874
// Then, add late data from sw.data (newly arrived late data)
821875
lateDataCount := 0
822876
for _, item := range sw.data {
@@ -884,7 +938,7 @@ func (sw *SlidingWindow) closeExpiredWindows(watermarkTime time.Time) {
884938
delete(sw.triggeredWindows, key)
885939
}
886940
}
887-
941+
888942
// Clean up data that belongs to expired windows (if any)
889943
if len(expiredWindows) > 0 {
890944
newData := make([]types.Row, 0)

0 commit comments

Comments
 (0)