#754 Add schema migration support for Delta bookkeeper tables via mergeSchema on model mismatch #755

Merged
yruslan merged 2 commits into main from bugfix/754-fix-delta-migration
May 21, 2026

Conversation

@yruslan
Collaborator

@yruslan yruslan commented May 21, 2026

Closes #754

Summary by CodeRabbit

  • Bug Fixes

    • Improved resilience when reading Delta tables with schema incompatibilities. The system now automatically detects and recovers from unresolved column errors by performing a schema migration and retrying the read operation.
  • Tests

    • Added test coverage for schema migration workflows across Delta table operations.

@coderabbitai
Contributor

coderabbitai Bot commented May 21, 2026

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e7618e53-ef76-4662-a404-91a37646a9ad

📥 Commits

Reviewing files that changed from the base of the PR and between 496630f and 6d8c1a8.

📒 Files selected for processing (4)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala

Walkthrough

BookkeeperDeltaPath and BookkeeperDeltaTable now detect and recover from Delta table schema mismatches by wrapping reads in exception handlers that recognize unresolved-column errors, invoke an automatic model migration (appending an empty dataset with mergeSchema enabled), and retry the table read. Changes are applied across BookkeeperDeltaPath and three Scala versions of BookkeeperDeltaTable, with corresponding test coverage.
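
The read, migrate, retry flow described in this walkthrough can be sketched without Spark as follows. This is a minimal illustration, not the code from the PR: `ReadWithMigration`, `readWithMigration`, and its three parameters are hypothetical names, and the caller supplies the actual load, the migration step, and the check that recognises a schema mismatch.

```scala
import scala.util.control.NonFatal

// Hypothetical helper: the names below are illustrative and do not come from the PR.
object ReadWithMigration {

  /** Runs `load`. If it fails with an error that `isSchemaMismatch` accepts,
    * runs `migrate` once and then retries `load`. Any other failure propagates.
    */
  def readWithMigration[A](load: () => A,
                           migrate: () => Unit,
                           isSchemaMismatch: Throwable => Boolean): A = {
    try {
      load()
    } catch {
      case NonFatal(ex) if isSchemaMismatch(ex) =>
        migrate() // in the PR this role is played by appending an empty dataset with mergeSchema enabled
        load()    // single retry: an exception thrown here is not caught by this handler
    }
  }
}
```

Because the retry happens inside the handler, a second failure surfaces to the caller instead of looping, and errors that the predicate rejects are never swallowed.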

Changes

Delta Bookkeeper Schema Migration

| Layer | File(s) | Summary |
|---|---|---|
| BookkeeperDeltaPath schema migration and retry | pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala | Imports updated to wildcard Spark SQL imports; migrateModel and migrateModelViaEmptyDataset helpers added to write empty datasets with schema merge enabled; getBkDf wraps Delta read in try/catch to trigger migration and retry on AnalysisException or unresolved-column errors. |
| BookkeeperDeltaTable Scala 2.11 schema migration and retry | pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala | AnalysisException imported for error handling; migrateModel and migrateModelViaEmptyDataset helpers added to append empty typed Datasets with mergeSchema enabled; getBkDf wraps spark.table(...).as[DataChunk] read in try/catch to migrate and retry on AnalysisException or UNRESOLVED_COLUMN message. |
| BookkeeperDeltaTable Scala 2.12 schema migration and retry | pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala | Spark SQL imports broadened to wildcard; migrateModel and migrateModelViaEmptyDataset helpers added to write empty delta datasets with mergeSchema enabled; getBkDf wraps table read in try/catch to invoke migration and retry on AnalysisException or unresolved-column errors. |
| BookkeeperDeltaTable Scala 2.13 schema migration and retry | pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala | AnalysisException imported for schema-resolution error handling; migrateModel helper added to delegate to migrateModelViaEmptyDataset for writing empty Delta datasets with mergeSchema enabled; getBkDf wraps recordsFullTableName read in try/catch to migrate and retry on AnalysisException or UNRESOLVED_COLUMN message. |
| Migration test coverage | pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala, pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala | Test suites add DataChunk and LocalDate imports; new "test migrations work properly" cases create temporary Delta storage with pre-migration schema (missing batchId and appendedRecordCount), instantiate bookkeeper classes, and validate that getDataChunks succeeds after automatic migration. |
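
The "append an empty dataset with mergeSchema enabled" step named in the summaries above might look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's implementation: `ChunkV2` is a hypothetical stand-in for the real `DataChunk` model (only `batchId` and `appendedRecordCount` are field names taken from this page), `DeltaModelMigration.migrateViaEmptyDataset` is an invented name, and the `SparkSession` passed in is assumed to already have Delta Lake on the classpath and be configured for it.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Hypothetical stand-in for the current bookkeeping model; the real DataChunk differs.
// The two newer columns are modelled as Option so legacy rows can surface them as None.
case class ChunkV2(tableName: String,
                   infoDate: String,
                   batchId: Option[Long],
                   appendedRecordCount: Option[Long])

object DeltaModelMigration {

  /** Appends zero rows typed as the current model to the Delta table at `path`.
    * With mergeSchema enabled, the write adds any columns missing from the table schema.
    */
  def migrateViaEmptyDataset(spark: SparkSession, path: String): Unit = {
    import spark.implicits._

    spark.emptyDataset[ChunkV2]
      .write
      .format("delta")
      .mode(SaveMode.Append)
      .option("mergeSchema", "true") // Delta write option that allows schema evolution on append
      .save(path)
  }
}
```

No data rows are written, so the only effect on the table should be the schema change. Rows stored before the migration are expected to read back with nulls in the added columns, which is why the sketch models them as `Option`.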

Sequence Diagram

sequenceDiagram
  participant Caller
  participant getBkDf
  participant loadDelta
  participant migrateModel
  participant writeEmpty
  participant retryLoad
  
  Caller->>getBkDf: invoke read
  getBkDf->>loadDelta: attempt table read
  loadDelta--xgetBkDf: AnalysisException (unresolved column)
  getBkDf->>migrateModel: trigger schema migration
  migrateModel->>writeEmpty: append empty dataset<br/>with mergeSchema=true
  writeEmpty-->>migrateModel: migration complete
  getBkDf->>retryLoad: retry table read
  retryLoad-->>getBkDf: success (schema now compatible)
  getBkDf-->>Caller: return DataFrame/Dataset

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Poem

🐰 A schema's age is just a state,
When columns vanish, do not wait!
We write out empty, merge with grace,
And read again in the same place.
Old bookkeeping, new and bright—
Migration's dance makes all things right! 🌟

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
|---|---|---|
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main change: adding schema migration support for Delta bookkeeper tables via mergeSchema when models mismatch. |
| Linked Issues check | ✅ Passed | The implementation addresses the linked issue #754 by adding automatic schema migration via mergeSchema to handle unresolved column errors when new fields like batchId are introduced. |
| Out of Scope Changes check | ✅ Passed | All changes are directly related to implementing schema migration for Delta bookkeeper tables as specified in issue #754; no extraneous changes detected. |
| Docstring Coverage | ✅ Passed | No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 4

🧹 Nitpick comments (2)
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala (1)

63-75: ⚡ Quick win

Seed a legacy row and assert the migrated read result.

This only exercises an empty pre-migration table, so it doesn't prove that existing bookkeeping rows still deserialize after the new columns are added. Please write one old-schema record, run getDataChunks, and assert that the row is returned after migration instead of only checking for "no exception".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala`
around lines 63 - 75, The test currently only writes an empty Delta table and
checks no exception on migration; instead, insert one legacy-format bookkeeping
row (a DataChunk record serialized without the new columns, i.e., omit batchId
and appendedRecordCount) into the Delta at recordsPath, then call
BookkeeperDeltaPath.getDataChunks("table1", LocalDate.parse("2018-02-18"),
Some(234L)) and assert the returned Seq contains that migrated row (e.g., size
== 1 and fields match expected values or defaults). Locate the test setup around
BookkeeperDeltaPath and recordsPath and modify it to create and write a single
pre-migration row, then replace the existing no-op assertion with an explicit
assertion that the migrated row is returned.
pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala (1)

82-97: ⚡ Quick win

Verify migration with a persisted legacy record, not just an empty table.

Because this fixture is empty, the test can't catch regressions where schema merge succeeds but old rows still fail when read back through DataChunk. Add one record in the pre-migration schema and assert getDataChunks returns it after the migration.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala`
around lines 82 - 97, The test BookkeeperDeltaTableLongSuite should persist a
legacy-format DataChunk row into the empty Delta table before exercising
migration: use getNewTablePrefix to build bkTable, create a DataFrame with one
legacy DataChunk-like record (matching pre-migration columns, e.g. without new
fields), write it with df.write.format("delta").saveAsTable(bkTable), then call
getBookkeeper(prefix, 123L) and assert that bk.getDataChunks("table1",
LocalDate.parse("2018-02-18"), Some(123L)) returns the persisted record (compare
key fields or deserialize into DataChunk), ensuring migration actually reads
legacy rows; update the test around bkTable, DataChunk, getBookkeeper and
getDataChunks to add this insertion and the assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 53-65: The current try/catch in BookkeeperDeltaTable that calls
migrateModel() for any AnalysisException is too broad; change the catch so that
you only call migrateModel() when the exception indicates unresolved
column/schema-evolution (e.g., ex.getMessage contains "UNRESOLVED_COLUMN" or
equivalent Spark 2/3 schema evolution indicator) and for all other
AnalysisException cases rethrow the exception; update the df initialization
handling around spark.table(recordsFullTableName).as[DataChunk] and
migrateModel() so only the specific unresolved-column condition triggers
migration while other AnalysisException and unexpected throwables are
propagated.

In
`@pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 54-65: The try/catch around
spark.table(recordsFullTableName).as[DataChunk] is too broad; change it so only
AnalysisException instances that explicitly indicate a schema/model mismatch
(e.g., messages containing "UNRESOLVED_COLUMN" or the Spark‑2 style "cannot
resolve" / unresolved column text) trigger migrateModel() and a retry, and
rethrow any other AnalysisException or Throwable so real errors (missing table,
permission issues, misnamed table) are not silently masked; update the catch
patterns to match message content before calling migrateModel(), and ensure
other exceptions are rethrown unchanged.

In
`@pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala`:
- Around line 54-65: The current try/catch in BookkeeperDeltaTable that catches
all AnalysisException and calls migrateModel() is too broad; change it so
migration is attempted only for confirmed unresolved-column/schema-mismatch
errors and all other exceptions are rethrown. Specifically, replace the blanket
"case _: AnalysisException" branch with a guarded case that checks the exception
message (e.g., case ex: AnalysisException if
ex.getMessage.contains("UNRESOLVED_COLUMN") ||
ex.getMessage.toLowerCase.contains("cannot resolve") => migrateModel();
spark.table(recordsFullTableName).as[DataChunk]) and keep the existing guarded
Throwable branch or collapse them into a single guarded case for messages
indicating unresolved-column; for any other exceptions, do not swallow
them—rethrow (or let them propagate) so df creation failures are not turned into
silent state loss.

In
`@pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala`:
- Around line 60-71: The current catch always migrates on any AnalysisException;
change it so only AnalysisExceptions that indicate an
unresolved-column/schema-mismatch trigger migration: in BookkeeperDeltaPath wrap
load() and replace the broad case _: AnalysisException with a guarded case like
case ex: AnalysisException if ex.getMessage.contains("UNRESOLVED_COLUMN") ||
ex.getMessage.toLowerCase.contains("cannot resolve") (or another definitive
schema-mismatch marker your Spark version emits) so you call
migrateModel(recordsPath) and load() only then, and rethrow ex for any other
AnalysisException; remove or consolidate the separate Throwable branch
accordingly so unexpected errors are not swallowed.
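
The narrowing requested in the inline comments above comes down to a message-based predicate. The sketch below is one possible shape for it, written in plain Scala so it runs without Spark. `SchemaMismatchGuard` and `isSchemaMismatch` are hypothetical names, and the two markers are the ones quoted in the review comments; the exact text Spark emits varies by version, so treat the list as an assumption to verify against the Spark version in use.

```scala
import java.util.Locale

// Hypothetical helper: not part of the PR. Markers are taken from the review comments.
object SchemaMismatchGuard {

  // Lower-cased fragments that the review comments treat as signs of an unresolved column.
  private val markers: Seq[String] = Seq("unresolved_column", "cannot resolve")

  /** True only when the exception message contains one of the known markers.
    * A null message never matches.
    */
  def isSchemaMismatch(ex: Throwable): Boolean =
    Option(ex.getMessage).exists { msg =>
      val lower = msg.toLowerCase(Locale.ROOT)
      markers.exists(marker => lower.contains(marker))
    }
}
```

In the bookkeeper classes this would presumably sit in a guard such as `case ex: AnalysisException if SchemaMismatchGuard.isSchemaMismatch(ex) =>`, with no catch-all branch, so a missing table or a permission error propagates instead of triggering a migration.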

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 2897e9a5-0262-4811-b794-fcbcb7faa970

📥 Commits

Reviewing files that changed from the base of the PR and between 54ff738 and 496630f.

📒 Files selected for processing (6)
  • pramen/core/src/main/scala/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaPath.scala
  • pramen/core/src/main/scala_2.11/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.12/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/main/scala_2.13/za/co/absa/pramen/core/bookkeeper/BookkeeperDeltaTable.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaPathLongSuite.scala
  • pramen/core/src/test/scala/za/co/absa/pramen/core/tests/bookkeeper/BookkeeperDeltaTableLongSuite.scala

@github-actions
github-actions Bot commented May 21, 2026

Unit Test Coverage

| Scope | Coverage |
|---|---|
| Overall Project | 76.94% -0.06% 🍏 |
| Files changed | 85.84% 🍏 |

| Module | Coverage |
|---|---|
| pramen:core Jacoco Report | 77.94% -0.07% 🍏 |

Files

| Module | File | Coverage |
|---|---|---|
| pramen:core Jacoco Report | BookkeeperDeltaPath.scala | 97.34% -2.16% 🍏 |
| | BookkeeperDeltaTable.scala | 88.87% -7.28% 🍏 |

@yruslan yruslan merged commit 1d87a7b into main May 21, 2026
7 checks passed
@yruslan yruslan deleted the bugfix/754-fix-delta-migration branch May 21, 2026 08:46
Development

Successfully merging this pull request may close these issues.

Add migration from old to new bookeeping model for Delta Lake tables and paths
