Implement high-performance sorted writing support in IcebergIO #38406
atognolag wants to merge 19 commits into apache:master
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces native support for sorted writes in IcebergIO. By integrating a memory-safe, disk-spilling sorter, the implementation ensures that data written to sorted Iceberg tables is perfectly ordered, which significantly improves performance and reduces resource contention by minimizing concurrent file writers. The changes include a comprehensive byte-encoding protocol for lexicographical sorting and updated write-path logic to handle sorted tables efficiently.
Code Review
This pull request implements row sorting for Iceberg writes by introducing the IcebergRowSorter utility, which leverages BufferedExternalSorter to handle large datasets. The implementation supports complex Iceberg SortOrder configurations by encoding sort keys into byte arrays. Review feedback identified critical logic errors in the null-ordering byte prefix calculation for descending columns, which were also propagated into incorrect test assertions. Additionally, several performance optimizations were suggested to avoid expensive row conversions and redundant lookups within loops, and to reduce GC pressure from frequent buffer allocations. A potential reliability issue was also noted regarding the consumption of non-re-iterable collections when extracting schemas.
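The order-preserving key encoding under discussion can be illustrated with a small, self-contained sketch. This is my own simplified convention, not the PR's exact protocol: a single-byte null prefix (0x00 for nulls-first, 0xFF for nulls-last, 0x01 for non-null values), a sign-bit flip so signed integers compare correctly as unsigned bytes, and bitwise inversion of the value bytes for descending columns.

```java
public class SortKeyEncodingSketch {
    // Encodes one nullable int column into bytes whose unsigned lexicographic
    // order matches the requested sort direction and null placement.
    // Convention (illustrative, not the PR's exact mapping):
    //   null prefix: 0x00 when nulls-first, 0xFF when nulls-last
    //   non-null prefix: 0x01 (strictly between the two null prefixes)
    //   descending: value bytes are bit-inverted (~b); the prefix is untouched
    static byte[] encodeInt(Integer v, boolean descending, boolean nullsFirst) {
        if (v == null) {
            return new byte[] { nullsFirst ? (byte) 0x00 : (byte) 0xFF };
        }
        byte[] out = new byte[5];
        out[0] = 0x01; // non-null marker
        int u = v ^ 0x80000000; // flip the sign bit: signed order -> unsigned byte order
        for (int i = 0; i < 4; i++) {
            byte b = (byte) (u >>> (24 - 8 * i)); // big-endian value bytes
            out[i + 1] = descending ? (byte) ~b : b;
        }
        return out;
    }
}
```

Under `java.util.Arrays.compareUnsigned`, keys encoded this way sort in the requested order with no per-type comparator, which is what lets a generic byte sorter like `BufferedExternalSorter` do the work.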
…racting Schema from Coder
…Order().isSorted() for readability
…e its conditional state
… key encoding performance
@ahmedabu98 mind taking a look? Thanks!
ahmedabu98 left a comment
Left some comments.
I think more thought needs to be put into the architecture of sorted writes. This is a primitive approach that might work well for small writes, but will quickly degrade for medium-to-large writes.
Maybe take a look at how Flink or Spark do it (look for "range" distribution mode). IIRC they do a global sort before passing data to writers. This way, input iterables are already sorted so writers can break them up into files with tight min/max ranges.
```java
if (icebergRecord == null) {
  icebergRecord = IcebergUtils.beamRowToIcebergRecord(icebergSchema, row);
}
Object icebergVal = icebergRecord.getField(colName);
if (icebergVal != null) {
  val = field.transform().apply(icebergVal);
```
This is a pretty heavy way to extract the transformed value; it will make writes slow.
Can we apply the transform to the Beam field object directly? If not, maybe we need a "beam object to iceberg object" conversion. IcebergUtils.copyFieldIntoRecord does something similar, but we may need to refactor it a little to fit this use case.
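A minimal sketch of the caching pattern the quoted snippet is reaching for, with plain maps standing in for Beam `Row` and Iceberg `Record` and a hypothetical `convert()` in place of `IcebergUtils.beamRowToIcebergRecord`: the expensive conversion runs at most once per row, no matter how many sort columns need Iceberg-typed values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LazyConversionSketch {
    // Counts how many times the expensive conversion actually runs.
    static int conversions = 0;

    // Hypothetical stand-in for IcebergUtils.beamRowToIcebergRecord(...):
    // an expensive Beam-row-to-Iceberg-record conversion.
    static Map<String, Object> convert(Map<String, Object> beamRow) {
        conversions++;
        return new HashMap<>(beamRow);
    }

    // Extracts the sort-key values for one row, converting lazily and caching
    // the result so the conversion happens at most once per row.
    static List<Object> sortKeyValues(Map<String, Object> beamRow, List<String> sortColumns) {
        Map<String, Object> icebergRecord = null; // cached converted record
        List<Object> values = new ArrayList<>();
        for (String col : sortColumns) {
            if (icebergRecord == null) {
                icebergRecord = convert(beamRow); // convert on first need only
            }
            values.add(icebergRecord.get(col));
        }
        return values;
    }
}
```

A direct field-level conversion, as suggested above, would avoid even this single whole-row conversion for rows whose sort columns are all primitive.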
```java
try {
  for (Row row : rows) {
    byte[] keyBytes = encodeSortKey(row, sortOrder, columnNames, icebergSchema, beamSchema);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
```
Let's clear and reuse the output stream object instead of creating a new one each iteration.
```java
    org.apache.beam.sdk.schemas.Schema beamSchema)
    throws IOException {
  ByteArrayOutputStream baos = new ByteArrayOutputStream();
```
Same thing here: let's create the ByteArrayOutputStream in the caller function and pass it in. Each call should reset and reuse the same one for efficiency.
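A sketch of that reuse pattern (names hypothetical): allocate the stream once in the caller and `reset()` it per element, so the hot loop stops churning byte buffers.

```java
import java.io.ByteArrayOutputStream;

public class StreamReuseSketch {
    // Encodes every key with one shared ByteArrayOutputStream: reset() clears
    // the contents but keeps the grown internal buffer, avoiding a fresh
    // allocation per element.
    static byte[][] encodeAll(byte[][] keys) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream(); // allocated once, in the caller's scope
        byte[][] encoded = new byte[keys.length][];
        for (int i = 0; i < keys.length; i++) {
            baos.reset(); // clear the previous key, reuse the buffer
            baos.write(keys[i], 0, keys[i].length); // stand-in for the real key encoding
            encoded[i] = baos.toByteArray(); // copies out just this key's bytes
        }
        return encoded;
    }
}
```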
```java
} else {
  writeString(val.toString(), baos, invert);
}
```
Does Iceberg support sorting on complex types (lists, maps, structs) ?
We should either 1) add support for that, or 2) throw an UnsupportedOperationException early on.
I'd rather throw an error than fall back on String because it may lead to unexpected sorting behavior for some types.
It is not supported. It now raises an UnsupportedOperationException.
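A sketch of that fail-fast check, using a hypothetical `TypeName` enum standing in for Beam's `Schema.TypeName`: complex types are rejected up front instead of silently falling back to `toString()`-based ordering.

```java
public class SortTypeCheckSketch {
    // Hypothetical type tags standing in for Beam Schema.TypeName values.
    enum TypeName { STRING, INT64, ARRAY, MAP, ROW }

    // Rejects complex types before any encoding happens, so unsupported sort
    // columns fail loudly rather than sorting in a surprising order.
    static void validateSortableType(String column, TypeName type) {
        switch (type) {
            case ARRAY:
            case MAP:
            case ROW:
                throw new UnsupportedOperationException(
                    "Sorting on complex type " + type + " (column '" + column + "') is not supported");
            default:
                break; // primitive-like types are sortable
        }
    }
}
```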
```java
Iterable<Row> sortedOrUnsortedRows =
    IcebergRowSorter.sortRows(
        element.getValue(), table.sortOrder(), table.schema(), dataSchema);
for (Row row : sortedOrUnsortedRows) {
```
We should only be sorting if the user asked us to.
IcebergRowSorter.sortRows() only sorts if the table schema defines a sort configuration; otherwise it does nothing.
…xpose NONE, HASH, and RANGE modes, optimize stream reuse, and add comprehensive sorting tests
…ively document distribution modes
…n mode with custom sharding function
… auto-sharding overlap limitations
I have updated this PR to address your feedback, Ahmed, and to optimize the sorting implementation. Below is a summary of the changes:

1. Configurable Distribution Modes
2. Performance Optimizations (GC & CPU Overhead Reductions)
3. Sorter Correctness & Safety
4. Verification & Test Coverage
fbad2a0 to acf0e56
…value-level toString conversion
…o String conversions in IcebergUtilsTest.java
…ring conversions in IcebergUtilsTest.java
… in WritePartitionedRowsToFiles.java
Pull Request: High-Performance Sorted Writing Support in Native IcebergIO
Description
This pull request implements robust, high-performance sorted writing support in native IcebergIO (sdks/java/io/iceberg).

When writing to sorted Iceberg tables, unsorted data causes massive performance degradation, high memory overhead, and "file thrashing" due to too many concurrent file writers being kept open on workers. By dynamically pre-sorting incoming PCollection<Row> elements based on the active Iceberg table SortOrder inside the write transform, we produce perfectly ordered Parquet files, optimize worker resources, and reduce the number of concurrent file handles.

Core Technical Implementation
1. Memory-Safe Spill-to-Disk Sorter (IcebergRowSorter.java)

- Uses sdks:java:extensions:sorter (via BufferedExternalSorter) to spill sorted data to disk, preventing out-of-memory (OOM) crashes.

2. Dynamic Unsigned Lexicographical Byte Encoding
Sort keys are encoded as byte[] for lexicographical comparison:

- Descending columns: inverts the bits of each value byte (~byte) to reverse the unsigned byte comparison order naturally.
- Null ordering (NULLS_FIRST & NULLS_LAST): prefix headers are mapped statically to direct standard unsigned comparators correctly: ASC NULLS_FIRST -> 0x00 / ASC NULLS_LAST -> 0xFF; DESC NULLS_FIRST -> 0xFF / DESC NULLS_LAST -> 0x00.
- Byte-stuffing escape (0x00 -> [0x01, 0x01], 0x01 -> [0x01, 0x02]) terminated by a safe 0x00 byte. This prevents column boundary bleeding on composite keys (e.g. "abc"+"def" vs "abcdef"+null).
- Handles ReadableInstant, java.time.Instant, and java.util.Date conversions, preventing runner-specific casting crashes.

3. Shard-Routing and Write-Path Bypasses
- In WriteUngroupedRowsToFiles: if the table has an active SortOrder, it skips direct ungrouped writing and spills elements to the grouped, shuffled path, where they are properly partitioned and pre-sorted before writing.

Verification and Test Coverage
1. Expanded Unit Tests (IcebergRowSorterTest.java)

2. End-to-End Pipeline Integration Tests (IcebergIOWriteTest.java)

- Adds an assertFilesAreInternallySorted verification helper which parses individual Parquet files committed to the table directly using Iceberg scan APIs, ensuring that each written file is perfectly sorted internally, regardless of the runner's sharding factor.
- Covers the NONE, HASH, and HASH_WITH_AUTOSHARDING distribution modes.
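The byte-stuffing escape described above (0x00 -> [0x01, 0x01], 0x01 -> [0x01, 0x02], with a 0x00 column terminator) can be sketched as follows; class and method names are illustrative, not the PR's actual code.

```java
import java.io.ByteArrayOutputStream;

// Sketch of the escape described in the PR: within a column's value bytes,
// 0x00 -> [0x01, 0x01] and 0x01 -> [0x01, 0x02], followed by a 0x00 column
// terminator, so no value byte can collide with the separator and composite
// keys like "abc"+"def" vs "abcdef"+"" stay distinguishable.
public class ByteStuffingSketch {
    static void writeEscaped(byte[] value, ByteArrayOutputStream out) {
        for (byte b : value) {
            if (b == 0x00) { out.write(0x01); out.write(0x01); }
            else if (b == 0x01) { out.write(0x01); out.write(0x02); }
            else { out.write(b); }
        }
        out.write(0x00); // terminator: strictly below any escaped value byte
    }

    // Concatenates several escaped columns into one composite key.
    static byte[] concat(byte[][] columns) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] c : columns) {
            writeEscaped(c, out);
        }
        return out.toByteArray();
    }
}
```

Without the terminator, the composite keys for ("abc", "def") and ("abcdef", "") would be byte-identical; with it, the shorter first column compares lower at the boundary, which is the "column boundary bleeding" the PR description refers to.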