Full Resumability PR 1: Create DAG #1994

Open
oyilmaz-nvidia wants to merge 8 commits into NVIDIA-NeMo:main from oyilmaz-nvidia:onur/parent-children-lmdb

oyilmaz-nvidia (Contributor) commented May 16, 2026

Task DAG checkpointing (PR 1/4 for pipeline resumability)

This is the first of four PRs that together will add resumability support to NeMo Curator pipelines. The remaining three will build on the on-disk DAG persisted by this PR. Each future PR will be reviewable in isolation:

  • PR 2 — run BFS starting from leaf tasks to mark tasks completed.
  • PR 3 — resume logic: skip already-completed work on rerun.
  • PR 4 — add support for stages that override process_batch.

This PR depends on #1993, which needs to be merged first.

What this PR does

Persists the task DAG of any pipeline that opts into checkpointing. When the user passes Pipeline.run(checkpoint_path=...), every emission flowing through process_batch is recorded to a single LMDB file keyed by the task's deterministic _udid. After the run, the file holds, for each task:

  • the list of parent _udids
  • the list of child _udids
  • a task_type tag — one of source, middle, leaf, source_leaf
  • a completed boolean (storage only; not yet auto-set — reserved for PR 2)

This is the foundation the rest of resumability will read from.
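As a rough mental model of the per-task record listed above, an in-memory sketch may help. The names `TaskRecord` and `classify` are hypothetical and do not come from the PR. The PR keeps this data in LMDB rather than in Python objects, and the classification rule shown here (a tag derived from whether a task has parents and children) is an assumption inferred from the description.

```python
from dataclasses import dataclass, field


def classify(has_parents: bool, has_children: bool) -> str:
    """Derive the task_type tag from edge presence (assumed rule)."""
    if has_parents and has_children:
        return "middle"
    if has_children:
        return "source"
    if has_parents:
        return "leaf"
    return "source_leaf"


@dataclass
class TaskRecord:
    """Hypothetical in-memory view of one checkpoint entry, keyed by _udid."""

    udid: str
    parents: set[str] = field(default_factory=set)
    children: set[str] = field(default_factory=set)
    completed: bool = False  # stored but never set automatically in this PR

    @property
    def task_type(self) -> str:
        return classify(bool(self.parents), bool(self.children))


record = TaskRecord(udid="a" * 32)
print(record.task_type)  # source_leaf: no edges recorded yet
record.children.add("b" * 32)
print(record.task_type)  # source: has children but no parents
```

Because the tag is a pure function of edge presence, adding edges can only move a task from source_leaf toward middle, never back.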

Design highlights

  • Deterministic identity. Tasks now carry _lineage_path (index path through the pipeline DAG) and _udid = sha256(_lineage_path)[:32], set by _set_lineage (nemo_curator/tasks/tasks.py). Rerunning the same pipeline shape on the same inputs produces byte-identical udids — that's what lets a future run match work to a checkpoint.
  • Single-writer Ray actor for NFS/Lustre safety. LineageWriterActor is a named, detached Ray actor that owns the LMDB env. Because exactly one process writes to the file, no cross-process locking is required and the checkpoint can live on shared storage. All executors (Xenna, RayActorPool, RayData) spawn the actor when checkpoint_path is set and tear it down in finally:.
  • Actor presence is the gate. record_lineage is a no-op when no LineageWriterActor is registered — pipelines without checkpointing pay nothing.
  • Four LMDB sub-DBs. Edges are append-only (dupsort) and idempotent under retries; task_type is recomputed eagerly inside the same write txn from current edge presence and so promotes monotonically (source_leaf → source/leaf → middle), which is safe under any worker scheduling order. completed is a single-key flag sub-DB.
  • Incremental parent attribution. Multi-parent join stages can record edges in multiple calls; dupsort drops exact duplicates and accumulates new parents.
  • Two-step lineage contract for process_batch. The default and any custom override calls assign_child_lineage(parent_paths, result) then record_lineage([parent._udid], [c._udid for c in children]) — two clearly named, side-effect-free calls.
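The identity and edge-recording points above can be sketched without LMDB or Ray. This is a minimal illustration under stated assumptions, not the PR's implementation. `lineage_path` mirrors the `"_".join` scheme quoted in the review thread for `_set_lineage`. `InMemoryLineage` is a hypothetical stand-in in which Python sets play the role of dupsort deduplication.

```python
import hashlib
from collections import defaultdict


def lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Join non-empty parent paths and the child index with underscores."""
    return "_".join([*[p for p in parent_paths if p], str(child_index)])


def udid(path: str) -> str:
    """First 32 hex characters of the SHA-256 digest of the lineage path."""
    return hashlib.sha256(path.encode()).hexdigest()[:32]


class InMemoryLineage:
    """Hypothetical stand-in for the store; sets emulate dupsort deduplication."""

    def __init__(self) -> None:
        self.parents: dict[str, set[str]] = defaultdict(set)
        self.children: dict[str, set[str]] = defaultdict(set)

    def record(self, parent_udids: list[str], child_udids: list[str]) -> None:
        # Every (parent, child) pair becomes one edge in both directions.
        for p in parent_udids:
            for c in child_udids:
                self.children[p].add(c)
                self.parents[c].add(p)


# The same lineage path gives the same identifier on every run.
assert udid(lineage_path(["0"], 1)) == udid(lineage_path(["0"], 1))

store = InMemoryLineage()
left, right = udid("0_0"), udid("1_0")
joined = udid(lineage_path(["0_0", "1_0"], 0))

store.record([left], [joined])
store.record([left], [joined])   # retry: the exact duplicate is dropped
store.record([right], [joined])  # later call: a new parent accumulates

print(store.parents[joined] == {left, right})  # True
```

The retry leaves the edge set unchanged, and the second parent arrives in a separate call, which is the behaviour the idempotency and incremental-attribution bullets describe.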

Out of scope (deferred)

  • Migrating existing custom process_batch overrides (stages in nemo_curator/stages/deduplication/** and nemo_curator/stages/audio/**) to the lineage contract. These overrides do not go through the default contract today, so lineage for those stages will be filled in as part of PR 2.
  • Auto-marking tasks completed. The completed sub-DB and the mark_completed / is_completed API exist; nothing in the framework calls mark_completed yet.
  • Resume / restart from a checkpoint. Read-only DAG today.

Test plan

  • tests/utils/test_lineage_store.py: LineageStore storage-layer suite (idempotent retries, incremental parent attribution, monotonic task_type promotion, source/middle/leaf/source_leaf classification, completion flag round-trip) + actor-routed tests for record_lineage.
  • tests/pipelines/test_lineage_integration.py — end-to-end through ProcessingStage.process_batch against a real LineageWriterActor. Drives a 4-stage fanout/passthrough/fanin/passthrough pipeline and asserts the on-disk DAG matches the expected udids/types/edges. Includes a no-actor-registered case to confirm zero-overhead.
  • No-checkpoint pipelines unchanged: full CPU test suite (tests/tasks/, tests/stages/common/, tests/pipelines/) is green; ruff clean on every touched file.

Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>

copy-pr-bot Bot commented May 16, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

oyilmaz-nvidia (Contributor, Author) commented

/ok to test eaaf4f5

greptile-apps Bot (Contributor) commented May 16, 2026

Greptile Summary

This PR introduces the first piece of a four-part pipeline-resumability feature: it persists the task-execution DAG to an LMDB file whenever Pipeline.run(checkpoint_path=...) is provided. Each task gets a deterministic identity (_udid = sha256(_lineage_path)[:32]), edges are written by a single-writer Ray actor (LineageWriterActor), and the store classifies each task as source, middle, leaf, or source_leaf — forming the on-disk foundation that future PRs will read to resume incomplete runs.

  • New module lineage_store.py: LineageStore (direct LMDB owner with dupsort edges) + LineageWriterActor (named, detached Ray actor wrapping the store) + record_lineage helper called from ProcessingStage.process_batch.
  • Task and stage changes: Task._set_lineage / _lineage_path / _udid fields added in tasks.py; assign_child_lineage, assign_root_lineage, and the record_lineage call are wired into the default process_batch in stages/base.py.
  • Executor plumbing: All three executors (RayActorPool, RayData, Xenna) receive a checkpoint_path kwarg, spawn the writer actor in try, and kill it in finally.
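The executor plumbing summarized above follows a familiar acquire-in-try, release-in-finally shape. The sketch below shows only that shape in plain Python. `FakeLineageWriter` and `execute` are hypothetical stand-ins, no Ray is involved, and the real executors differ in detail.

```python
class FakeLineageWriter:
    """Stand-in for the writer actor; it only tracks whether it was closed."""

    def __init__(self, path: str) -> None:
        self.path = path
        self.closed = False

    def close(self) -> None:
        self.closed = True


def execute(stages, tasks, checkpoint_path=None, writer_factory=FakeLineageWriter):
    """Run stages in order; create a writer only when a checkpoint path is given."""
    writer = None
    try:
        if checkpoint_path is not None:
            writer = writer_factory(checkpoint_path)
        for stage in stages:
            tasks = stage(tasks)
        return tasks
    finally:
        # Runs on success and on failure, so the writer never outlives the run.
        if writer is not None:
            writer.close()


def double(tasks):
    return [t * 2 for t in tasks]


print(execute([double], [1, 2]))  # [2, 4], no writer is created
print(execute([double], [1, 2], checkpoint_path="ckpt.lmdb"))  # [2, 4]
```

Without a `checkpoint_path` the writer is never created, which matches the description's point that pipelines without checkpointing are unaffected.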

Confidence Score: 4/5

Safe to merge as a foundation layer, but open design gaps in task identity must be resolved before the resume logic in PR 2 can be correct.

The lineage store and DAG-recording machinery work correctly for single-parent and deterministic pipelines. The underscore separator collision and non-deterministic parent ordering for distributed FanIn are open gaps foundational to the entire resumability feature.

nemo_curator/tasks/tasks.py and nemo_curator/stages/base.py carry the identity and ordering design gaps; all three executor files share the same actor-reuse pattern.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/utils/lineage_store.py | New LMDB-backed lineage store: LineageStore, LineageWriterActor, and record_lineage helper. Core logic is correct (ACID writes, dupsort idempotency, monotonic task_type promotion). Minor concerns: ray.get_actor() GCS lookup on every record_lineage call, always-writable LMDB open (no readonly mode), and single retry on MapFullError. |
| nemo_curator/tasks/tasks.py | Adds _lineage_path and _udid fields plus _set_lineage(). Underscore separator in _lineage_path can produce collisions for multi-parent joins (flagged in previous review thread). Core sha256-truncation logic is correct for single-parent paths. |
| nemo_curator/stages/base.py | Adds assign_child_lineage, assign_root_lineage, and record_lineage call in default process_batch. Non-deterministic parent-path ordering in distributed FanIn overrides is a known open issue (flagged in previous thread). The default single-parent path through process_batch is correct. |
| nemo_curator/backends/ray_actor_pool/executor.py | Spawns LineageWriterActor with get_if_exists=True (actor reuse across runs flagged in previous thread). Correctly uses try/finally with close() + ray.kill() + ray.shutdown(). |
| nemo_curator/backends/ray_data/executor.py | Same get_if_exists=True pattern as ray_actor_pool executor. Correctly tears down actor in finally block. |
| nemo_curator/backends/xenna/executor.py | Same get_if_exists=True pattern. Correctly tears down actor in finally block. |
| nemo_curator/pipeline/pipeline.py | Adds checkpoint_path parameter to Pipeline.run(), calls assign_root_lineage on initial_tasks, and passes path to the executor. Clean and straightforward. |
| tests/utils/test_lineage_store.py | Thorough storage-layer suite covering idempotency, incremental parent attribution, monotonic type promotion, self-loop handling, diamond dedup, and actor-routed round-trips. |
| tests/pipelines/test_lineage_integration.py | End-to-end integration test driving a 4-stage fanout/passthrough/fanin/passthrough pipeline through BaseStageAdapter with a real LineageWriterActor; verifies full DAG topology and the no-actor no-op case. |

Sequence Diagram

sequenceDiagram
    participant PL as Pipeline.run()
    participant EX as Executor
    participant LWA as LineageWriterActor
    participant PB as ProcessingStage.process_batch
    participant LS as LineageStore

    PL->>PL: assign_root_lineage(initial_tasks)
    PL->>EX: execute(stages, tasks, checkpoint_path)
    EX->>LWA: "options(name, get_if_exists=True).remote(path)"
    loop each stage
        EX->>PB: process_batch(tasks)
        PB->>PB: assign_child_lineage(parent_paths, result)
        PB->>LWA: record_emission.remote(parent_udids, child_udids)
        Note over PB,LWA: ray.get() blocks worker until write completes
        LWA->>LS: _record_emission_once + task_type update
        LWA-->>PB: done
    end
    EX->>LWA: close.remote()
    LWA->>LS: env.close()
    EX->>LWA: ray.kill()
    EX->>EX: ray.shutdown()

Reviews (3): Last reviewed commit: "Get unique id update"

Comment thread nemo_curator/tasks/tasks.py Outdated
Comment on lines +55 to +59
    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
        # DAG structure does. Clear cache directories when changing config.
        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
        self._lineage_path = "_".join(parts)
        self._udid = hashlib.sha256(self._lineage_path.encode()).hexdigest()[:32]

P1 Ambiguous _lineage_path encoding for multi-parent joins

_ is used as the separator between parts, but parent paths themselves already contain _. This means distinct parent-set combinations can produce the same _lineage_path, and therefore the same _udid. For example, a single parent with path "0_1" and child index 0 produces "0_1_0", which is identical to two parents "0" and "1" with child index 0. Any collision silently overwrites one task's edges in the LMDB store with the other's, corrupting the checkpoint DAG.
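The collision described in this comment can be reproduced in isolation. In the sketch below, `lineage_path` copies the join logic from the snippet under review, while `unambiguous_path` is a hypothetical alternative (not proposed in the PR) that keeps parent boundaries explicit by serializing the parts as JSON.

```python
import json


def lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Same join logic as the _set_lineage snippet under review."""
    parts = [*[p for p in parent_paths if p], str(child_index)]
    return "_".join(parts)


def unambiguous_path(parent_paths: list[str], child_index: int) -> str:
    """Hypothetical alternative: JSON keeps each parent path delimited."""
    return json.dumps([parent_paths, child_index])


one_parent = lineage_path(["0_1"], 0)
two_parents = lineage_path(["0", "1"], 0)
print(one_parent, two_parents)  # 0_1_0 0_1_0

# Different parent sets collapse to the same string, hence the same _udid.
assert one_parent == two_parents

# With explicit boundaries the two cases stay distinct.
assert unambiguous_path(["0_1"], 0) != unambiguous_path(["0", "1"], 0)
```

Any encoding that keeps the parent boundaries recoverable would serve the same purpose; JSON is used here only because it is in the standard library.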

Comment on lines +39 to +67
def assign_child_lineage(
    parent_paths: list[str],
    result: Task | list[Task] | None,
) -> list[Task]:
    """Normalize a stage's ``process()`` result and assign deterministic lineage.

    Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived
    from ``(parent_paths, i)`` so that the same pipeline run twice on the same
    inputs produces byte-identical task IDs. Call this from any custom
    ``process_batch`` override to keep outputs consistent with the rest of the
    pipeline.

    Args:
        parent_paths: One element per logical parent (typically
            ``[task._lineage_path]`` for 1:N stages, or multiple paths for
            join/aggregate stages).
        result: Whatever ``process()`` (or your custom batch logic) returned for
            this parent set — a single task, a list, or ``None``.

    Returns:
        The normalized list of children with lineage assigned. May be empty.
    """
    if result is None:
        return []
    children = result if isinstance(result, list) else [result]
    children = [c for c in children if c is not None]
    for i, child in enumerate(children):
        child._set_lineage(parent_paths, i)
    return children

P1 Non-deterministic _lineage_path for multi-parent (FanIn) stages in distributed execution

assign_child_lineage([t._lineage_path for t in tasks], ...) encodes the parent paths in list insertion order. When called from a custom process_batch override in a distributed executor (RayActorPool, RayData, Xenna), the order of tasks can vary between runs depending on scheduler decisions, breaking the stated guarantee that "rerunning the same pipeline shape on the same inputs produces byte-identical udids." The same pipeline with the same inputs could produce "0_0_1_0_2_0_0" on one run and "1_0_0_0_2_0_0" on another, making checkpoint keys incompatible across runs.
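The order sensitivity is easy to see with the two orderings from this comment. The sketch reuses the join logic from `_set_lineage`. `canonical_lineage_path` is a hypothetical mitigation that sorts parent paths before joining. It is only appropriate when a join stage's output does not depend on the order of its inputs, and it does not address the separate separator ambiguity.

```python
def lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Same join logic as the _set_lineage snippet under review."""
    return "_".join([*[p for p in parent_paths if p], str(child_index)])


def canonical_lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Hypothetical mitigation: sort parents so arrival order is irrelevant."""
    return lineage_path(sorted(parent_paths), child_index)


# The same three parents, delivered in a different order on two runs.
run_a = ["0_0", "1_0", "2_0"]
run_b = ["1_0", "0_0", "2_0"]

print(lineage_path(run_a, 0))  # 0_0_1_0_2_0_0
print(lineage_path(run_b, 0))  # 1_0_0_0_2_0_0

# After sorting, both runs agree on the path and therefore on the _udid.
assert canonical_lineage_path(run_a, 0) == canonical_lineage_path(run_b, 0)
```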

Comment thread nemo_curator/tasks/tasks.py Outdated
Comment on lines +55 to +57
    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
        # DAG structure does. Clear cache directories when changing config.
        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]

P2 The comment is a leftover artifact from another context and doesn't describe what the function actually does.

Suggested change
- def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
-     # DAG structure does. Clear cache directories when changing config.
-     parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
+ def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
+     parts = [*[p for p in parent_lineage_paths if p], str(child_index)]

Comment thread nemo_curator/stages/base.py Outdated
) -> list[Task]:
    """Normalize a stage's ``process()`` result and assign deterministic lineage.

    Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived

P2 The docstring says _uuid but the field assigned by _set_lineage is _udid. _uuid is the separate random UUID4 field that is not touched by this function.

Suggested change
- Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived
+ Each surviving ``children[i]`` gets ``_lineage_path`` and ``_udid`` derived

Comment on lines 103 to +110
        session_id = uuid.uuid4().bytes

        lineage_actor = None
        try:
            # Initialize Ray and register loguru serializer
            register_loguru_serializer()
            ray.init(ignore_reinit_error=True, runtime_env=_parse_runtime_env(self.config.get("runtime_env", {})))
            if checkpoint_path is not None:

P2 get_if_exists=True silently reuses an actor with a different checkpoint path

If a LineageWriterActor from a previous pipeline run is still alive, get_if_exists=True returns the existing actor without calling its constructor — meaning the path=absolute_checkpoint_path argument is silently ignored and the new run writes lineage into the old file. The same pattern appears in ray_data/executor.py and xenna/executor.py. Consider using a run-scoped unique actor name or validating the returned actor's path.
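One way to act on the naming suggestion is to derive the actor name from the checkpoint path, so that reuse through get_if_exists=True can only return an actor created for the same file. The helper below is a hypothetical sketch; `lineage_actor_name` and its prefix are not part of the PR. It also assumes that whatever looks the actor up by name on the worker side can compute the same name.

```python
import hashlib
import os


def lineage_actor_name(checkpoint_path: str, prefix: str = "lineage_writer") -> str:
    """Build an actor name that is unique per absolute checkpoint path."""
    absolute = os.path.abspath(checkpoint_path)
    digest = hashlib.sha256(absolute.encode()).hexdigest()[:12]
    return f"{prefix}_{digest}"


# Different checkpoint files map to different actor names.
assert lineage_actor_name("run_a.lmdb") != lineage_actor_name("run_b.lmdb")

# Relative and absolute spellings of the same file map to the same name.
assert lineage_actor_name("run_a.lmdb") == lineage_actor_name(os.path.abspath("run_a.lmdb"))

print(lineage_actor_name("run_a.lmdb").startswith("lineage_writer_"))  # True
```

A stale actor from an earlier run on a different path would then have a different name and could not be picked up by accident. Validating the returned actor's path, the other option raised in the comment, would be an alternative.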

oyilmaz-nvidia changed the title from "Onur/parent children lmdb" to "Full Resumability PR 1: Create DAG" on May 16, 2026