Unify task identity: collapse _uuid into deterministic task_id #2036
abhinavg4 wants to merge 11 commits into
User-visible: `Task.task_id` is now a deterministic 12-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.
Running the same pipeline twice on the same inputs produces byte-identical
`task_id`s across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.
Changes:
- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))`, which is stable across runs
    even if files are added or removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`
Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
`process_batch` (which exercises the new lineage assignment) and
assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
`_set_lineage` (always overwrites, filters empty parent paths,
accepts string segments), `get_deterministic_id` (default None,
FileGroupTask content-hash with stable sort), and the default
`process_batch` lineage assignment.
This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Greptile Summary
This PR replaces the random per-task
Confidence Score: 4/5
Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could silently produce wrong source/sink role assignments if stage objects are reused across builds: _assign_source_sink_roles mutates stage instances in place during build(), so a stage reused across two pipelines at different positions will keep the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments.
File needing attention: nemo_curator/pipeline/pipeline.py (the in-place mutation in _assign_source_sink_roles).
Sequence Diagram
```mermaid
sequenceDiagram
    participant P as Pipeline.build()
    participant S as ProcessingStage (default)
    participant T as Task._set_lineage
    participant FG as FileGroupTask.get_deterministic_id
    P->>P: _assign_source_sink_roles()
    Note over P: mutates is_source_stage / is_sink_stage on stage instances
    Note over S: process_batch(tasks)
    S->>S: process(task) to children
    loop for each child
        alt "is_source_stage and child.get_deterministic_id() != None"
            S->>FG: get_deterministic_id()
            FG-->>S: content hash (12-char hex)
            S->>T: _set_lineage([parent._lineage_path], content_hash)
        else
            S->>T: _set_lineage([parent._lineage_path], positional_index)
        end
        T->>T: "_lineage_path = join(parent_paths + segment)"
        T->>T: "task_id = sha256(_lineage_path)[:32]"
    end
    S-->>P: children with deterministic task_ids
```
Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..."
```python
assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}"
# Each child has a non-empty lineage path with the parent's path as prefix.
for i, t in enumerate(output):
    assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
```
The first branch of this assertion (`f"_{i}"`) can never be true. The parent task starts with `_lineage_path = ""`, which is filtered out by `_set_lineage`'s empty-string guard, so children always get `_lineage_path = str(i)` (e.g. `"0"`, not `"_0"`). The dead branch obscures the actual invariant being tested.
```diff
-    assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
+    assert t._lineage_path == str(i), t._lineage_path
```
Makes PR-A self-contained: `get_deterministic_id()` is now actually used. The default `process_batch` calls it for stages flagged `is_source_stage=True` and falls back to the positional index otherwise.
- Add `is_source_stage: bool = False` and `is_sink_stage: bool = False` ClassVars on `ProcessingStage`.
- `Pipeline.build()` now auto-assigns the first stage as source and the last as sink if no stage is explicitly marked. Raises on multiple explicit marks. The sink flag itself has no behavior in this PR; it exists for the resumability layer in a follow-up.
- Default `process_batch` uses `child.get_deterministic_id() or i` as the lineage segment when the stage is a source. Content-based ids make `task_id` stable across input reordering for stages whose Task subclass implements `get_deterministic_id` (currently `FileGroupTask`).
- Mark `FilePartitioningStage` as `is_source_stage = True` so the default reader path gets content-based ids out of the box.
- Tests: `TestSourceStageSegment` (3 tests covering content-hash use, positional fallback, and non-source-stage passthrough); `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py` (defaults, explicit overrides, multi-mark validation).
- Also: move `StagePerfStats` import to `TYPE_CHECKING` block in tasks.py (ruff TC001).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
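A minimal sketch of the role-assignment rule described in this commit, assuming stages expose mutable `is_source_stage` / `is_sink_stage` attributes. `Stage` and `assign_source_sink_roles` are hypothetical names, not the real `Pipeline.build()` code. Note that it sets attributes on the stage instances, which is the in-place mutation the Greptile summary flags.

```python
from dataclasses import dataclass


@dataclass
class Stage:
    name: str
    is_source_stage: bool = False
    is_sink_stage: bool = False


def assign_source_sink_roles(stages: list[Stage]) -> None:
    for flag, default_idx in (("is_source_stage", 0), ("is_sink_stage", -1)):
        marked = [s for s in stages if getattr(s, flag)]
        if len(marked) > 1:
            raise ValueError(f"Multiple stages marked {flag}: {[s.name for s in marked]}")
        if not marked and stages:
            # No explicit mark: default to the first (source) or last (sink) stage.
            setattr(stages[default_idx], flag, True)


stages = [Stage("read"), Stage("filter"), Stage("write")]
assign_source_sink_roles(stages)
print([(s.name, s.is_source_stage, s.is_sink_stage) for s in stages])
# [('read', True, False), ('filter', False, False), ('write', False, True)]
```

Because the defaults are written onto the instances, calling this again on a different pipeline that reuses `stages[0]` would see it as already explicitly marked.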
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.
- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
helpers, etc.) and dead loop counters that only existed to format task_ids.
Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
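A short illustration of what `init=False, default=""` means for `task_id`, using a plain dataclass. `SketchTask` is a hypothetical stand-in, not the real `Task`. The field is no longer a constructor argument, so passing it raises `TypeError`, and only code that assigns the attribute directly (the framework) can set it.

```python
from dataclasses import dataclass, field


@dataclass
class SketchTask:
    data: object = None
    # Framework-owned: excluded from __init__, starts empty until lineage is set.
    task_id: str = field(init=False, default="")


t = SketchTask(data=[1, 2])
print(repr(t.task_id))  # ''

try:
    SketchTask(data=[1], task_id="user_value")
except TypeError as exc:
    # __init__ does not accept task_id, so user code cannot pass it.
    print("rejected:", exc)
```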
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
praateekmahajan left a comment
Mostly LGTM, left some feedback re tests and clarifying questions
```diff
 original_file = segments_sorted[0].get("original_file", "unknown")
 
-combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name)
+combined = self._concatenate(original_file, segments_sorted, task.dataset_name)
```
@mohammadaaftabv can you (or someone from audio) review whether this is fine? We're removing task_id here.
```diff
 def extract_and_write(self) -> list[FileGroupTask]:
     self._check_actor_obj()
-    current_band_min, current_band_max = self._current_band_range
+    _current_band_min, _current_band_max = self._current_band_range
```
```python
def get_deterministic_id(self) -> str:
    """Content-based id derived from the sorted file paths. Stable
    across runs even if the source stage emits the file group at a
    different position (e.g. because new files were added or removed
    between runs)."""
    from nemo_curator.stages.text.io.writer.utils import get_deterministic_hash

    return get_deterministic_hash(sorted(self.data))
```
Nice!
nit: move this get_deterministic_hash out to a utils module outside text? That way the text dependencies need not be resolved for a non-text modality.
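As a sketch of why sorting makes the id stable, here is a hypothetical hash helper of the kind that could live in a modality-agnostic utils module. The real `get_deterministic_hash` implementation is not shown in this thread, so the SHA-256-over-JSON encoding and the 12-character length below are assumptions.

```python
import hashlib
import json


def deterministic_hash(values: list[str], length: int = 12) -> str:
    # Hypothetical stand-in: JSON-encode so ["a,b"] and ["a", "b"] hash differently.
    payload = json.dumps(values, separators=(",", ":")).encode("utf-8")
    return hashlib.sha256(payload).hexdigest()[:length]


files_run1 = ["b.jsonl", "a.jsonl"]
files_run2 = ["a.jsonl", "b.jsonl"]
# Sorting first makes the id independent of the order the files were listed in.
assert deterministic_hash(sorted(files_run1)) == deterministic_hash(sorted(files_run2))
```

Only `hashlib` and `json` are needed, so a helper like this has no dependency on any text-modality module.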
```python
def test_defaults_first_stage_to_source(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s0.is_source_stage is True
    assert s1.is_source_stage is False
    assert s2.is_source_stage is False


def test_defaults_last_stage_to_sink(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s2.is_sink_stage is True
    assert s0.is_sink_stage is False
    assert s1.is_sink_stage is False
```
nit: AI overdoes the unit tests and writes too many lines. Can we consolidate these into a single test, maybe within test_pipelines.py where we have class TestPipelineBuild, e.g. a `test_default_first_source_last_sink_stage(self)` that tests both of these things?
That way any future changes to pipeline.build(..) are also tested there.
Yup probably we can do that. @oyilmaz-nvidia what do you think?
```diff
 # limitations under the License.
 
-import uuid
+from __future__ import annotations
```
why do we need `from __future__ import annotations`?
I think it was because we had `int | str` or something similar in the type hints. Probably AI-generated boilerplate to support older Python versions too. Can we remove it?
```python
t = _new()
# Empty strings in parent paths are stripped (EmptyTask's default
# _lineage_path is "").
t._set_lineage(["", "5", ""], 3)
```
When can this happen?
It cannot. The first empty string is due to an EmptyTask, and the second cannot occur.
Wait, unless get_deterministic_hash gives "" as the output. I think that's unrelated, but it would be a bug. We need to ensure that get_deterministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but does that sound correct to you?
```python
assert t._lineage_path == "root_abc123"


class TestGetDeterministicId:
```
Can this test live inside tasks/test_file_group_tasks.py, where we already test the file group task? It could go under TestFileGroupTask as a single `def test_deterministic_ids(...)` that checks a few different asserts at once.
```python
return None  # always filter out


class TestDefaultProcessBatchAssignsLineage:
```
These tests are good, but can we also add a new test method at https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53 where we do test_lineage() / test_task_ids()?
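A rough shape for such a `test_task_ids()` check, written against a tiny in-memory stand-in rather than the real backends. `run_pipeline` and the stage functions are hypothetical: the idea is to run the same pipeline twice and assert the ids are identical across runs and unique within a run.

```python
from collections.abc import Callable

Stage = Callable[[str], list[str]]


def run_pipeline(stages: list[Stage], inputs: list[str]) -> list[tuple[str, str]]:
    # Each item is (task_id, payload); ids are underscore-joined lineage paths.
    items = [(str(i), x) for i, x in enumerate(inputs)]
    for stage in stages:
        items = [
            (f"{tid}_{j}", out)
            for tid, payload in items
            for j, out in enumerate(stage(payload))
        ]
    return items


def split_words(text: str) -> list[str]:
    return text.split()


def upper(word: str) -> list[str]:
    return [word.upper()]


def test_task_ids() -> None:
    stages = [split_words, upper]
    run1 = run_pipeline(stages, ["a b", "c"])
    run2 = run_pipeline(stages, ["a b", "c"])
    ids = [tid for tid, _ in run1]
    assert ids == [tid for tid, _ in run2]  # deterministic across runs
    assert len(set(ids)) == len(ids)  # unique within a run


test_task_ids()
```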
…ng source flag
- Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's own segment of the lineage path). Clearer per review.
- Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so non-text modalities (FileGroupTask, dedup) don't pull in text deps. The text `writer/utils.py` re-exports it for backward compat.
- Drop `is_source_stage = True` from FilePartitioningStage; rely on Pipeline.build()'s "first stage is source" default, so users who put a different stage first (e.g. URL generation) get the right behavior.
- Fix dead-branch assertion in test_tasks.py (parent lineage is always "" -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg
- task_id is now the readable underscore-joined lineage path itself (no
sha256). Easier to debug; collapses the redundant `_lineage_path` field.
New signature: `_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
runtime StagePerfStats import (project requires py>=3.11, so `str | int`
is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
all call sites import it directly (no writer/utils re-export). Fix the
3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
* get_deterministic_hash tests -> tests/utils/test_hash_utils.py
* FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
(consolidated into one test_deterministic_ids)
* role-assignment tests -> tests/pipelines/test_pipelines.py
TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
* add test_task_id_lineage to backends/test_integration.py
Patch the consuming module's name directly with a string target instead of importing the module as megatron_mod just to use patch.object.
Move all task_id/lineage assignment into a single place, BaseStageAdapter._post_process_task_ids, instead of the default ProcessingStage.process_batch. Every backend adapter subclasses BaseStageAdapter, so this runs for every stage on every backend and makes no difference whether a stage defines `process` or overrides `process_batch`. This closes the gap where overriding `process_batch` produced empty task_ids and restores the old "every task always has an id" guarantee.
- stages/base.py: process_batch reverts to the plain per-task loop (no lineage). is_source/is_sink flags + get_deterministic_id stay (declarations).
- backends/base.py: _post_process_task_ids assigns ids unconditionally: source content-id (rooted), 1:N, positional M:M, and a random uuid for ambiguous batch fan-out so task_id is never empty. None results dropped.
- tasks.py: _EmptyTask.task_id defaults to "0", the implicit lineage root.
- pipeline.py: assign_root_lineage (moved here) roots user initial_tasks at "0" → "0_0", "0_1", …; run() calls it.
- tests: process_batch no longer assigns ids (test_tasks asserts that); lineage behavior moved to tests/backends/test_task_id_postprocess.py; test_lineage keeps the _set_lineage/root/get_deterministic_id primitives.
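A sketch of the rooting step described for `pipeline.py`, assuming each initial task is a simple object with a settable `task_id`. `SimpleTask` and this `assign_root_lineage` body are hypothetical stand-ins: the implicit root id is `"0"`, and user-supplied initial tasks become its positional children.

```python
from dataclasses import dataclass, field

ROOT_ID = "0"  # mirrors _EmptyTask's default task_id per the commit message


@dataclass
class SimpleTask:
    data: object = None
    task_id: str = field(init=False, default="")


def assign_root_lineage(initial_tasks: list[SimpleTask]) -> list[SimpleTask]:
    # Positional (not content-hashed) roots, so a source stage that adds a
    # content id later does not hash the same content twice.
    for i, task in enumerate(initial_tasks):
        task.task_id = f"{ROOT_ID}_{i}"
    return initial_tasks


tasks = assign_root_lineage([SimpleTask("x"), SimpleTask("y")])
print([t.task_id for t in tasks])  # ['0_0', '0_1']
```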
8fd345a to ac66f5f
…ness
_post_process_task_ids conflated two independent things: the input→output mapping (which determines an output's parent) and whether the stage is a source (which determines the segment: content id vs index). The old `if is_source` branch hard-coded "all outputs are children of tasks[0]", which is only valid for 1→N; a source stage can also be N→N (each input → one partition), where each output must descend from its positional parent. Now the mapping shape (1→N vs positional M:M vs ambiguous) picks the parent, and the content-id-vs-index choice is applied within each.
…id-move
- Ambiguous batch fan-out (lineage can't be derived) now assigns
"r"+uuid; documented in Task.task_id that an "r" prefix means
non-deterministic / lineage-not-tracked.
- assign_root_lineage: keep positional roots (NOT content-hash) to avoid
double content-hashing when a source stage is also present; commented.
- Fix tests that assumed the old per-stage task_id / _uuid behavior
(validated against a baseline run of main in the same container — these
were the only PR regressions; all other suite failures are pre-existing
env issues: image codecs, pdfium, video MOTION_VECTORS, ray version):
* test_merge_file_prefixes: rename over-stripped helper arg task_id->prefix
* writer jsonl/parquet/megatron: uuid call_count 2->1 (no more _uuid)
* add_id: assign distinct lineage ids to the two batches (adapter does this)
* drop task_id-suffix assertions in sortformer/dedup/aesthetic/nsfw/
convert/nemotron_cc/multimodal_reader (stages no longer set task_id)
…ntract
- test_kmeans output-filename test: stop asserting output_task.task_id ==
filename. KMeans is an aggregating stage so its terminal _EmptyTask output
ids are the non-deterministic "r<uuid>" fallback; the written FILES remain
deterministic ("0_<file_hash>_<subgroup>.parquet"). Assert that filename
pattern + centroid partitioning instead.
- process_batch docstring: state explicitly that task_id is framework-owned
and stages must not set it (no escape hatch).
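A sketch of such a filename assertion, assuming `<file_hash>` is lowercase hex and `<subgroup>` is a non-negative integer (both assumptions; the commit message only gives the placeholder pattern `0_<file_hash>_<subgroup>.parquet`). `KMEANS_OUTPUT_RE` and `check_output_filename` are hypothetical names.

```python
import re

# Hypothetical pattern for "0_<file_hash>_<subgroup>.parquet".
KMEANS_OUTPUT_RE = re.compile(r"^0_(?P<file_hash>[0-9a-f]+)_(?P<subgroup>\d+)\.parquet$")


def check_output_filename(name: str) -> tuple[str, int]:
    m = KMEANS_OUTPUT_RE.match(name)
    assert m is not None, f"unexpected output filename: {name}"
    return m.group("file_hash"), int(m.group("subgroup"))


print(check_output_filename("0_3f9a1c2b7d4e_2.parquet"))  # ('3f9a1c2b7d4e', 2)
```

Asserting on the filename rather than `output_task.task_id` keeps the test independent of the non-deterministic `r<uuid>` ids on the aggregating stage's outputs.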
/ok to test 75b9298
Summary
Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a sha256-32 hash); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.
This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.
What changes
Why no breaking change at the constructor
`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.
Test plan
Future work
🤖 Generated with Claude Code