Unify task identity: collapse _uuid into deterministic task_id#2036
Unify task identity: collapse _uuid into deterministic task_id#2036abhinavg4 wants to merge 11 commits into
Conversation
Greptile SummaryThis PR replaces the random per-task
Confidence Score: 4/5Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could produce silently wrong source/sink role assignments if stage objects are reused across builds. The core lineage hash and migration are mechanically correct. _assign_source_sink_roles mutates stage instances in-place during build(), so a stage reused across two pipelines at different positions will have the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments. nemo_curator/pipeline/pipeline.py — the in-place mutation in _assign_source_sink_roles Important Files Changed
Sequence DiagramsequenceDiagram
participant P as Pipeline.build()
participant S as ProcessingStage (default)
participant T as Task._set_lineage
participant FG as FileGroupTask.get_deterministic_id
P->>P: _assign_source_sink_roles()
Note over P: mutates is_source_stage / is_sink_stage on stage instances
Note over S: process_batch(tasks)
S->>S: process(task) to children
loop for each child
alt "is_source_stage and child.get_deterministic_id() != None"
S->>FG: get_deterministic_id()
FG-->>S: content hash (12-char hex)
S->>T: _set_lineage([parent._lineage_path], content_hash)
else
S->>T: _set_lineage([parent._lineage_path], positional_index)
end
T->>T: "_lineage_path = join(parent_paths + segment)"
T->>T: "task_id = sha256(_lineage_path)[:32]"
end
S-->>P: children with deterministic task_ids
Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..." | Re-trigger Greptile |
| assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}" | ||
| # Each child has a non-empty lineage path with the parent's path as prefix. | ||
| for i, t in enumerate(output): | ||
| assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path |
There was a problem hiding this comment.
The first branch of this assertion (
f"_{i}") can never be true. The parent task starts with _lineage_path = "", which is filtered out by _set_lineage's empty-string guard, so children always get _lineage_path = str(i) (e.g. "0", not "_0"). The dead branch obscures the actual invariant being tested.
| assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path | |
| assert t._lineage_path == str(i), t._lineage_path |
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
praateekmahajan
left a comment
There was a problem hiding this comment.
Mostly LGTM, left some feedback re tests and clarifying questions
| original_file = segments_sorted[0].get("original_file", "unknown") | ||
|
|
||
| combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name) | ||
| combined = self._concatenate(original_file, segments_sorted, task.dataset_name) |
There was a problem hiding this comment.
@mohammadaaftabv can you / someone from audio review if this is fine? We're removing task_id
| def extract_and_write(self) -> list[FileGroupTask]: | ||
| self._check_actor_obj() | ||
| current_band_min, current_band_max = self._current_band_range | ||
| _current_band_min, _current_band_max = self._current_band_range |
| def get_deterministic_id(self) -> str: | ||
| """Content-based id derived from the sorted file paths. Stable | ||
| across runs even if the source stage emits the file group at a | ||
| different position (e.g. because new files were added or removed | ||
| between runs).""" | ||
| from nemo_curator.stages.text.io.writer.utils import get_deterministic_hash | ||
|
|
||
| return get_deterministic_hash(sorted(self.data)) |
There was a problem hiding this comment.
Nice!
nit: move this get_deterministic_hash out to somewhere in utils outside text? That way the whole text dependencies need not be resolved for a non text modality
| def test_defaults_first_stage_to_source(self) -> None: | ||
| s0 = _NoopStage(name="s0") | ||
| s1 = _NoopStage(name="s1") | ||
| s2 = _NoopStage(name="s2") | ||
| Pipeline(name="t", stages=[s0, s1, s2]).build() | ||
| assert s0.is_source_stage is True | ||
| assert s1.is_source_stage is False | ||
| assert s2.is_source_stage is False | ||
|
|
||
| def test_defaults_last_stage_to_sink(self) -> None: | ||
| s0 = _NoopStage(name="s0") | ||
| s1 = _NoopStage(name="s1") | ||
| s2 = _NoopStage(name="s2") | ||
| Pipeline(name="t", stages=[s0, s1, s2]).build() | ||
| assert s2.is_sink_stage is True | ||
| assert s0.is_sink_stage is False | ||
| assert s1.is_sink_stage is False |
There was a problem hiding this comment.
nit: AI overdoes on the unittests and writes too many lines.. can we possibly consolidate into a single test maybe within test_pipelines.py where we have class TestPipelineBuild and that has a test that says tests_default_first_source_last_sing_stage(self) which tests both these things?
That way any future changes to pipeline.build(..) also live there
There was a problem hiding this comment.
Yup probably we can do that. @oyilmaz-nvidia what do you think?
| # limitations under the License. | ||
|
|
||
| import uuid | ||
| from __future__ import annotations |
There was a problem hiding this comment.
why do we need future?
There was a problem hiding this comment.
I think it was since we had int | str or something in the typedef. Probably AI dump to support older Python version too. Can we removed?
| t = _new() | ||
| # Empty strings in parent paths are stripped (EmptyTask's default | ||
| # _lineage_path is ""). | ||
| t._set_lineage(["", "5", ""], 3) |
There was a problem hiding this comment.
When can this happen?
There was a problem hiding this comment.
It cannot. First empty is due to an Empty Task, and the second cannot occur.
There was a problem hiding this comment.
Wait until get_deteministic_hash gives "" as the output. I think unrelated, but this is a bug. We need to ensure that get_deteministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but that sound correct to you/
| assert t._lineage_path == "root_abc123" | ||
|
|
||
|
|
||
| class TestGetDeterministicId: |
There was a problem hiding this comment.
Can this test be inside tasks/test_file_group_tasks.py where we have a test for file group task where we do this... under TestFileGroupTask def test_deterministic_ids(..): in which we gtest few different asserts at once..
| return None # always filter out | ||
|
|
||
|
|
||
| class TestDefaultProcessBatchAssignsLineage: |
There was a problem hiding this comment.
These tests are good, but can we add a new test method here https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53
Where we do test_lineage() / test_task_ids()
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash → task_id and dropped user-set task_id. Adjusts the resumability adapter, client, and tests to the new field names + reformats per ruff/black.
8fd345a to
ac66f5f
Compare
|
/ok to test 75b9298 |
|
/ok to test 976f16d |
VibhuJawa
left a comment
There was a problem hiding this comment.
Trying to understand these changes. Mostly questions.
| @@ -65,7 +117,15 @@ def validate(self) -> bool: | |||
|
|
|||
| @dataclass | |||
| class _EmptyTask(Task[None]): | |||
There was a problem hiding this comment.
Curious: Why did we decide against adding a NoneTask ? I remember it being discussed at one point
| source partition keeps a stable id across reorderings regardless of | ||
| whether the source is 1→N or N→N. | ||
| """ | ||
| out = [r for r in results if r is not None] |
There was a problem hiding this comment.
We are decreasing/dropping N the length here, is that fine ? Why are we doing that here ? Does Xenna support that now ?
|
|
||
| return results | ||
|
|
||
| def _post_process_task_ids(self, tasks: list[Task], results: list[Task | None]) -> list[Task]: |
There was a problem hiding this comment.
Nit:
| def _post_process_task_ids(self, tasks: list[Task], results: list[Task | None]) -> list[Task]: | |
| def _post_process_task_ids(self, input_tasks: list[Task], output_tasks: list[Task | None]) -> list[Task]: |
| elif len(out) == len(tasks): | ||
| for parent, r in zip(tasks, out, strict=True): | ||
| suffix = (r.get_deterministic_id() or 0) if is_source else 0 | ||
| r._set_lineage([parent.task_id], suffix) |
There was a problem hiding this comment.
Question:
How is the positional 1:1 mapping guaranteed here?
With codex
I searched the current process_batch overrides and there are stages whose batch cardinality is not always 1:1, e.g. PreserveByValueStage.process_batch() returns only surviving tasks after filtering, while some dedup stages aggregate many inputs into one output.
Like is this possible ?
# Input batch:
tasks = [
Task(task_id="0_A"),
Task(task_id="0_B"),
]
# Stage behavior:
# - A fans out into two children
# - B is filtered out (because we seem to be dropping nones now)
results = [
child_from_A_0,
child_from_A_1,
]
# len(results) == len(tasks), so current code takes the 1:1 branch:
for parent, r in zip(tasks, results, strict=True):
r._set_lineage([parent.task_id], 0)
# Assigned lineage:
child_from_A_0.task_id == "0_A_0" # correct
child_from_A_1.task_id == "0_B_0" # wrong: this child came from A, not B
Summary
Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a sha256-32 hash); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.
This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.
What changes
Why no breaking change at the constructor
`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.
Test plan
Future work
🤖 Generated with Claude Code