#754 Add schema migration support for Delta bookkeeper tables via mergeSchema on model mismatch#755
Conversation
…geSchema on model mismatch.
|
Warning Rate limit exceeded
You’ve run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (4)
WalkthroughBookkeeperDeltaPath and BookkeeperDeltaTable now detect and recover from Delta table schema mismatches by wrapping reads in exception handlers that recognize unresolved-column errors, invoke an automatic model migration (appending an empty dataset with mergeSchema enabled), and retry the table read. Changes are applied across BookkeeperDeltaPath and three Scala versions of BookkeeperDeltaTable, with corresponding test coverage. ChangesDelta Bookkeeper Schema Migration
Sequence DiagramsequenceDiagram
participant Caller
participant getBkDf
participant loadDelta
participant migrateModel
participant writeEmpty
participant retryLoad
Caller->>getBkDf: invoke read
getBkDf->>loadDelta: attempt table read
loadDelta--xgetBkDf: AnalysisException (unresolved column)
getBkDf->>migrateModel: trigger schema migration
migrateModel->>writeEmpty: append empty dataset<br/>with mergeSchema=true
writeEmpty-->>migrateModel: migration complete
getBkDf->>retryLoad: retry table read
retryLoad-->>getBkDf: success (schema now compatible)
getBkDf-->>Caller: return DataFrame/Dataset
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1)
63-75: ⚡ Quick winSeed a legacy row and assert the migrated read result.
This only exercises an empty pre-migration table, so it doesn't prove that existing bookkeeping rows still deserialize after the new columns are added. Please write one old-schema record, run
getDataChunks, and assert that the row is returned after migration instead of only checking for "no exception".🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala` around lines 63 - 75, The test currently only writes an empty Delta table and checks no exception on migration; instead, insert one legacy-format bookkeeping row (a DataChunk record serialized without the new columns, i.e., omit batchId and appendedRecordCount) into the Delta at recordsPath, then call BookkeeperDeltaPath.getDataChunks("table1", LocalDate.parse("2018-02-18"), Some(234L)) and assert the returned Seq contains that migrated row (e.g., size == 1 and fields match expected values or defaults). Locate the test setup around BookkeeperDeltaPath and recordsPath and modify it to create and write a single pre-migration row, then replace the existing no-op assertion with an explicit assertion that the migrated row is returned.pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1)
82-97: ⚡ Quick winVerify migration with a persisted legacy record, not just an empty table.
Because this fixture is empty, the test can't catch regressions where schema merge succeeds but old rows still fail when read back through
DataChunk. Add one record in the pre-migration schema and assertgetDataChunksreturns it after the migration.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala` around lines 82 - 97, The test BookkeeperDeltaTableLongSuite should persist a legacy-format DataChunk row into the empty Delta table before exercising migration: use getNewTablePrefix to build bkTable, create a DataFrame with one legacy DataChunk-like record (matching pre-migration columns, e.g. without new fields), write it with df.write.format("delta").saveAsTable(bkTable), then call getBookkeeper(prefix, 123L) and assert that bk.getDataChunks("table1", LocalDate.parse("2018-02-18"), Some(123L)) returns the persisted record (compare key fields or deserialize into DataChunk), ensuring migration actually reads legacy rows; update the test around bkTable, DataChunk, getBookkeeper and getDataChunks to add this insertion and the assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In
`@pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 53-65: The current try/catch in BookkeeperDeltaTable that calls
migrateModel() for any AnalysisException is too broad; change the catch so that
you only call migrateModel() when the exception indicates unresolved
column/schema-evolution (e.g., ex.getMessage contains "UNRESOLVED_COLUMN" or
equivalent Spark 2/3 schema evolution indicator) and for all other
AnalysisException cases rethrow the exception; update the df initialization
handling around spark.table(recordsFullTableName).as[DataChunk] and
migrateModel() so only the specific unresolved-column condition triggers
migration while other AnalysisException and unexpected throwables are
propagated.
In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 54-65: The try/catch around
spark.table(recordsFullTableName).as[DataChunk] is too broad; change it so only
AnalysisException instances that explicitly indicate a schema/model mismatch
(e.g., messages containing "UNRESOLVED_COLUMN" or the Spark‑2 style "cannot
resolve" / unresolved column text) trigger migrateModel() and a retry, and
rethrow any other AnalysisException or Throwable so real errors (missing table,
permission issues, misnamed table) are not silently masked; update the catch
patterns to match message content before calling migrateModel(), and ensure
other exceptions are rethrown unchanged.
In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 54-65: The current try/catch in BookkeeperDeltaTable that catches
all AnalysisException and calls migrateModel() is too broad; change it so
migration is attempted only for confirmed unresolved-column/schema-mismatch
errors and all other exceptions are rethrown. Specifically, replace the blanket
"case _: AnalysisException" branch with a guarded case that checks the exception
message (e.g., case ex: AnalysisException if
ex.getMessage.contains("UNRESOLVED_COLUMN") ||
ex.getMessage.toLowerCase.contains("cannot resolve") => migrateModel();
spark.table(recordsFullTableName).as[DataChunk]) and keep the existing guarded
Throwable branch or collapse them into a single guarded case for messages
indicating unresolved-column; for any other exceptions, do not swallow
them—rethrow (or let them propagate) so df creation failures are not turned into
silent state loss.
In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`:
- Around line 60-71: The current catch always migrates on any AnalysisException;
change it so only AnalysisExceptions that indicate an
unresolved-column/schema-mismatch trigger migration: in BookkeeperDeltaPath wrap
load() and replace the broad case _: AnalysisException with a guarded case like
case ex: AnalysisException if ex.getMessage.contains("UNRESOLVED_COLUMN") ||
ex.getMessage.toLowerCase.contains("cannot resolve") (or another definitive
schema-mismatch marker your Spark version emits) so you call
migrateModel(recordsPath) and load() only then, and rethrow ex for any other
AnalysisException; remove or consolidate the separate Throwable branch
accordingly so unexpected errors are not swallowed.
---
Nitpick comments:
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala`:
- Around line 63-75: The test currently only writes an empty Delta table and
checks no exception on migration; instead, insert one legacy-format bookkeeping
row (a DataChunk record serialized without the new columns, i.e., omit batchId
and appendedRecordCount) into the Delta at recordsPath, then call
BookkeeperDeltaPath.getDataChunks("table1", LocalDate.parse("2018-02-18"),
Some(234L)) and assert the returned Seq contains that migrated row (e.g., size
== 1 and fields match expected values or defaults). Locate the test setup around
BookkeeperDeltaPath and recordsPath and modify it to create and write a single
pre-migration row, then replace the existing no-op assertion with an explicit
assertion that the migrated row is returned.
In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala`:
- Around line 82-97: The test BookkeeperDeltaTableLongSuite should persist a
legacy-format DataChunk row into the empty Delta table before exercising
migration: use getNewTablePrefix to build bkTable, create a DataFrame with one
legacy DataChunk-like record (matching pre-migration columns, e.g. without new
fields), write it with df.write.format("delta").saveAsTable(bkTable), then call
getBookkeeper(prefix, 123L) and assert that bk.getDataChunks("table1",
LocalDate.parse("2018-02-18"), Some(123L)) returns the persisted record (compare
key fields or deserialize into DataChunk), ensuring migration actually reads
legacy rows; update the test around bkTable, DataChunk, getBookkeeper and
getDataChunks to add this insertion and the assertion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2897e9a5-0262-4811-b794-fcbcb7faa970
📒 Files selected for processing (6)
pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scalapramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scalapramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala
Unit Test Coverage
Files
|
…related to unresolved new columns.
Closes #754
Summary by CodeRabbit
Bug Fixes
Tests