Skip to content

Pipeline resumability via source-level counter checkpointing#2033

Open
abhinavg4 wants to merge 10 commits into
abhinavg/task-id-cleanupfrom
abhinavg/resumability
Open

Pipeline resumability via source-level counter checkpointing#2033
abhinavg4 wants to merge 10 commits into
abhinavg/task-id-cleanupfrom
abhinavg/resumability

Conversation

@abhinavg4
Copy link
Copy Markdown
Contributor

@abhinavg4 abhinavg4 commented May 27, 2026

Discussion (Design Doc)

#2034

Summary

Adds opt-in resumability to NeMo Curator pipelines. When the user passes Pipeline.run(checkpoint_path="..."), completed source partitions are tracked in an LMDB file via a single Ray actor. On rerun against the same checkpoint, the source stage skips sources already marked complete and processes only the remaining work.

Design discussion: the full design rationale, principles, and algorithm details will go into a GitHub Discussion (to be linked here once posted). The top-level invariants:

  • Workers never block on the resumability layer (fire-and-forget event flow; backpressure via Ray's _max_pending_calls).
  • One LMDB write per source, at end-of-life — counter math runs in memory on the actor; we touch the disk only when a source's pending counter reaches zero.
  • Single integration point in BaseStageAdapter.process_batch + one method in Pipeline.run; per-backend executors (Xenna, RayData, RayActorPool) are not touched.
  • No code change for existing pipelines that don't pass checkpoint_path — the resumability hook is a no-op when no actor is registered.

What's in this PR

  • Task field additions (nemo_curator/tasks/tasks.py)
    • _lineage_path: index path through the pipeline DAG.
    • _deterministic_lineage_path_hash: sha256(lineage_path)[:32] — stable across reruns.
    • _source_id: explicitly stamped by the source-stage adapter and inherited downstream. Empty for pre-source tasks.
    • _set_lineage(parent_paths, child_index) method.
  • NoneTask / FailedTask sentinels (nemo_curator/tasks/sentinels.py) — Task subclasses with a msg field. Returned from user code in process_batch to mark specific slots as filtered or failed.
  • ProcessingStage flags (nemo_curator/stages/base.py)
    • is_source_stage: bool = False — user-overridable; defaults to the first stage at build time.
    • _is_terminal_stage: bool = False — same; defaults to the last stage.
    • assign_root_lineage and assign_child_lineage helpers for lineage propagation.
  • ResumabilityActor (nemo_curator/utils/resumability_actor.py) — singleton Ray actor with apply_deltas, are_completed, errors, close methods. Maintains in-memory _pending (per-source counter) and _applied (per-task-hash dedup + non-determinism detection). Writes to LMDB only when a source's counter hits zero.
  • Client helpers (nemo_curator/utils/resumability_client.py) — _is_active, _flush_deltas, _skip_completed_sources.
  • Stage adapter hook (nemo_curator/backends/base.py) — _resumability_postprocess on BaseStageAdapter that auto-wraps NoneNoneTask, validates batch fan-out rules, computes counter deltas, and fires them fire-and-forget.
  • Pipeline.run lifecycle (nemo_curator/pipeline/pipeline.py)
    • New checkpoint_path argument.
    • Build-time validation: at most one stage may be explicitly marked is_source_stage=True or _is_terminal_stage=True; otherwise defaults to first/last.
    • _run_with_resumability(...) method: spawns the actor, starts the watchdog thread, runs the pipeline, does a final error check, closes the actor.

Test plan

Unit tests cover:

  • ResumabilityActor.apply_deltas counter math for all delta combinations.
  • NonDeterministicTaskError fires on conflicting delta for the same task hash.
  • NegativePendingCountError fires when a counter would go below zero.
  • Same delta re-fired for the same task is silently idempotent.
  • LMDB write happens exactly when pending[sid] hits 0; LMDB state survives close + reopen.
  • are_completed returns parallel bool list consistent with _completed set.
  • errors() returns a snapshot; mutation does not affect actor state.
  • NoneTask / FailedTask.from_input constructs with inherited identity.
  • Task._set_lineage sets lineage path and hash, idempotent on re-call, filters empty parent paths.
  • assign_root_lineage sets lineage on user-provided tasks, skips EmptyTask, does not set _source_id.
  • assign_child_lineage propagates lineage and inherits _source_id from parents.
  • _resumability_postprocess auto-wraps None as NoneTask, no-ops for pre-source tasks (empty _source_id), raises on batched fan-out with len(input)>1 and len(output)!=len(input).
  • Source-stage handling: stamps _source_id, filters already-completed sources, fires +1 per surviving source.
  • Pipeline.build raises on multiple explicit source stages and on multiple explicit terminal stages; defaults to first/last when unmarked.

Functional / end-to-end (SIGKILL-resume integration, autoscale mid-run) is noted as follow-up — see "Future work" below.

Future work (will be updated as this PR evolves)

  • Functional / SIGINT integration test — drive a real pipeline, kill mid-run, restart, verify only remaining sources are reprocessed.
  • Actor-death survival — periodically snapshot _pending and _applied to LMDB so the actor can recover its state after restart.
  • Bounded _applied dict — drop entries for sources that moved to _completed (current implementation lets _applied grow unbounded for the duration of the run).
  • 64-bit hashes — extend get_deterministic_hash to take a length parameter; use 16 hex chars (64 bits) for resumability source_ids to avoid birthday collisions at 100M+ scale.
  • Configurable watchdog poll interval — currently hard-coded at 60 seconds; expose as Pipeline.run(..., resumability_poll_interval=...).
  • Explicit is_source_stage requirement — currently defaults to the first stage; consider requiring an explicit marker to avoid surprising "filter-as-source" behavior in pipelines without a reader.
  • Reader ordering invariant — formalize the requirement that source stages emit sources in a deterministic order across runs (currently informally enforced by FilePartitioningStage.sort_by_size / sorted file paths).
  • Composite stages — verify is_source_stage / _is_terminal_stage defaults work correctly when the user's pipeline contains CompositeStage instances that decompose into multiple primitive stages.
  • Pipeline.describe() integration — include resumability config in the pipeline description output.

🤖 Generated with Claude Code

@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot Bot commented May 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 27, 2026

Greptile Summary

Adds opt-in pipeline resumability via a singleton Ray actor (ResumabilityActor) that tracks per-source pending counters in an LMDB file. When Pipeline.run(checkpoint_path=...) is set, completed source partitions are persisted across runs and skipped on rerun; existing pipelines without checkpoint_path pay zero cost.

  • Core mechanism: source stage fires +1 per new source; each downstream stage fires 0 (pass-through), −1 (filter/sink), or +(n−1) (fan-out); when a counter reaches zero the source_id is written to LMDB and skipped on rerun.
  • Sentinel types: NoneTask (filter, decrements counter) and FailedTask (error, counter held so source reruns) are new public Task subclasses returned from process_batch.
  • Design trade-offs: fire-and-forget delta delivery, "never raises" actor, and detached actor lifetime are deliberate choices with recovery paths for most anomaly cases; several edge cases (stale actor reuse, actor-death recovery) are called out as future work.

Confidence Score: 3/5

Two defects in the changed code path need fixes before merging: a raw-None crash in the pre-source fan-out branch, and uncertainty about silent delta loss at high throughput when the actor queue is full.

The pre-source sentinel filter omits r is not None, so any pre-source fan-out stage returning bare None slots crashes the perf-stats loop immediately. The _max_pending_calls=100 + fire-and-forget combination could silently discard deltas if Ray rejects calls when the queue is full rather than blocking — at high batch concurrency this leaves sources permanently pending in the checkpoint, silently defeating the resumability guarantee. Both issues are on the new resumability code path. The rest of the implementation is well-structured and unit-tested.

nemo_curator/backends/base.py (pre-source None filter) and nemo_curator/pipeline/pipeline.py (_max_pending_calls + fire-and-forget interaction) are the two files that need attention before this is merged.

Important Files Changed

Filename Overview
nemo_curator/backends/base.py Adds _resumability_postprocess and _handle_source_stage to BaseStageAdapter. The pre-source fan-out path fails to exclude raw None values from its output, which crashes the add_stage_perf loop.
nemo_curator/utils/resumability_actor.py New singleton Ray actor; "never raises" design is a sound simplification. The newly_done mid-batch zero-crossing anomaly is low-risk with correct delta accounting but worth a guard for robustness.
nemo_curator/utils/resumability_client.py Module-level helpers for worker→actor communication. Each call independently invokes _actor() with a GCS round-trip; caching the handle would reduce per-batch overhead (flagged in previous review).
nemo_curator/pipeline/pipeline.py Adds checkpoint_path to Pipeline.run and the _run_with_resumability lifecycle. The _max_pending_calls=100 + fire-and-forget combination may silently lose deltas at high throughput.
nemo_curator/stages/base.py Adds assign_root_lineage and assign_child_lineage helpers; logic is clean and None entries are filtered correctly.
nemo_curator/tasks/tasks.py Adds _source_id field and _set_lineage / get_deterministic_id methods to Task. Implementation is straightforward and consistent with the design.
nemo_curator/tasks/sentinels.py New NoneTask and FailedTask sentinel classes; well-structured with from_input constructors that inherit task identity correctly.
pyproject.toml Adds lmdb>=1.4 dependency; appropriate minimum version for a stable API.

Sequence Diagram

sequenceDiagram
    participant PR as Pipeline.run(checkpoint_path)
    participant RA as ResumabilityActor (detached)
    participant LMDB as LMDB file
    participant SA as BaseStageAdapter (source stage)
    participant DS as BaseStageAdapter (downstream)
    participant SK as BaseStageAdapter (sink)

    PR->>RA: "spawn (get_if_exists=True, max_pending_calls=100)"
    PR->>SA: executor.execute(stages, initial_tasks)

    SA->>RA: are_completed([sid0, sid1, ...]) [ray.get blocking]
    RA-->>SA: [False, True, ...]
    SA->>SA: filter completed sources
    SA->>RA: apply_deltas([(task_id, sid, +1), ...]) [fire-and-forget]

    loop each non-sink stage batch
        DS->>RA: apply_deltas([(task_id, sid, 0 or -1), ...]) [fire-and-forget]
    end

    SK->>RA: apply_deltas([(task_id, sid, -1), ...]) [fire-and-forget]

    RA->>RA: _pending[sid] reaches 0
    RA->>LMDB: write sid as complete

    PR->>RA: "close() [ray.get, timeout=10s]"
    PR->>RA: ray.kill(actor)

    Note over PR,LMDB: On rerun: actor loads _completed from LMDB, are_completed() skips finished sources
Loading

Reviews (4): Last reviewed commit: "Adapt resumability layer to task_id coll..." | Re-trigger Greptile

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment on lines +347 to +356
try:
ray.init(ignore_reinit_error=True)
actor_handle = ray.get_actor(ACTOR_NAME)
final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined]
if final_errs and not collected_error:
raise final_errs[0]
except ray.exceptions.RayActorError: # type: ignore[attr-defined]
pass
except Exception as e: # noqa: BLE001
logger.warning(f"final resumability error check failed: {e}")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Final error check silently swallowed by except Exception — the raise final_errs[0] on line 352 lives inside the try block that owns the broad except Exception as e handler. When the actor records a NonDeterministicTaskError or NegativePendingCountError that only surfaces at teardown (e.g., from the last in-flight batch), the raise is immediately re-caught and demoted to a logger.warning(...). The pipeline then exits cleanly with return results even though data-integrity errors were detected.

Suggested change
try:
ray.init(ignore_reinit_error=True)
actor_handle = ray.get_actor(ACTOR_NAME)
final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined]
if final_errs and not collected_error:
raise final_errs[0]
except ray.exceptions.RayActorError: # type: ignore[attr-defined]
pass
except Exception as e: # noqa: BLE001
logger.warning(f"final resumability error check failed: {e}")
_deferred_error: Exception | None = None
try:
ray.init(ignore_reinit_error=True)
actor_handle = ray.get_actor(ACTOR_NAME)
final_errs = ray.get(actor_handle.errors.remote(), timeout=30) # type: ignore[attr-defined]
if final_errs and not collected_error:
_deferred_error = final_errs[0]
except ray.exceptions.RayActorError: # type: ignore[attr-defined]
pass
except Exception as e: # noqa: BLE001
logger.warning(f"final resumability error check failed: {e}")
if _deferred_error is not None:
raise _deferred_error

Comment on lines +135 to +168
def _apply_impl(self, per_task: list[tuple[str, str, int]]) -> None:
newly_done: list[str] = []
for task_hash, sid, d in per_task:
existing = self._applied.get(task_hash)
if existing is not None:
if existing != d:
msg = (
f"Task {task_hash} produced delta={d} but previously "
f"produced delta={existing}. User code must be "
f"deterministic w.r.t. its input for resumability "
f"(e.g. consistent NoneTask vs real-output decisions)."
)
raise NonDeterministicTaskError(msg)
continue # idempotent re-fire
self._applied[task_hash] = d
if sid in self._completed:
continue
self._pending[sid] = self._pending.get(sid, 0) + d
if self._pending[sid] == 0:
newly_done.append(sid)
elif self._pending[sid] < 0:
msg = (
f"source {sid!r} pending count went to {self._pending[sid]}. "
f"This shouldn't happen with deterministic user code — likely "
f"a fan-out stage returned a different number of outputs "
f"between runs, or a NoneTask was issued more times than "
f"the source had tasks. Last delta applied: {d} for task "
f"{task_hash}."
)
raise NegativePendingCountError(msg)
if newly_done:
self._persist_completed(newly_done)
for sid in newly_done:
self._completed.add(sid)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Inconsistent _pending state after mid-batch error

_apply_impl accumulates newly_done entries while iterating the batch, then calls _persist_completed only if the loop finishes cleanly. If the loop raises NegativePendingCountError (e.g., two -1 deltas for the same source in one batch), any source that had already reached zero earlier in that same batch is never persisted and never added to _completed, but _pending[sid] for the failing source is left at a negative value. Subsequent apply_deltas calls on the same actor see neither the correct _completed entry nor a sane counter, which can lead to a false "source completed" if a later delta happens to bring the negative value back to zero.

Comment on lines +37 to +39
def _is_active() -> bool:
"""True if a resumability actor is registered in this Ray cluster."""
return _actor() is not None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 ray.get_actor() GCS round-trip on every processed batch

_is_active() delegates to _actor(), which calls ray.get_actor(ACTOR_NAME) unconditionally whenever Ray is initialized — even for pipelines that never set checkpoint_path. For an executor that runs hundreds of thousands of small batches this adds a GCS lookup per batch. Caching the handle (or the False sentinel) at module level after the first lookup would eliminate the per-batch overhead.

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment on lines +300 to +305
actor = ResumabilityActor.options( # type: ignore[attr-defined]
name=ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
_max_pending_calls=100,
).remote(str(checkpoint_path))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 get_if_exists=True can silently reuse a stale detached actor

When get_if_exists=True, Ray returns the existing actor handle without calling the constructor if an actor named nemo_curator_resumability is already alive — for instance from a previous run that crashed before the cleanup block ran. That stale actor owns a different LMDB file (from its own checkpoint_path), so all new deltas accumulate in the wrong place and are_completed queries reflect the wrong state. The caller has no indication that the path argument was ignored. Consider checking whether the returned actor is pointing at the expected path (e.g., via a path() accessor), or kill any pre-existing actor and spawn fresh.

Copy link
Copy Markdown
Contributor Author

@abhinavg4 abhinavg4 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Improvements we should probably apply to v0.

Priority:

  1. Rename terminal stage to sink. Source and sink is better than source and terminal
  2. Here, instead of erroring out, can we just rewrite the new value? and removing the whole demon logic
  3. Unify task_id, _uuid and _deterministic_lineage_path_hash.
  4. support a dynamic dataset

Comment thread nemo_curator/tasks/tasks.py Outdated
# hashed into `_deterministic_lineage_path_hash`, the deterministic
# task id used for retry-deduplication on the resumability actor.
_lineage_path: str = field(init=False, default="")
_deterministic_lineage_path_hash: str = field(init=False, default="")
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need this? We have task_id, _uuid and _deterministic_lineage_path_hash. They can be combined into a single variable. But before that we need to ensure the usage of task_id and uuid is not affected. Eg:- Writer stage might use task_id right now, we should change to this.

Comment thread nemo_curator/pipeline/pipeline.py Outdated
Comment thread nemo_curator/backends/base.py
Comment thread nemo_curator/utils/resumability_actor.py Outdated
abhinavg4 added a commit that referenced this pull request May 27, 2026
Two changes from review feedback on PR #2033:

1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source"
   and "sink" reads better than "source" and "terminal".

2. Resumability never fails the pipeline.
   - Drop `NonDeterministicTaskError`, `NegativePendingCountError`,
     `_errors`, `errors()`, `_record_error` on the actor.
   - `apply_deltas` is now never-raising: on a retry firing a different
     delta for the same task hash, rewrite the pending counter via
     `(-old + new)` rather than erroring. The newest observation wins.
   - On the two anomaly cases (different delta for a task whose source
     is already completed, OR a never-seen task for an already-completed
     source), log a loud warning AND remove the source from the
     completed set (in-memory + LMDB) so it's reprocessed cleanly on
     the next run. The warning points users at
     https://github.com/NVIDIA-NeMo/Curator to file an issue.
   - Drop the watchdog thread and `_thread.interrupt_main` in
     `Pipeline._run_with_resumability`. The method shrinks from ~80
     lines to ~25 — just spawn, run, close.

Tests updated to assert: rewrite-on-conflict math; un-complete-on-
anomaly with LMDB-survival; `apply_deltas` never raises under any
input.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Comment thread nemo_curator/backends/base.py
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.

Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.

Changes:

- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))` — stable across runs
    even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`

Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
  `process_batch` (which exercises the new lineage assignment) and
  assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
  `_set_lineage` (always overwrites, filters empty parent paths,
  accepts string segments), `get_deterministic_id` (default None,
  FileGroupTask content-hash with stable sort), and the default
  `process_batch` lineage assignment.

This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
abhinavg4 and others added 2 commits May 28, 2026 11:03
Makes PR-A self-contained: `get_deterministic_id()` is now actually
used. The default `process_batch` calls it for stages flagged
`is_source_stage=True` and falls back to the positional index otherwise.

- Add `is_source_stage: bool = False` and `is_sink_stage: bool = False`
  ClassVars on `ProcessingStage`.
- `Pipeline.build()` now auto-assigns the first stage as source and the
  last as sink if no stage is explicitly marked. Raises on multiple
  explicit marks. The sink flag itself has no behavior in this PR — it
  exists for the resumability layer in a follow-up.
- Default `process_batch` uses `child.get_deterministic_id() or i` as
  the lineage segment when the stage is a source. Content-based ids
  make `task_id` stable across input reordering for stages whose Task
  subclass implements `get_deterministic_id` (currently `FileGroupTask`).
- Mark `FilePartitioningStage` as `is_source_stage = True` so the
  default reader path gets content-based ids out of the box.
- Tests: `TestSourceStageSegment` (3 tests covering content-hash use,
  positional fallback, and non-source-stage passthrough);
  `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py`
  (defaults, explicit overrides, multi-mark validation).
- Also: move `StagePerfStats` import to `TYPE_CHECKING` block in
  tasks.py (ruff TC001).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.

- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
  nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
  that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
  helpers, etc.) and dead loop counters that only existed to format task_ids.

Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
abhinavg4 added a commit that referenced this pull request May 28, 2026
Two changes from review feedback on PR #2033:

1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source"
   and "sink" reads better than "source" and "terminal".

2. Resumability never fails the pipeline.
   - Drop `NonDeterministicTaskError`, `NegativePendingCountError`,
     `_errors`, `errors()`, `_record_error` on the actor.
   - `apply_deltas` is now never-raising: on a retry firing a different
     delta for the same task hash, rewrite the pending counter via
     `(-old + new)` rather than erroring. The newest observation wins.
   - On the two anomaly cases (different delta for a task whose source
     is already completed, OR a never-seen task for an already-completed
     source), log a loud warning AND remove the source from the
     completed set (in-memory + LMDB) so it's reprocessed cleanly on
     the next run. The warning points users at
     https://github.com/NVIDIA-NeMo/Curator to file an issue.
   - Drop the watchdog thread and `_thread.interrupt_main` in
     `Pipeline._run_with_resumability`. The method shrinks from ~80
     lines to ~25 — just spawn, run, close.

Tests updated to assert: rewrite-on-conflict math; un-complete-on-
anomaly with LMDB-survival; `apply_deltas` never raises under any
input.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@abhinavg4 abhinavg4 force-pushed the abhinavg/resumability branch from 733fa37 to ec75bf2 Compare May 28, 2026 18:39
@abhinavg4 abhinavg4 changed the base branch from main to abhinavg/task-id-cleanup May 28, 2026 18:49

if checkpoint_path is None:
return executor.execute(self.stages, initial_tasks)
return self._run_with_resumability(executor, initial_tasks, checkpoint_path)
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not like this, the reason this is being done is probably coz we call ray init inside the executor and not here, so we need to differentiate that. Not ideal for sure. Maybe we can make a function for _start_resumability_actor and call it inside each executor. Design choice.

abhinavg4 and others added 7 commits June 1, 2026 15:40
…ng source flag

- Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's
  own segment of the lineage path). Clearer per review.
- Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so
  non-text modalities (FileGroupTask, dedup) don't pull in text deps. The
  text `writer/utils.py` re-exports it for backward compat.
- Drop `is_source_stage = True` from FilePartitioningStage; rely on
  Pipeline.build()'s "first stage is source" default, so users who put a
  different stage first (e.g. URL generation) get the right behavior.
- Fix dead-branch assertion in test_tasks.py (parent lineage is always ""
  -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg

- task_id is now the readable underscore-joined lineage path itself (no
  sha256). Easier to debug; collapses the redundant `_lineage_path` field.
  `_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
  runtime StagePerfStats import (project requires py>=3.11, so `str | int`
  is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
  all call sites import it directly (no writer/utils re-export). Fix the
  3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
  * get_deterministic_hash tests -> tests/utils/test_hash_utils.py
  * FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
    (consolidated into one test_deterministic_ids)
  * role-assignment tests -> tests/pipelines/test_pipelines.py
    TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
  * add test_task_id_lineage to backends/test_integration.py
When `Pipeline.run(checkpoint_path="...")` is set, completed source
partitions are tracked across runs in an LMDB file and skipped on
rerun. The whole runtime fits in a small surface:

- One singleton Ray actor (`ResumabilityActor`) that owns the LMDB.
- One hook in `BaseStageAdapter.process_batch`.
- One method in `Pipeline.run` for the actor + watchdog lifecycle.
- No changes to per-backend executors (Xenna, RayData, RayActorPool).

Workers fire counter deltas fire-and-forget; backpressure is handled by
Ray's `_max_pending_calls`. The actor writes to LMDB only when a
source's pending counter reaches zero (one tiny write per source at
end-of-life, not per task or per batch).

`NoneTask` and `FailedTask` sentinels let user code in `process_batch`
mark per-slot outcomes. Returning `None` from `process()` is
auto-wrapped to `NoneTask` for backward compatibility with Curator's
existing convention.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Two changes from review feedback on PR #2033:

1. Rename `_is_terminal_stage` → `_is_sink_stage` throughout. "Source"
   and "sink" reads better than "source" and "terminal".

2. Resumability never fails the pipeline.
   - Drop `NonDeterministicTaskError`, `NegativePendingCountError`,
     `_errors`, `errors()`, `_record_error` on the actor.
   - `apply_deltas` is now never-raising: on a retry firing a different
     delta for the same task hash, rewrite the pending counter via
     `(-old + new)` rather than erroring. The newest observation wins.
   - On the two anomaly cases (different delta for a task whose source
     is already completed, OR a never-seen task for an already-completed
     source), log a loud warning AND remove the source from the
     completed set (in-memory + LMDB) so it's reprocessed cleanly on
     the next run. The warning points users at
     https://github.com/NVIDIA-NeMo/Curator to file an issue.
   - Drop the watchdog thread and `_thread.interrupt_main` in
     `Pipeline._run_with_resumability`. The method shrinks from ~80
     lines to ~25 — just spawn, run, close.

Tests updated to assert: rewrite-on-conflict math; un-complete-on-
anomaly with LMDB-survival; `apply_deltas` never raises under any
input.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
Two correctness fixes flagged in review:

1. Source stage was using positional index `i` for both `_lineage_path`
   segment and `_source_id`, ignoring `Task.get_deterministic_id()`. This
   defeats the whole point of content-based ids: adding a file at the
   front of the input would shift every source_id and force a full
   reprocess. Now uses `get_deterministic_id() or i` to match what the
   default `process_batch` (in `stages/base.py`) already does.

2. Single-input fan-out (1→N) computed `delta = n - 1` from the raw
   result count, then filtered out `None` / `NoneTask` / `FailedTask`
   slots. Counter goes off by one per filtered sentinel — a 1→3 fan-out
   with one NoneTask would fire delta=+2 but only add 2 real children.
   Fixed by filtering first, then computing `n` from the real count.

Adds tests for both:
- `test_fanout_filters_sentinels_before_computing_delta`
- `test_fanout_all_sentinels_acts_like_filter`
- `test_source_stage_uses_content_hash_when_get_deterministic_id_returns_value`
- `test_source_stage_falls_back_to_index_when_no_content_id`
Rebased onto the updated PR-A where task_id IS the lineage path (no hash,
no separate _lineage_path) and is init=False.

- sentinels.py: NoneTask/FailedTask.from_input can't pass task_id= to the
  constructor anymore (init=False); copy task_id + _source_id after
  construction instead.
- tests: drop task_id= constructor args, replace _lineage_path assertions
  with task_id, and update the lineage tests to the always-overwrite
  semantics (no idempotency): an in-place returned task is re-lineaged
  with its own segment appended.
@abhinavg4 abhinavg4 force-pushed the abhinavg/resumability branch from 5af7e17 to b6a6c7f Compare June 1, 2026 22:59
Comment on lines +141 to +142
if all(not t._source_id for t in tasks):
return [r for r in results if not isinstance(r, (NoneTask, FailedTask))]
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 The pre-source sentinel filter must also exclude raw None values; otherwise a fan-out stage that returns None in its output list will pass those values through to the add_stage_perf loop, causing an AttributeError at runtime.

Suggested change
if all(not t._source_id for t in tasks):
return [r for r in results if not isinstance(r, (NoneTask, FailedTask))]
if all(not t._source_id for t in tasks):
return [r for r in results if r is not None and not isinstance(r, (NoneTask, FailedTask))]

Comment on lines +286 to +291
ResumabilityActor.options( # type: ignore[attr-defined]
name=ACTOR_NAME,
lifetime="detached",
get_if_exists=True,
_max_pending_calls=100,
).remote(str(checkpoint_path))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Fire-and-forget delta loss when _max_pending_calls=100 is exceeded

_max_pending_calls=100 on the actor causes Ray to embed a RayActorError in the returned ObjectRef when the queue is full. Because _flush_deltas never calls ray.get() on that ref, the rejection is silently swallowed and the delta is permanently lost. For a pipeline processing more than 100 batches before the actor can drain its queue (trivially possible at high throughput), any source whose counter delta was lost will have _pending stuck above 0 and will never be written to LMDB. The PR description claims "_max_pending_calls provides backpressure", but the actual behavior is silent delta loss, not blocking — the two have opposite throughput implications. A limit of 100 is also very low relative to a typical pipeline's batch concurrency. Consider either raising the limit substantially, adding a ray.get() health check in the watchdog thread to surface errors, or verifying the actual Ray behavior for the fire-and-forget case.

abhinavg4 added a commit that referenced this pull request Jun 2, 2026
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.

Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.

Changes:

- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))` — stable across runs
    even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`

Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
  `process_batch` (which exercises the new lineage assignment) and
  assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
  `_set_lineage` (always overwrites, filters empty parent paths,
  accepts string segments), `get_deterministic_id` (default None,
  FileGroupTask content-hash with stable sort), and the default
  `process_batch` lineage assignment.

This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 75b9298 to 976f16d Compare June 2, 2026 16:45
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant