From 3037e17a20475b4f779c52933bb21e0981bf4f11 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Fri, 15 May 2026 17:16:27 +0800 Subject: [PATCH 1/3] [fix](scan) Respect byte budget when merging scan blocks --- be/src/exec/scan/scanner_scheduler.cpp | 31 ++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index fae4659361fb42..d989d9e55b00a7 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -220,8 +220,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, size_t raw_bytes_read = 0; bool first_read = true; int64_t limit = scanner->limit(); - // If the first block is full, then it is true. Or the first block + second block > batch_size + // If the first block is full, then it is true. Or the first block + second block + // exceeds the row/byte budget. bool has_first_full_block = false; + const size_t preferred_block_size_bytes = state->preferred_block_size_bytes(); // During low memory mode, every scan task will return at most 2 block to reduce memory usage. while (!eos && raw_bytes_read < raw_bytes_threshold && @@ -272,9 +274,30 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, // Projection will truncate useless columns, makes block size change. auto free_block_bytes = free_block->allocated_bytes(); raw_bytes_read += free_block_bytes; - if (!scan_task->cached_blocks.empty() && - scan_task->cached_blocks.back().first->rows() + free_block->rows() <= - ctx->batch_size()) { + const auto can_merge_to_last_block = [&]() { + if (scan_task->cached_blocks.empty()) { + return false; + } + + const auto* last_block = scan_task->cached_blocks.back().first.get(); + if (last_block->rows() == 0 || free_block->rows() == 0) { + return true; + } + + const bool within_row_budget = + last_block->rows() + free_block->rows() <= ctx->batch_size(); + if (!within_row_budget) { + return false; + } + + const auto free_block_data_bytes = free_block->bytes(); + const bool within_byte_budget = preferred_block_size_bytes == 0 || + last_block->bytes() + free_block_data_bytes <= + preferred_block_size_bytes; + return within_byte_budget; + }(); + + if (can_merge_to_last_block) { size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes(); MutableBlock mutable_block(scan_task->cached_blocks.back().first.get()); status = mutable_block.merge(*free_block); From c4a168504905b8ca08c6b76d768dde8f06fa06af Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Tue, 19 May 2026 15:09:33 +0800 Subject: [PATCH 2/3] fix(scan): keep row-only merge when adaptive batch disabled --- be/src/exec/scan/scanner_scheduler.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index d989d9e55b00a7..2613595054dad4 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -223,7 +223,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, // If the first block is full, then it is true. Or the first block + second block // exceeds the row/byte budget. bool has_first_full_block = false; - const size_t preferred_block_size_bytes = state->preferred_block_size_bytes(); + // RuntimeState::preferred_block_size_bytes() returns a legal max budget when + // adaptive batch size is disabled. Use 0 as the local disabled sentinel so the + // merge path preserves the old row-budget-only behavior in that mode. + const size_t preferred_block_size_bytes = + config::enable_adaptive_batch_size ? state->preferred_block_size_bytes() : 0; // During low memory mode, every scan task will return at most 2 block to reduce memory usage. while (!eos && raw_bytes_read < raw_bytes_threshold && From 4916f6f785d742d8b187ca4f1efdd379e64e2c92 Mon Sep 17 00:00:00 2001 From: Hu Shenggang Date: Tue, 19 May 2026 15:14:39 +0800 Subject: [PATCH 3/3] fix(scan): skip byte merge budget when adaptive disabled --- be/src/exec/scan/scanner_scheduler.cpp | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/be/src/exec/scan/scanner_scheduler.cpp b/be/src/exec/scan/scanner_scheduler.cpp index 2613595054dad4..123e0515eb2df2 100644 --- a/be/src/exec/scan/scanner_scheduler.cpp +++ b/be/src/exec/scan/scanner_scheduler.cpp @@ -223,11 +223,7 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, // If the first block is full, then it is true. Or the first block + second block // exceeds the row/byte budget. bool has_first_full_block = false; - // RuntimeState::preferred_block_size_bytes() returns a legal max budget when - // adaptive batch size is disabled. Use 0 as the local disabled sentinel so the - // merge path preserves the old row-budget-only behavior in that mode. - const size_t preferred_block_size_bytes = - config::enable_adaptive_batch_size ? state->preferred_block_size_bytes() : 0; + const size_t preferred_block_size_bytes = state->preferred_block_size_bytes(); // During low memory mode, every scan task will return at most 2 block to reduce memory usage. while (!eos && raw_bytes_read < raw_bytes_threshold && @@ -294,11 +290,11 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr ctx, return false; } - const auto free_block_data_bytes = free_block->bytes(); - const bool within_byte_budget = preferred_block_size_bytes == 0 || - last_block->bytes() + free_block_data_bytes <= - preferred_block_size_bytes; - return within_byte_budget; + if (!config::enable_adaptive_batch_size) { + return true; + } + + return last_block->bytes() + free_block->bytes() <= preferred_block_size_bytes; }(); if (can_merge_to_last_block) {