Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions be/src/exec/scan/scanner_scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,10 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,

size_t raw_bytes_read = 0;
bool first_read = true; int64_t limit = scanner->limit();
// If the first block is full, then it is true. Or the first block + second block > batch_size
// True when the first block is full, or when the first block plus the second block
// exceeds the row/byte budget.
bool has_first_full_block = false;
const size_t preferred_block_size_bytes = state->preferred_block_size_bytes();

// During low memory mode, every scan task will return at most 2 blocks to reduce memory usage.
while (!eos && raw_bytes_read < raw_bytes_threshold &&
Expand Down Expand Up @@ -272,9 +274,30 @@ void ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
// Projection will truncate useless columns, makes block size change.
auto free_block_bytes = free_block->allocated_bytes();
raw_bytes_read += free_block_bytes;
if (!scan_task->cached_blocks.empty() &&
scan_task->cached_blocks.back().first->rows() + free_block->rows() <=
ctx->batch_size()) {
// Decide whether free_block may be merged into the most recently cached block
// of this scan task instead of being queued as a separate block.
const auto can_merge_to_last_block = [&]() -> bool {
    const auto& cached = scan_task->cached_blocks;
    if (cached.empty()) {
        // Nothing cached yet, so there is no block to merge into.
        return false;
    }

    const auto* tail = cached.back().first.get();

    // A block with zero rows on either side is merged unconditionally.
    if (tail->rows() == 0 || free_block->rows() == 0) {
        return true;
    }

    // Row budget: the combined row count must not exceed ctx->batch_size().
    if (tail->rows() + free_block->rows() > ctx->batch_size()) {
        return false;
    }

    // Byte budget: only enforced when config::enable_adaptive_batch_size is on.
    return !config::enable_adaptive_batch_size ||
           tail->bytes() + free_block->bytes() <= preferred_block_size_bytes;
}();

if (can_merge_to_last_block) {
size_t block_size = scan_task->cached_blocks.back().first->allocated_bytes();
MutableBlock mutable_block(scan_task->cached_blocks.back().first.get());
status = mutable_block.merge(*free_block);
Expand Down
Loading