Full Resumability PR 1: Create DAG #1994
Signed-off-by: Onur Yilmaz <oyilmaz@nvidia.com>
/ok to test eaaf4f5
Greptile Summary
This PR introduces the first piece of a four-part pipeline-resumability feature: it persists the task-execution DAG to an LMDB file whenever
Confidence Score: 4/5
Safe to merge as a foundation layer, but open design gaps in task identity must be resolved before the resume logic in PR 2 can be correct. The lineage store and DAG-recording machinery work correctly for single-parent and deterministic pipelines. The underscore separator collision and non-deterministic parent ordering for distributed FanIn are open gaps foundational to the entire resumability feature. nemo_curator/tasks/tasks.py and nemo_curator/stages/base.py carry the identity and ordering design gaps; all three executor files share the same actor-reuse pattern.
Important Files Changed
Sequence Diagram
sequenceDiagram
participant PL as Pipeline.run()
participant EX as Executor
participant LWA as LineageWriterActor
participant PB as ProcessingStage.process_batch
participant LS as LineageStore
PL->>PL: assign_root_lineage(initial_tasks)
PL->>EX: execute(stages, tasks, checkpoint_path)
EX->>LWA: "options(name, get_if_exists=True).remote(path)"
loop each stage
EX->>PB: process_batch(tasks)
PB->>PB: assign_child_lineage(parent_paths, result)
PB->>LWA: record_emission.remote(parent_udids, child_udids)
Note over PB,LWA: ray.get() blocks worker until write completes
LWA->>LS: _record_emission_once + task_type update
LWA-->>PB: done
end
EX->>LWA: close.remote()
LWA->>LS: env.close()
EX->>LWA: ray.kill()
EX->>EX: ray.shutdown()
Reviews (3): Last reviewed commit: "Get unique id update"
    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
        # DAG structure does. Clear cache directories when changing config.
        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
        self._lineage_path = "_".join(parts)
        self._udid = hashlib.sha256(self._lineage_path.encode()).hexdigest()[:32]
Ambiguous _lineage_path encoding for multi-parent joins
_ is used as the separator between parts, but parent paths themselves already contain _. This means distinct parent-set combinations can produce the same _lineage_path, and therefore the same _udid. For example, a single parent with path "0_1" and child index 0 produces "0_1_0", which is identical to two parents "0" and "1" with child index 0. Any collision silently overwrites one task's edges in the LMDB store with the other's, corrupting the checkpoint DAG.
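The collision described above can be reproduced in isolation. The sketch below copies the join logic from the quoted _set_lineage into a free function; the netstring-style encoding in unambiguous_lineage_path is a hypothetical alternative shown only for comparison, not something this PR proposes.

```python
import hashlib


def lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Same join as the quoted _set_lineage: '_' between every part."""
    parts = [*[p for p in parent_paths if p], str(child_index)]
    return "_".join(parts)


def udid(path: str) -> str:
    """Same truncated SHA-256 hex digest as the quoted _set_lineage."""
    return hashlib.sha256(path.encode()).hexdigest()[:32]


def unambiguous_lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Hypothetical alternative: length-prefix every part (netstring style).

    Each part is written as '<length>:<part>,' so a part that itself
    contains separators can never be confused with two shorter parts.
    """
    parts = [*[p for p in parent_paths if p], str(child_index)]
    return "".join(f"{len(p)}:{p}," for p in parts)


# One parent "0_1" with child 0, versus two parents "0" and "1" with child 0.
single_parent = lineage_path(["0_1"], 0)
two_parents = lineage_path(["0", "1"], 0)
print(single_parent, two_parents)  # both are "0_1_0"
print(udid(single_parent) == udid(two_parents))  # True: the IDs collide
```

With the length-prefixed variant the two cases encode to different strings, so their hashes differ as well. Changing the encoding would change every existing _udid, so it would have to land before any checkpoint files are expected to stay compatible.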
    def assign_child_lineage(
        parent_paths: list[str],
        result: Task | list[Task] | None,
    ) -> list[Task]:
        """Normalize a stage's ``process()`` result and assign deterministic lineage.

        Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived
        from ``(parent_paths, i)`` so that the same pipeline run twice on the same
        inputs produces byte-identical task IDs. Call this from any custom
        ``process_batch`` override to keep outputs consistent with the rest of the
        pipeline.

        Args:
            parent_paths: One element per logical parent (typically
                ``[task._lineage_path]`` for 1:N stages, or multiple paths for
                join/aggregate stages).
            result: Whatever ``process()`` (or your custom batch logic) returned for
                this parent set: a single task, a list, or ``None``.

        Returns:
            The normalized list of children with lineage assigned. May be empty.
        """
        if result is None:
            return []
        children = result if isinstance(result, list) else [result]
        children = [c for c in children if c is not None]
        for i, child in enumerate(children):
            child._set_lineage(parent_paths, i)
        return children
Non-deterministic _lineage_path for multi-parent (FanIn) stages in distributed execution
assign_child_lineage([t._lineage_path for t in tasks], ...) encodes the parent paths in list insertion order. When called from a custom process_batch override in a distributed executor (RayActorPool, RayData, Xenna), the order of tasks can vary between runs depending on scheduler decisions, breaking the stated guarantee that "rerunning the same pipeline shape on the same inputs produces byte-identical udids." The same pipeline with the same inputs could produce "0_0_1_0_2_0_0" on one run and "1_0_0_0_2_0_0" on another, making checkpoint keys incompatible across runs.
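The ordering sensitivity can be shown with the two example paths from this comment. The sketch below reuses the join from _set_lineage and adds one possible mitigation, sorting the parent paths before joining. order_independent_lineage_path is a hypothetical helper, and sorting is an assumption about what a fix might look like, not the change adopted in this PR.

```python
def lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Same join as the quoted _set_lineage: order of parents is preserved."""
    parts = [*[p for p in parent_paths if p], str(child_index)]
    return "_".join(parts)


def order_independent_lineage_path(parent_paths: list[str], child_index: int) -> str:
    """Hypothetical mitigation: canonicalize parent order before joining."""
    return lineage_path(sorted(parent_paths), child_index)


# The same three parents, delivered to the FanIn stage in two different orders.
run_a = ["0_0", "1_0", "2_0"]
run_b = ["1_0", "0_0", "2_0"]

print(lineage_path(run_a, 0))  # "0_0_1_0_2_0_0"
print(lineage_path(run_b, 0))  # "1_0_0_0_2_0_0"
print(order_independent_lineage_path(run_a, 0) == order_independent_lineage_path(run_b, 0))  # True
```

Sorting only helps when the set of parents in a batch is the same across runs and the stage's output does not depend on input order. It also does nothing about the separator ambiguity raised in the earlier comment.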
    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
        # DAG structure does. Clear cache directories when changing config.
        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
The comment is a leftover artifact from another context and doesn't describe what the function actually does.
Suggested change:
-    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
-        # DAG structure does. Clear cache directories when changing config.
-        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
+    def _set_lineage(self, parent_lineage_paths: list[str], child_index: int) -> None:
+        parts = [*[p for p in parent_lineage_paths if p], str(child_index)]
    ) -> list[Task]:
        """Normalize a stage's ``process()`` result and assign deterministic lineage.

        Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived
The docstring says _uuid but the field assigned by _set_lineage is _udid. _uuid is the separate random UUID4 field that is not touched by this function.
Suggested change:
-        Each surviving ``children[i]`` gets ``_lineage_path`` and ``_uuid`` derived
+        Each surviving ``children[i]`` gets ``_lineage_path`` and ``_udid`` derived
    session_id = uuid.uuid4().bytes

    lineage_actor = None
    try:
        # Initialize Ray and register loguru serializer
        register_loguru_serializer()
        ray.init(ignore_reinit_error=True, runtime_env=_parse_runtime_env(self.config.get("runtime_env", {})))
        if checkpoint_path is not None:
get_if_exists=True silently reuses an actor with a different checkpoint path
If a LineageWriterActor from a previous pipeline run is still alive, get_if_exists=True returns the existing actor without calling its constructor — meaning the path=absolute_checkpoint_path argument is silently ignored and the new run writes lineage into the old file. The same pattern appears in ray_data/executor.py and xenna/executor.py. Consider using a run-scoped unique actor name or validating the returned actor's path.
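One way to keep a stale actor from being reused for a different file is to derive the actor name from the checkpoint path itself. The sketch below shows only the name derivation. lineage_actor_name and its prefix are hypothetical, and the result would be passed as the name in the options(name, get_if_exists=True) call shown in the sequence diagram. This is a path-scoped variant of the suggestion above, not the fix adopted in the PR.

```python
import hashlib
import os


def lineage_actor_name(checkpoint_path: str, prefix: str = "lineage_writer") -> str:
    """Build an actor name that is unique per absolute checkpoint path.

    Two runs that target the same file resolve to the same name, so reusing
    the existing actor is harmless. A run that targets a different file gets
    a different name and therefore a freshly constructed actor.
    """
    absolute = os.path.abspath(checkpoint_path)
    digest = hashlib.sha256(absolute.encode()).hexdigest()[:12]
    return f"{prefix}_{digest}"


# Hypothetical paths, for illustration only.
print(lineage_actor_name("/data/run_a/lineage.lmdb"))
print(lineage_actor_name("/data/run_b/lineage.lmdb"))
```

A fully run-scoped name (for example one that mixes in the session id) would avoid reuse entirely, at the cost of every worker needing to learn the name for the current run.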
Task DAG checkpointing (PR 1/4 for pipeline resumability)
This is the first of four PRs that together will add resumability support to NeMo Curator pipelines. The remaining three will build on the on-disk DAG persisted by this PR. Each future PR will be reviewable in isolation:
PR #1993 must be merged first.
What this PR does
Persists the task DAG of any pipeline that opts into checkpointing. When the user passes Pipeline.run(checkpoint_path=...), every emission flowing through process_batch is recorded to a single LMDB file keyed by the task's deterministic _udid. After the run, the file holds, for each task:
- its parent _udids
- its child _udids
- a task_type tag: one of source, middle, leaf, source_leaf
- a completed boolean (storage only; not yet auto-set, reserved for PR 2)
This is the foundation the rest of resumability will read from.
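As a rough illustration of the per-task record described above, the sketch below models the store in memory. TaskRecord and InMemoryLineage are hypothetical names and nothing here touches LMDB or Ray. The task_type rule (no edges, children only, parents only, both) is an assumption inferred from the four tag names, not a copy of the PR's implementation.

```python
from dataclasses import dataclass, field


@dataclass
class TaskRecord:
    """What the checkpoint is described as holding for one task."""

    parents: set[str] = field(default_factory=set)
    children: set[str] = field(default_factory=set)
    completed: bool = False  # stored only; nothing sets it in this sketch

    @property
    def task_type(self) -> str:
        # Assumed rule: classify purely from which edges are present.
        if self.parents and self.children:
            return "middle"
        if self.children:
            return "source"
        if self.parents:
            return "leaf"
        return "source_leaf"


class InMemoryLineage:
    """Dictionary-backed stand-in for the on-disk store, keyed by udid."""

    def __init__(self) -> None:
        self.records: dict[str, TaskRecord] = {}

    def record_emission(self, parent_udids: list[str], child_udids: list[str]) -> None:
        for udid in [*parent_udids, *child_udids]:
            self.records.setdefault(udid, TaskRecord())
        for parent in parent_udids:
            self.records[parent].children.update(child_udids)
        for child in child_udids:
            # Sets drop exact duplicates, so retrying an emission is harmless.
            self.records[child].parents.update(parent_udids)


store = InMemoryLineage()
store.record_emission([], ["a"])     # root task with no edges yet
store.record_emission(["a"], ["b"])  # "a" emits "b"
store.record_emission(["b"], ["c"])  # "b" emits "c"
print({udid: rec.task_type for udid, rec in store.records.items()})
```

Because edges are only ever added, a record's type can only move from source_leaf towards middle, regardless of the order in which emissions arrive.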
Design highlights
- Each task carries _lineage_path (its index path through the pipeline DAG) and _udid = sha256(_lineage_path)[:32], both set by _set_lineage (nemo_curator/tasks/tasks.py). Rerunning the same pipeline shape on the same inputs produces byte-identical udids, which is what lets a future run match work to a checkpoint.
- LineageWriterActor is a named, detached Ray actor that owns the LMDB env. Because exactly one process writes to the file, no cross-process locking is required and the checkpoint can live on shared storage. All executors (Xenna, RayActorPool, RayData) spawn the actor when checkpoint_path is set and tear it down in a finally: block.
- record_lineage is a no-op when no LineageWriterActor is registered, so pipelines without checkpointing pay nothing.
- task_type is recomputed eagerly inside the same write txn from current edge presence, so it promotes monotonically (source_leaf → source/leaf → middle) and is safe under any worker scheduling order.
- completed is a single-key flag sub-DB.
- dupsort drops exact duplicates and accumulates new parents.
- In process_batch, the default implementation and any custom override call assign_child_lineage(parent_paths, result), then record_lineage([parent._udid], [c._udid for c in children]): two clearly named, side-effect-free calls.
Out of scope (deferred)
- Adapting existing process_batch overrides (stages in nemo_curator/stages/deduplication/** and nemo_curator/stages/audio/**) to the lineage contract. They are not covered by the default contract today, so lineage for those stages will be filled in as part of PR 2.
- The completed sub-DB and the mark_completed/is_completed API exist; nothing in the framework calls mark_completed yet.
Test plan
- tests/utils/test_lineage_store.py: LineageStore storage-layer suite (idempotent retries, incremental parent attribution, monotonic task_type promotion, source/middle/leaf/source_leaf classification, completion flag round-trip) plus actor-routed tests for record_lineage.
- tests/pipelines/test_lineage_integration.py: end-to-end through ProcessingStage.process_batch against a real LineageWriterActor. Drives a 4-stage fanout/passthrough/fanin/passthrough pipeline and asserts the on-disk DAG matches the expected udids/types/edges. Includes a no-actor-registered case to confirm zero overhead.
- Existing test suite (tests/tasks/, tests/stages/common/, tests/pipelines/) is green; ruff is clean on every touched file.