Pipeline resumability via source-level counter checkpointing#2033
Pipeline resumability via source-level counter checkpointing#2033abhinavg4 wants to merge 10 commits into
Conversation
Greptile SummaryAdds opt-in pipeline resumability via a singleton Ray actor (
Confidence Score: 3/5Two defects in the changed code path need fixes before merging: a raw-None crash in the pre-source fan-out branch, and uncertainty about silent delta loss at high throughput when the actor queue is full. The pre-source sentinel filter omits
Important Files Changed
Sequence DiagramsequenceDiagram
participant PR as Pipeline.run(checkpoint_path)
participant RA as ResumabilityActor (detached)
participant LMDB as LMDB file
participant SA as BaseStageAdapter (source stage)
participant DS as BaseStageAdapter (downstream)
participant SK as BaseStageAdapter (sink)
PR->>RA: "spawn (get_if_exists=True, max_pending_calls=100)"
PR->>SA: executor.execute(stages, initial_tasks)
SA->>RA: are_completed([sid0, sid1, ...]) [ray.get blocking]
RA-->>SA: [False, True, ...]
SA->>SA: filter completed sources
SA->>RA: apply_deltas([(task_id, sid, +1), ...]) [fire-and-forget]
loop each non-sink stage batch
DS->>RA: apply_deltas([(task_id, sid, 0 or -1), ...]) [fire-and-forget]
end
SK->>RA: apply_deltas([(task_id, sid, -1), ...]) [fire-and-forget]
RA->>RA: _pending[sid] reaches 0
RA->>LMDB: write sid as complete
PR->>RA: "close() [ray.get, timeout=10s]"
PR->>RA: ray.kill(actor)
Note over PR,LMDB: On rerun: actor loads _completed from LMDB, are_completed() skips finished sources
Reviews (4): Last reviewed commit: "Adapt resumability layer to task_id coll..." | Re-trigger Greptile |
| try: | ||
| ray.init(ignore_reinit_error=True) | ||
| actor_handle = ray.get_actor(ACTOR_NAME) | ||
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | ||
| if final_errs and not collected_error: | ||
| raise final_errs[0] | ||
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | ||
| pass | ||
| except Exception as e: # noqa: BLE001 | ||
| logger.warning(f"final resumability error check failed: {e}") |
There was a problem hiding this comment.
Final error check silently swallowed by
except Exception — the raise final_errs[0] on line 352 lives inside the try block that owns the broad except Exception as e handler. When the actor records a NonDeterministicTaskError or NegativePendingCountError that only surfaces at teardown (e.g., from the last in-flight batch), the raise is immediately re-caught and demoted to a logger.warning(...). The pipeline then exits cleanly with return results even though data-integrity errors were detected.
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | |
| if final_errs and not collected_error: | |
| raise final_errs[0] | |
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | |
| pass | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"final resumability error check failed: {e}") | |
| _deferred_error: Exception | None = None | |
| try: | |
| ray.init(ignore_reinit_error=True) | |
| actor_handle = ray.get_actor(ACTOR_NAME) | |
| final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined] | |
| if final_errs and not collected_error: | |
| _deferred_error = final_errs[0] | |
| except ray.exceptions.RayActorError: # type: ignore[attr-defined] | |
| pass | |
| except Exception as e: # noqa: BLE001 | |
| logger.warning(f"final resumability error check failed: {e}") | |
| if _deferred_error is not None: | |
| raise _deferred_error |
| def _apply_impl(self, per_task: list[tuple[str, str, int]]) -> None: | ||
| newly_done: list[str] = [] | ||
| for task_hash, sid, d in per_task: | ||
| existing = self._applied.get(task_hash) | ||
| if existing is not None: | ||
| if existing != d: | ||
| msg = ( | ||
| f"Task {task_hash} produced delta={d} but previously " | ||
| f"produced delta={existing}. User code must be " | ||
| f"deterministic w.r.t. its input for resumability " | ||
| f"(e.g. consistent NoneTask vs real-output decisions)." | ||
| ) | ||
| raise NonDeterministicTaskError(msg) | ||
| continue # idempotent re-fire | ||
| self._applied[task_hash] = d | ||
| if sid in self._completed: | ||
| continue | ||
| self._pending[sid] = self._pending.get(sid, 0) + d | ||
| if self._pending[sid] == 0: | ||
| newly_done.append(sid) | ||
| elif self._pending[sid] < 0: | ||
| msg = ( | ||
| f"source {sid!r} pending count went to {self._pending[sid]}. " | ||
| f"This shouldn't happen with deterministic user code — likely " | ||
| f"a fan-out stage returned a different number of outputs " | ||
| f"between runs, or a NoneTask was issued more times than " | ||
| f"the source had tasks. Last delta applied: {d} for task " | ||
| f"{task_hash}." | ||
| ) | ||
| raise NegativePendingCountError(msg) | ||
| if newly_done: | ||
| self._persist_completed(newly_done) | ||
| for sid in newly_done: | ||
| self._completed.add(sid) |
There was a problem hiding this comment.
Inconsistent
_pending state after mid-batch error
_apply_impl accumulates newly_done entries while iterating the batch, then calls _persist_completed only if the loop finishes cleanly. If the loop raises NegativePendingCountError (e.g., two -1 deltas for the same source in one batch), any source that had already reached zero earlier in that same batch is never persisted and never added to _completed, but _pending[sid] for the failing source is left at a negative value. Subsequent apply_deltas calls on the same actor see neither the correct _completed entry nor a sane counter, which can lead to a false "source completed" if a later delta happens to bring the negative value back to zero.
| def _is_active() -> bool: | ||
| """True if a resumability actor is registered in this Ray cluster.""" | ||
| return _actor() is not None |
There was a problem hiding this comment.
ray.get_actor() GCS round-trip on every processed batch
_is_active() delegates to _actor(), which calls ray.get_actor(ACTOR_NAME) unconditionally whenever Ray is initialized — even for pipelines that never set checkpoint_path. For an executor that runs hundreds of thousands of small batches this adds a GCS lookup per batch. Caching the handle (or the False sentinel) at module level after the first lookup would eliminate the per-batch overhead.
| actor = ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| _max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
get_if_exists=True can silently reuse a stale detached actor
When get_if_exists=True, Ray returns the existing actor handle without calling the constructor if an actor named nemo_curator_resumability is already alive — for instance from a previous run that crashed before the cleanup block ran. That stale actor owns a different LMDB file (from its own checkpoint_path), so all new deltas accumulate in the wrong place and are_completed queries reflect the wrong state. The caller has no indication that the path argument was ignored. Consider checking whether the returned actor is pointing at the expected path (e.g., via a path() accessor), or kill any pre-existing actor and spawn fresh.
There was a problem hiding this comment.
Improvements we should probably apply to v0.
Priority:
- Rename terminal stage to sink. Source and sink is better than source and terminal
- Here, instead of erroring out, can we just rewrite the new value? and removing the whole demon logic
- Unify task_id, _uuid and _deterministic_lineage_path_hash.
- support a dynamic dataset
| # hashed into `_deterministic_lineage_path_hash`, the deterministic | ||
| # task id used for retry-deduplication on the resumability actor. | ||
| _lineage_path: str = field(init=False, default="") | ||
| _deterministic_lineage_path_hash: str = field(init=False, default="") |
There was a problem hiding this comment.
Do we really need this? We have task_id, _uuid and _deterministic_lineage_path_hash. They can be combined into a single variable. But before that we need to ensure the usage of task_id and uuid is not affected. Eg:- Writer stage might use task_id right now, we should change to this.
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
- DROP `_uuid` field.
- ADD `_lineage_path` field (init=False, default="").
- ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
Always overwrites both `_lineage_path` and `task_id`. The
`child_segment` can be a positional index (int) for plain emissions
or a content-based string (e.g. from `get_deterministic_id()`) for
source-stage emissions where stability across input reordering
matters.
- ADD `get_deterministic_id() -> str | None` method (default None).
Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
- Override `get_deterministic_id()` to return
`get_deterministic_hash(sorted(self.data))` — stable across runs
even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
- Default implementation now calls `_set_lineage` on each emitted
child. Stages that override `process_batch` are responsible for
calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
- `stages/text/modules/add_id.py:71`
- `stages/deduplication/semantic/kmeans.py:235, 395`
- `stages/deduplication/fuzzy/buckets_to_edges.py:83`
- `stages/deduplication/fuzzy/connected_components.py:176`
- `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Makes PR-A self-contained: `get_deterministic_id()` is now actually used. The default `process_batch` calls it for stages flagged `is_source_stage=True` and falls back to the positional index otherwise. - Add `is_source_stage: bool = False` and `is_sink_stage: bool = False` ClassVars on `ProcessingStage`. - `Pipeline.build()` now auto-assigns the first stage as source and the last as sink if no stage is explicitly marked. Raises on multiple explicit marks. The sink flag itself has no behavior in this PR — it exists for the resumability layer in a follow-up. - Default `process_batch` uses `child.get_deterministic_id() or i` as the lineage segment when the stage is a source. Content-based ids make `task_id` stable across input reordering for stages whose Task subclass implements `get_deterministic_id` (currently `FileGroupTask`). - Mark `FilePartitioningStage` as `is_source_stage = True` so the default reader path gets content-based ids out of the box. - Tests: `TestSourceStageSegment` (3 tests covering content-hash use, positional fallback, and non-source-stage passthrough); `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py` (defaults, explicit overrides, multi-mark validation). - Also: move `StagePerfStats` import to `TYPE_CHECKING` block in tasks.py (ruff TC001). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.
- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
helpers, etc.) and dead loop counters that only existed to format task_ids.
Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
733fa37 to
ec75bf2
Compare
|
|
||
| if checkpoint_path is None: | ||
| return executor.execute(self.stages, initial_tasks) | ||
| return self._run_with_resumability(executor, initial_tasks, checkpoint_path) |
There was a problem hiding this comment.
I do not like this, the reason this is being done is probably coz we call ray init inside the executor and not here, so we need to differentiate that. Not ideal for sure. Maybe we can make a function for _start_resumability_actor and call it inside each executor. Design choice.
…ng source flag - Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's own segment of the lineage path). Clearer per review. - Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so non-text modalities (FileGroupTask, dedup) don't pull in text deps. The text `writer/utils.py` re-exports it for backward compat. - Drop `is_source_stage = True` from FilePartitioningStage; rely on Pipeline.build()'s "first stage is source" default, so users who put a different stage first (e.g. URL generation) get the right behavior. - Fix dead-branch assertion in test_tasks.py (parent lineage is always "" -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg
- task_id is now the readable underscore-joined lineage path itself (no
sha256). Easier to debug; collapses the redundant `_lineage_path` field.
`_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
runtime StagePerfStats import (project requires py>=3.11, so `str | int`
is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
all call sites import it directly (no writer/utils re-export). Fix the
3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
* get_deterministic_hash tests -> tests/utils/test_hash_utils.py
* FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
(consolidated into one test_deterministic_ids)
* role-assignment tests -> tests/pipelines/test_pipelines.py
TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
* add test_task_id_lineage to backends/test_integration.py
When `Pipeline.run(checkpoint_path="...")` is set, completed source partitions are tracked across runs in an LMDB file and skipped on rerun. The whole runtime fits in a small surface: - One singleton Ray actor (`ResumabilityActor`) that owns the LMDB. - One hook in `BaseStageAdapter.process_batch`. - One method in `Pipeline.run` for the actor + watchdog lifecycle. - No changes to per-backend executors (Xenna, RayData, RayActorPool). Workers fire counter deltas fire-and-forget; backpressure is handled by Ray's `_max_pending_calls`. The actor writes to LMDB only when a source's pending counter reaches zero (one tiny write per source at end-of-life, not per task or per batch). `NoneTask` and `FailedTask` sentinels let user code in `process_batch` mark per-slot outcomes. Returning `None` from `process()` is auto-wrapped to `NoneTask` for backward compatibility with Curator's existing convention. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two changes from review feedback on PR #2033: 1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source" and "sink" reads better than "source" and "terminal". 2. Resumability never fails the pipeline. - Drop `NonDeterministicTaskError`, `NegativePendingCountError`, `_errors`, `errors()`, `_record_error` on the actor. - `apply_deltas` is now never-raising: on a retry firing a different delta for the same task hash, rewrite the pending counter via `(-old + new)` rather than erroring. The newest observation wins. - On the two anomaly cases (different delta for a task whose source is already completed, OR a never-seen task for an already-completed source), log a loud warning AND remove the source from the completed set (in-memory + LMDB) so it's reprocessed cleanly on the next run. The warning points users at https://github.com/NVIDIA-NeMo/Curator to file an issue. - Drop the watchdog thread and `_thread.interrupt_main` in `Pipeline._run_with_resumability`. The method shrinks from ~80 lines to ~25 — just spawn, run, close. Tests updated to assert: rewrite-on-conflict math; un-complete-on- anomaly with LMDB-survival; `apply_deltas` never raises under any input. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
Two correctness fixes flagged in review: 1. Source stage was using positional index `i` for both `_lineage_path` segment and `_source_id`, ignoring `Task.get_deterministic_id()`. This defeats the whole point of content-based ids: adding a file at the front of the input would shift every source_id and force a full reprocess. Now uses `get_deterministic_id() or i` to match what the default `process_batch` (in `stages/base.py`) already does. 2. Single-input fan-out (1→N) computed `delta = n - 1` from the raw result count, then filtered out `None` / `NoneTask` / `FailedTask` slots. Counter goes off by one per filtered sentinel — a 1→3 fan-out with one NoneTask would fire delta=+2 but only add 2 real children. Fixed by filtering first, then computing `n` from the real count. Adds tests for both: - `test_fanout_filters_sentinels_before_computing_delta` - `test_fanout_all_sentinels_acts_like_filter` - `test_source_stage_uses_content_hash_when_get_deterministic_id_returns_value` - `test_source_stage_falls_back_to_index_when_no_content_id`
Rebased onto the updated PR-A where task_id IS the lineage path (no hash, no separate _lineage_path) and is init=False. - sentinels.py: NoneTask/FailedTask.from_input can't pass task_id= to the constructor anymore (init=False); copy task_id + _source_id after construction instead. - tests: drop task_id= constructor args, replace _lineage_path assertions with task_id, and update the lineage tests to the always-overwrite semantics (no idempotency): an in-place returned task is re-lineaged with its own segment appended.
5af7e17 to
b6a6c7f
Compare
| if all(not t._source_id for t in tasks): | ||
| return [r for r in results if not isinstance(r, (NoneTask, FailedTask))] |
There was a problem hiding this comment.
The pre-source sentinel filter must also exclude raw
None values; otherwise a fan-out stage that returns None in its output list will pass those values through to the add_stage_perf loop, causing an AttributeError at runtime.
| if all(not t._source_id for t in tasks): | |
| return [r for r in results if not isinstance(r, (NoneTask, FailedTask))] | |
| if all(not t._source_id for t in tasks): | |
| return [r for r in results if r is not None and not isinstance(r, (NoneTask, FailedTask))] |
| ResumabilityActor.options( # type: ignore[attr-defined] | ||
| name=ACTOR_NAME, | ||
| lifetime="detached", | ||
| get_if_exists=True, | ||
| _max_pending_calls=100, | ||
| ).remote(str(checkpoint_path)) |
There was a problem hiding this comment.
Fire-and-forget delta loss when
_max_pending_calls=100 is exceeded
_max_pending_calls=100 on the actor causes Ray to embed a RayActorError in the returned ObjectRef when the queue is full. Because _flush_deltas never calls ray.get() on that ref, the rejection is silently swallowed and the delta is permanently lost. For a pipeline processing more than 100 batches before the actor can drain its queue (trivially possible at high throughput), any source whose counter delta was lost will have _pending stuck above 0 and will never be written to LMDB. The PR description claims "_max_pending_calls provides backpressure", but the actual behavior is silent delta loss, not blocking — the two have opposite throughput implications. A limit of 100 is also very low relative to a typical pipeline's batch concurrency. Consider either raising the limit substantially, adding a ray.get() health check in the watchdog thread to surface errors, or verifying the actual Ray behavior for the fire-and-forget case.
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
- DROP `_uuid` field.
- ADD `_lineage_path` field (init=False, default="").
- ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
Always overwrites both `_lineage_path` and `task_id`. The
`child_segment` can be a positional index (int) for plain emissions
or a content-based string (e.g. from `get_deterministic_id()`) for
source-stage emissions where stability across input reordering
matters.
- ADD `get_deterministic_id() -> str | None` method (default None).
Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
- Override `get_deterministic_id()` to return
`get_deterministic_hash(sorted(self.data))` — stable across runs
even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
- Default implementation now calls `_set_lineage` on each emitted
child. Stages that override `process_batch` are responsible for
calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
- `stages/text/modules/add_id.py:71`
- `stages/deduplication/semantic/kmeans.py:235, 395`
- `stages/deduplication/fuzzy/buckets_to_edges.py:83`
- `stages/deduplication/fuzzy/connected_components.py:176`
- `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
75b9298 to
976f16d
Compare
Discussion (Design Doc)
#2034
Summary
Adds opt-in resumability to NeMo Curator pipelines. When the user passes
Pipeline.run(checkpoint_path="..."), completed source partitions are tracked in an LMDB file via a single Ray actor. On rerun against the same checkpoint, the source stage skips sources already marked complete and processes only the remaining work.Design discussion: the full design rationale, principles, and algorithm details will go into a GitHub Discussion (to be linked here once posted). The top-level invariants:
_max_pending_calls).BaseStageAdapter.process_batch+ one method inPipeline.run; per-backend executors (Xenna, RayData, RayActorPool) are not touched.checkpoint_path— the resumability hook is a no-op when no actor is registered.What's in this PR
Taskfield additions (nemo_curator/tasks/tasks.py)_lineage_path: index path through the pipeline DAG._deterministic_lineage_path_hash:sha256(lineage_path)[:32]— stable across reruns._source_id: explicitly stamped by the source-stage adapter and inherited downstream. Empty for pre-source tasks._set_lineage(parent_paths, child_index)method.NoneTask/FailedTasksentinels (nemo_curator/tasks/sentinels.py) —Tasksubclasses with amsgfield. Returned from user code inprocess_batchto mark specific slots as filtered or failed.ProcessingStageflags (nemo_curator/stages/base.py)is_source_stage: bool = False— user-overridable; defaults to the first stage at build time._is_terminal_stage: bool = False— same; defaults to the last stage.assign_root_lineageandassign_child_lineagehelpers for lineage propagation.ResumabilityActor(nemo_curator/utils/resumability_actor.py) — singleton Ray actor withapply_deltas,are_completed,errors,closemethods. Maintains in-memory_pending(per-source counter) and_applied(per-task-hash dedup + non-determinism detection). Writes to LMDB only when a source's counter hits zero.nemo_curator/utils/resumability_client.py) —_is_active,_flush_deltas,_skip_completed_sources.nemo_curator/backends/base.py) —_resumability_postprocessonBaseStageAdapterthat auto-wrapsNone→NoneTask, validates batch fan-out rules, computes counter deltas, and fires them fire-and-forget.Pipeline.runlifecycle (nemo_curator/pipeline/pipeline.py)checkpoint_pathargument.is_source_stage=Trueor_is_terminal_stage=True; otherwise defaults to first/last._run_with_resumability(...)method: spawns the actor, starts the watchdog thread, runs the pipeline, does a final error check, closes the actor.Test plan
Unit tests cover:
ResumabilityActor.apply_deltascounter math for all delta combinations.NonDeterministicTaskErrorfires on conflicting delta for the same task hash.NegativePendingCountErrorfires when a counter would go below zero.pending[sid]hits 0; LMDB state survives close + reopen.are_completedreturns parallel bool list consistent with_completedset.errors()returns a snapshot; mutation does not affect actor state.NoneTask/FailedTask.from_input constructs with inherited identity.Task._set_lineagesets lineage path and hash, idempotent on re-call, filters empty parent paths.assign_root_lineagesets lineage on user-provided tasks, skipsEmptyTask, does not set_source_id.assign_child_lineagepropagates lineage and inherits_source_idfrom parents._resumability_postprocessauto-wrapsNoneasNoneTask, no-ops for pre-source tasks (empty_source_id), raises on batched fan-out withlen(input)>1 and len(output)!=len(input)._source_id, filters already-completed sources, fires+1per surviving source.Pipeline.buildraises on multiple explicit source stages and on multiple explicit terminal stages; defaults to first/last when unmarked.Functional / end-to-end (SIGKILL-resume integration, autoscale mid-run) is noted as follow-up — see "Future work" below.
Future work (will be updated as this PR evolves)
_pendingand_appliedto LMDB so the actor can recover its state after restart._applieddict — drop entries for sources that moved to_completed(current implementation lets_appliedgrow unbounded for the duration of the run).get_deterministic_hashto take alengthparameter; use 16 hex chars (64 bits) for resumability source_ids to avoid birthday collisions at 100M+ scale.Pipeline.run(..., resumability_poll_interval=...).is_source_stagerequirement — currently defaults to the first stage; consider requiring an explicit marker to avoid surprising "filter-as-source" behavior in pipelines without a reader.FilePartitioningStage.sort_by_size/ sorted file paths).is_source_stage/_is_terminal_stagedefaults work correctly when the user's pipeline containsCompositeStageinstances that decompose into multiple primitive stages.Pipeline.describe()integration — include resumability config in the pipeline description output.🤖 Generated with Claude Code