Unify task identity: collapse _uuid into deterministic task_id #2036

Open
abhinavg4 wants to merge 11 commits into main from abhinavg/task-id-cleanup

@abhinavg4
Contributor

Summary

Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a SHA-256 hash truncated to 32 hex characters); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.

This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.

What changes

  • `Task` class (`nemo_curator/tasks/tasks.py`)
    • DROP `_uuid` field.
    • ADD `_lineage_path` field (`init=False, default=""`).
    • ADD `_set_lineage(parent_lineage_paths, child_segment)` method. Always overwrites `_lineage_path` and `task_id` (no idempotency check). `child_segment` accepts either a positional `int` index or a content-based `str` (for source-stage emissions).
    • ADD `get_deterministic_id() -> str | None` method (default `None`). Subclasses with stable content override.
  • `FileGroupTask` overrides `get_deterministic_id()` to return `get_deterministic_hash(sorted(self.data))` — stable across runs even if files are added/removed between launches.
  • `ProcessingStage.process_batch` default impl now calls `_set_lineage` per emitted child. Stages that override `process_batch` are responsible for calling it themselves (docstring updated).
  • Migrate 6 `_uuid` use sites to `task.task_id` (all dedup output-filename generators + AddId).
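
The `_set_lineage` contract listed above can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the code in this PR: `SketchTask` is a hypothetical stand-in for `Task`, and the `"_"` join plus the 32-hex-character SHA-256 truncation are taken from the review summary further down this page rather than from the diff.

```python
import hashlib
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class SketchTask:
    """Hypothetical stand-in for Task; not the real nemo_curator class."""

    task_id: str = ""  # still accepted by the constructor
    _lineage_path: str = field(init=False, default="")

    def _set_lineage(self, parent_lineage_paths: List[str], child_segment: Union[int, str]) -> None:
        # Drop empty parent paths, then append this child's segment.
        parts = [p for p in parent_lineage_paths if p] + [str(child_segment)]
        self._lineage_path = "_".join(parts)
        # Always overwrite task_id (no idempotency check).
        digest = hashlib.sha256(self._lineage_path.encode("utf-8")).hexdigest()
        self.task_id = digest[:32]


child = SketchTask(task_id="file_group_3")
child._set_lineage(["", "5", ""], 3)
print(child._lineage_path)  # 5_3
print(len(child.task_id))   # 32
```

In this sketch the constructor-supplied `task_id` is discarded on the first `_set_lineage` call, which mirrors the "always overwrites" behavior described in the list.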

Why no breaking change at the constructor

`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.

Test plan

  • `tests/tasks/test_tasks.py`: updated fanout test to call `process_batch` (the path that exercises the new lineage assignment).
  • `tests/tasks/test_lineage.py` (NEW):
    • `_set_lineage` filters empty parent paths, always overwrites, accepts `str` and `int` segments.
    • `get_deterministic_id` returns `None` by default; `FileGroupTask` returns a stable hash regardless of input list order.
    • Default `process_batch` assigns unique lineage paths to fan-out outputs.
    • Same pipeline shape on same inputs produces byte-identical `task_id`s.
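
The order-independence property tested for `FileGroupTask` can be illustrated with a small sketch. `sketch_deterministic_hash` is a hypothetical stand-in for `get_deterministic_hash` (whose real implementation is not shown in this conversation), and the 12-character length is an assumption borrowed from the sequence diagram in the review summary.

```python
import hashlib
from typing import List


def sketch_deterministic_hash(items: List[str]) -> str:
    """Hypothetical stand-in; the real get_deterministic_hash may differ."""
    digest = hashlib.sha256()
    for item in items:
        digest.update(item.encode("utf-8"))
        digest.update(b"\0")  # delimiter so ["ab", "c"] and ["a", "bc"] differ
    return digest.hexdigest()[:12]  # assumed length, not confirmed by the diff


def file_group_id(paths: List[str]) -> str:
    # Sorting first makes the id independent of the order files were listed in.
    return sketch_deterministic_hash(sorted(paths))


first_run = file_group_id(["b.jsonl", "a.jsonl"])
second_run = file_group_id(["a.jsonl", "b.jsonl"])
print(first_run == second_run)  # True
```

Because the id depends only on the group's own file paths, adding or removing unrelated files between launches changes the group's position in the listing but not its id.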

Future work

  • "Strict" mode: make `task_id` `init=False` so users can't pass it at all. Requires migrating ~55 internal `task_id=...` constructor calls — mechanical but big diff. Separate PR.
  • Resumability (Pipeline resumability via source-level counter checkpointing #2033) rebases on top of this and uses `task.task_id` as the per-task dedup key.

🤖 Generated with Claude Code

@abhinavg4 abhinavg4 requested review from a team, ayushdg and praateekmahajan as code owners May 27, 2026 23:54
@copy-pr-bot

copy-pr-bot Bot commented May 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Contributor

greptile-apps Bot commented May 27, 2026

Greptile Summary

This PR replaces the random per-task _uuid with a deterministic task_id derived from a sha256 hash of the task's lineage path through the pipeline DAG. It also introduces is_source_stage/is_sink_stage role flags on ProcessingStage and a role-assignment step in Pipeline.build().

  • Core lineage mechanism (tasks.py, base.py): _set_lineage builds a _lineage_path by joining parent paths and a child segment with "_", then hashes it into a 32-char hex task_id. The default process_batch calls this for each emitted child; stages that override process_batch must call it themselves.
  • Source stage identity (file_group.py, file_partitioning.py): FileGroupTask overrides get_deterministic_id() to return a content hash of its sorted file paths; FilePartitioningStage is marked is_source_stage = True; Pipeline._assign_source_sink_roles() defaults the first/last stage to source/sink if none is explicit.
  • Migration (add_id.py, deduplication stages): six _uuid references swapped to task_id; three deduplication process_batch overrides (ConnectedComponentsStage, KMeansReadFitWriteStage) do not yet call _set_lineage on their outputs (noted in previously posted comments).

Confidence Score: 4/5

Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could produce silently wrong source/sink role assignments if stage objects are reused across builds.

The core lineage hash and migration are mechanically correct. _assign_source_sink_roles mutates stage instances in-place during build(), so a stage reused across two pipelines at different positions will have the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments.
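
The stage-reuse hazard described above can be reproduced with a small sketch. `SketchStage` and `assign_roles_in_place` are hypothetical approximations of `ProcessingStage` and `Pipeline._assign_source_sink_roles()`, written only from the behavior described in this summary (default the first/last stage to source/sink when none is explicit).

```python
class SketchStage:
    """Hypothetical stand-in for ProcessingStage."""

    # Class-level defaults that a build step may shadow on the instance.
    is_source_stage = False
    is_sink_stage = False

    def __init__(self, name):
        self.name = name


def assign_roles_in_place(stages):
    # Assumed logic: only default a role when no stage already claims it.
    if not any(s.is_source_stage for s in stages):
        stages[0].is_source_stage = True
    if not any(s.is_sink_stage for s in stages):
        stages[-1].is_sink_stage = True


reader = SketchStage("reader")
shared = SketchStage("shared")
writer = SketchStage("writer")

assign_roles_in_place([shared, writer])          # first build: shared is first
assign_roles_in_place([reader, shared, writer])  # second build: shared is in the middle

print(reader.is_source_stage)  # False
print(shared.is_source_stage)  # True
```

In the second build, `shared` still carries the flag from the first build, so `reader` is never marked as the source. One way to avoid this, offered only as a suggestion, is to compute roles into a per-build mapping instead of writing them onto the stage instances.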

nemo_curator/pipeline/pipeline.py — the in-place mutation in _assign_source_sink_roles

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/tasks/tasks.py | Drops _uuid, adds deterministic _set_lineage / _lineage_path contract. Core logic is sound; separator ambiguity is a latent design hazard for future subclasses. |
| nemo_curator/stages/base.py | Adds is_source_stage/is_sink_stage class attrs and calls _set_lineage in the default process_batch. Clean except that the class-level flags are instance-mutated by Pipeline.build(), causing role pollution across builds. |
| nemo_curator/pipeline/pipeline.py | Adds _assign_source_sink_roles() called inside build(). Mutates stage instances in-place, which silently corrupts role assignments when stages are shared across pipeline builds. |
| nemo_curator/tasks/file_group.py | Adds get_deterministic_id() returning a stable hash of sorted file paths. Clean implementation; correctly uses hex output (no underscores) that is safe with the _ separator. |
| nemo_curator/stages/deduplication/fuzzy/connected_components.py | Migrates _uuid to task_id for output filename. Overrides process_batch without calling _set_lineage, leaving the returned task with empty _lineage_path (previously flagged). |
| nemo_curator/stages/deduplication/semantic/kmeans.py | Migrates _uuid to task_id for filenames. Both _process_batch_single_pass and _predict_write_pass create output _EmptyTask objects without calling _set_lineage (previously flagged). |
| nemo_curator/stages/text/modules/add_id.py | Straightforward swap of _uuid to task_id for document ID prefix. Safe because task_id is always set by _set_lineage before AddId processes a task in a pipeline. |
| tests/tasks/test_lineage.py | New test file with comprehensive coverage of _set_lineage, get_deterministic_id, default process_batch, source stage segment selection, and cross-run determinism. Well-structured. |
| tests/pipelines/test_pipeline_roles.py | New tests for _assign_source_sink_roles covering defaults, explicit overrides, and error cases. Does not cover the stage-reuse mutation scenario. |
| tests/tasks/test_tasks.py | Updated fanout test to use process_batch path. The assertion on line 747 has a dead branch that can never be true (previously flagged). |
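
The "separator ambiguity" noted for tasks.py can be made concrete with a short sketch. `join_lineage` is a hypothetical helper that mirrors the described join rule (non-empty parent paths plus the child segment, joined with `"_"`); it is not the PR's code.

```python
def join_lineage(parent_paths, segment):
    # Assumed join rule: skip empty parents, join everything with "_".
    return "_".join([p for p in parent_paths if p] + [str(segment)])


# A segment that itself contains "_" collides with a deeper lineage.
path_a = join_lineage(["0"], "1_2")  # parent "0", segment "1_2"
path_b = join_lineage(["0_1"], "2")  # parent "0_1", segment "2"
print(path_a == path_b)  # True: both are "0_1_2"

# Integer indices and hex digests contain no "_", so they stay unambiguous.
safe_a = join_lineage(["0"], "1f2e")
safe_b = join_lineage(["0_1f"], "2e")
print(safe_a == safe_b)  # False
```

This is why a hex-only `get_deterministic_id()` is safe today, while a future subclass returning an id with underscores could produce colliding lineage paths and therefore colliding `task_id`s.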

Sequence Diagram

sequenceDiagram
    participant P as Pipeline.build()
    participant S as ProcessingStage (default)
    participant T as Task._set_lineage
    participant FG as FileGroupTask.get_deterministic_id

    P->>P: _assign_source_sink_roles()
    Note over P: mutates is_source_stage / is_sink_stage on stage instances

    Note over S: process_batch(tasks)
    S->>S: process(task) to children

    loop for each child
        alt "is_source_stage and child.get_deterministic_id() != None"
            S->>FG: get_deterministic_id()
            FG-->>S: content hash (12-char hex)
            S->>T: _set_lineage([parent._lineage_path], content_hash)
        else
            S->>T: _set_lineage([parent._lineage_path], positional_index)
        end
        T->>T: "_lineage_path = join(parent_paths + segment)"
        T->>T: "task_id = sha256(_lineage_path)[:32]"
    end

    S-->>P: children with deterministic task_ids
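
The segment selection shown in the diagram can be sketched as plain Python. All names here are hypothetical stand-ins (`SketchTask`, `assign_child_lineage`); the sketch follows the diagram's version, which passes the parent's `_lineage_path`, and may not match later revisions of the PR.

```python
import hashlib
from typing import Optional


class SketchTask:
    """Hypothetical stand-in for Task."""

    def __init__(self, content_id: Optional[str] = None):
        self._content_id = content_id
        self._lineage_path = ""
        self.task_id = ""

    def get_deterministic_id(self) -> Optional[str]:
        return self._content_id

    def _set_lineage(self, parent_paths, segment):
        parts = [p for p in parent_paths if p] + [str(segment)]
        self._lineage_path = "_".join(parts)
        self.task_id = hashlib.sha256(self._lineage_path.encode("utf-8")).hexdigest()[:32]


def assign_child_lineage(parent, children, is_source_stage):
    for index, child in enumerate(children):
        # Source stages prefer a content-based segment; everything else is positional.
        content_id = child.get_deterministic_id() if is_source_stage else None
        segment = content_id if content_id is not None else index
        child._set_lineage([parent._lineage_path], segment)
    return children


root = SketchTask()
groups = assign_child_lineage(root, [SketchTask("abc"), SketchTask("def")], True)
docs = assign_child_lineage(groups[0], [SketchTask(), SketchTask()], False)
print([g._lineage_path for g in groups])  # ['abc', 'def']
print([d._lineage_path for d in docs])    # ['abc_0', 'abc_1']
```

Running the same two calls again on fresh objects yields the same paths and therefore the same `task_id`s, which is the cross-run determinism the PR is after.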

Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..."

Comment thread tests/tasks/test_tasks.py Outdated
assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}"
# Each child has a non-empty lineage path with the parent's path as prefix.
for i, t in enumerate(output):
    assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
Contributor

P2 The first branch of this assertion (f"_{i}") can never be true. The parent task starts with _lineage_path = "", which is filtered out by _set_lineage's empty-string guard, so children always get _lineage_path = str(i) (e.g. "0", not "_0"). The dead branch obscures the actual invariant being tested.

Suggested change
- assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
+ assert t._lineage_path == str(i), t._lineage_path

abhinavg4 added a commit that referenced this pull request May 28, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
Contributor

@praateekmahajan praateekmahajan left a comment

Mostly LGTM, left some feedback re tests and clarifying questions

original_file = segments_sorted[0].get("original_file", "unknown")

- combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name)
+ combined = self._concatenate(original_file, segments_sorted, task.dataset_name)
Contributor

@mohammadaaftabv could you or someone from the audio team review whether this is fine? We're removing task_id from this call.

  def extract_and_write(self) -> list[FileGroupTask]:
      self._check_actor_obj()
-     current_band_min, current_band_max = self._current_band_range
+     _current_band_min, _current_band_max = self._current_band_range
Contributor

Comment on lines +50 to +57
def get_deterministic_id(self) -> str:
    """Content-based id derived from the sorted file paths. Stable
    across runs even if the source stage emits the file group at a
    different position (e.g. because new files were added or removed
    between runs)."""
    from nemo_curator.stages.text.io.writer.utils import get_deterministic_hash

    return get_deterministic_hash(sorted(self.data))
Contributor

Nice!
nit: could we move get_deterministic_hash to a utils module outside text? That way a non-text modality doesn't need to resolve all of the text dependencies.

Contributor Author

Yup I agree.

Comment thread nemo_curator/stages/file_partitioning.py Outdated
Comment thread tests/pipelines/test_pipeline_roles.py Outdated
Comment on lines +42 to +58
def test_defaults_first_stage_to_source(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s0.is_source_stage is True
    assert s1.is_source_stage is False
    assert s2.is_source_stage is False

def test_defaults_last_stage_to_sink(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s2.is_sink_stage is True
    assert s0.is_sink_stage is False
    assert s1.is_sink_stage is False
Contributor

nit: AI tends to overdo unit tests and write too many lines. Could we consolidate these into a single test, perhaps in test_pipelines.py under class TestPipelineBuild, named something like test_default_first_source_last_sink_stage(self), that checks both of these things?

That way tests for any future changes to pipeline.build(..) also live there.

Contributor Author

Yup probably we can do that. @oyilmaz-nvidia what do you think?

Comment thread nemo_curator/tasks/tasks.py Outdated
# limitations under the License.

- import uuid
+ from __future__ import annotations
Contributor

Why do we need `from __future__ import annotations`?

Contributor Author

I think it was because we had `int | str` or something similar in a type annotation. Probably an AI-generated addition to support older Python versions too. We can remove it.

Comment thread nemo_curator/tasks/tasks.py Outdated
t = _new()
# Empty strings in parent paths are stripped (EmptyTask's default
# _lineage_path is "").
t._set_lineage(["", "5", ""], 3)
Contributor

When can this happen?

Contributor Author

It cannot. The first empty string comes from an _EmptyTask, and the second cannot occur.

Contributor Author

Wait, unless get_deterministic_hash gives "" as the output. I think it's unrelated, but this is a bug. We need to ensure that get_deterministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but does that sound correct to you?

assert t._lineage_path == "root_abc123"


class TestGetDeterministicId:
Contributor

Can this test live in tasks/test_file_group_tasks.py, where we already test the file group task? It could go under TestFileGroupTask as def test_deterministic_ids(..), in which we test a few different asserts at once.

Comment thread tests/tasks/test_lineage.py Outdated
return None # always filter out


class TestDefaultProcessBatchAssignsLineage:
Contributor

These tests are good, but can we also add a new test method in https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53, where we do test_lineage() / test_task_ids()?

abhinavg4 added a commit that referenced this pull request Jun 1, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 8fd345a to ac66f5f Compare June 2, 2026 02:05
@abhinavg4
Contributor Author

/ok to test 75b9298

@abhinavg4
Contributor Author

/ok to test 976f16d

Contributor

@VibhuJawa VibhuJawa left a comment

Trying to understand these changes. Mostly questions.

@@ -65,7 +117,15 @@ def validate(self) -> bool:

@dataclass
class _EmptyTask(Task[None]):
Contributor

Curious: why did we decide against adding a NoneTask? I remember it being discussed at one point.

source partition keeps a stable id across reorderings regardless of
whether the source is 1→N or N→N.
"""
out = [r for r in results if r is not None]
Contributor

@VibhuJawa VibhuJawa Jun 3, 2026

We are dropping None results here, so the output can be shorter than the N inputs. Is that fine? Why are we doing that here? Does Xenna support that now?


return results

def _post_process_task_ids(self, tasks: list[Task], results: list[Task | None]) -> list[Task]:
Contributor

Nit:

Suggested change
- def _post_process_task_ids(self, tasks: list[Task], results: list[Task | None]) -> list[Task]:
+ def _post_process_task_ids(self, input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task]:

Comment on lines +142 to +145
elif len(out) == len(tasks):
    for parent, r in zip(tasks, out, strict=True):
        suffix = (r.get_deterministic_id() or 0) if is_source else 0
        r._set_lineage([parent.task_id], suffix)
Contributor

Question:
How is the positional 1:1 mapping guaranteed here?

Using Codex, I searched the current process_batch overrides, and there are stages whose batch cardinality is not always 1:1, e.g. PreserveByValueStage.process_batch() returns only surviving tasks after filtering, while some dedup stages aggregate many inputs into one output.

Is something like this possible?

# Input batch:
tasks = [
    Task(task_id="0_A"),
    Task(task_id="0_B"),
]

# Stage behavior:
# - A fans out into two children
# - B is filtered out (because we seem to be dropping nones now)
results = [
    child_from_A_0,
    child_from_A_1,
]

# len(results) == len(tasks), so current code takes the 1:1 branch:
for parent, r in zip(tasks, results, strict=True):
    r._set_lineage([parent.task_id], 0)

# Assigned lineage:
child_from_A_0.task_id == "0_A_0"  # correct
child_from_A_1.task_id == "0_B_0"  # wrong: this child came from A, not B
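
A runnable version of the scenario above, as a sketch only: `SketchTask`, `set_lineage`, and `assign_one_to_one` are hypothetical stand-ins, lineage ids are kept human-readable instead of hashed (as in the example above), and `true_parent_id` exists only so the demo can check the result.

```python
from dataclasses import dataclass


@dataclass
class SketchTask:
    task_id: str
    true_parent_id: str = ""  # ground truth, tracked only for this demo


def set_lineage(task, parent_id, suffix):
    # Readable ids instead of hashes, to match the example above.
    task.task_id = f"{parent_id}_{suffix}"


def assign_one_to_one(tasks, results):
    out = [r for r in results if r is not None]
    # Equal lengths are treated as a positional 1:1 mapping.
    if len(out) == len(tasks):
        for parent, r in zip(tasks, out):
            set_lineage(r, parent.task_id, 0)
    return out


tasks = [SketchTask("0_A"), SketchTask("0_B")]
# A fans out into two children; B is filtered out.
results = [SketchTask("", "0_A"), SketchTask("", "0_A")]

out = assign_one_to_one(tasks, results)
misattributed = [r.task_id for r in out if not r.task_id.startswith(r.true_parent_id + "_")]
print(misattributed)  # ['0_B_0']
```

The length check alone cannot distinguish "every input produced exactly one output" from "one input fanned out while another was dropped", so the second child ends up with B's lineage.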

4 participants