Unify task identity: collapse _uuid into deterministic task_id #2036

Open
abhinavg4 wants to merge 11 commits into main from abhinavg/task-id-cleanup
Conversation

@abhinavg4
Contributor

Summary

Collapses the random per-task `_uuid` field into a deterministic `task_id` set by the framework. `task_id` is now derived from the task's lineage path through the pipeline DAG (a sha256-32 hash); two runs of the same pipeline on the same inputs produce byte-identical `task_id`s across all tasks.

This is a small, low-risk prerequisite for the resumability work in #2033, which will be rebased on top to use `task.task_id` as the per-task dedup key instead of a separate `_deterministic_lineage_path_hash` field.

What changes

  • `Task` class (`nemo_curator/tasks/tasks.py`)
    • DROP `_uuid` field.
    • ADD `_lineage_path` field (`init=False, default=""`).
    • ADD `_set_lineage(parent_lineage_paths, child_segment)` method. Always overwrites `_lineage_path` and `task_id` (no idempotency check). `child_segment` accepts either a positional `int` index or a content-based `str` (for source-stage emissions).
    • ADD `get_deterministic_id() -> str | None` method (default `None`). Subclasses with stable content override.
  • `FileGroupTask` overrides `get_deterministic_id()` to return `get_deterministic_hash(sorted(self.data))` — stable across runs even if files are added/removed between launches.
  • `ProcessingStage.process_batch` default impl now calls `_set_lineage` per emitted child. Stages that override `process_batch` are responsible for calling it themselves (docstring updated).
  • Migrate 6 `_uuid` use sites to `task.task_id` (all dedup output-filename generators + AddId).
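
A minimal sketch of the lineage mechanism listed above, assuming that `_lineage_path` is built by joining the non-empty parent paths and the child segment with `_`, and that `task_id` is the first 32 hex characters of its SHA-256 digest. The `Task` class here is a simplified stand-in for illustration, not the actual `nemo_curator` class, and the exact hashing details are assumptions.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Union


@dataclass
class Task:
    """Simplified stand-in for the framework's Task (illustration only)."""

    task_id: str = ""  # still a constructor argument, but overwritten below
    _lineage_path: str = field(init=False, default="")

    def _set_lineage(self, parent_lineage_paths: list[str], child_segment: Union[int, str]) -> None:
        # Drop empty parent paths (e.g. a root task whose path is "").
        parts = [p for p in parent_lineage_paths if p]
        parts.append(str(child_segment))
        # Always overwrite both fields; there is no idempotency check.
        self._lineage_path = "_".join(parts)
        digest = hashlib.sha256(self._lineage_path.encode("utf-8")).hexdigest()
        self.task_id = digest[:32]  # assumed truncation length


def run_once() -> list[str]:
    parent = Task(task_id="file_group_3")  # user value, replaced by the framework
    parent._set_lineage([""], 0)
    children = [Task() for _ in range(3)]
    for i, child in enumerate(children):
        child._set_lineage([parent._lineage_path], i)
    return [child.task_id for child in children]


first_run = run_once()
second_run = run_once()
```

Because the ids depend only on the lineage path, `first_run` and `second_run` are identical, which is the property the resumability work relies on.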

Why no breaking change at the constructor

`Task.task_id` is still a regular `init=True` constructor argument. User code that passes `task_id="file_group_3"` continues to work — the value is just immediately overwritten by `_set_lineage` when the task flows through any stage. The framework retains the ability to enforce `init=False` strictly in a future cleanup PR, but doing it here would force migrating ~55 internal constructor calls and isn't needed for the resumability work.

Test plan

  • `tests/tasks/test_tasks.py`: updated fanout test to call `process_batch` (the path that exercises the new lineage assignment).
  • `tests/tasks/test_lineage.py` (NEW):
    • `_set_lineage` filters empty parent paths, always overwrites, accepts `str` and `int` segments.
    • `get_deterministic_id` returns `None` by default; `FileGroupTask` returns a stable hash regardless of input list order.
    • Default `process_batch` assigns unique lineage paths to fan-out outputs.
    • Same pipeline shape on same inputs produces byte-identical `task_id`s.

Future work

  • "Strict" mode: make `task_id` `init=False` so users can't pass it at all. Requires migrating ~55 internal `task_id=...` constructor calls — mechanical but big diff. Separate PR.
  • Resumability (Pipeline resumability via source-level counter checkpointing #2033) rebases on top of this and uses `task.task_id` as the per-task dedup key.

🤖 Generated with Claude Code

User-visible: `Task.task_id` is now a deterministic 32-char hex hash
derived from the task's lineage path through the pipeline DAG, set by
the framework. User-provided `task_id=...` values continue to work (no
breaking API change) but are always overwritten by the framework's
`_set_lineage` once the task flows through any stage.

Two pipelines run on the same inputs produce byte-identical `task_id`s
across all tasks. This replaces the random `_uuid` field (which is now
dropped) for use cases like deterministic output filenames in dedup
stages, IDs added by `AddId`, etc.

Changes:

- `Task` (nemo_curator/tasks/tasks.py):
  - DROP `_uuid` field.
  - ADD `_lineage_path` field (init=False, default="").
  - ADD `_set_lineage(parent_lineage_paths, child_segment)` method.
    Always overwrites both `_lineage_path` and `task_id`. The
    `child_segment` can be a positional index (int) for plain emissions
    or a content-based string (e.g. from `get_deterministic_id()`) for
    source-stage emissions where stability across input reordering
    matters.
  - ADD `get_deterministic_id() -> str | None` method (default None).
    Subclasses with stable content override.
- `FileGroupTask` (nemo_curator/tasks/file_group.py):
  - Override `get_deterministic_id()` to return
    `get_deterministic_hash(sorted(self.data))` — stable across runs
    even if files are added/removed between launches.
- `ProcessingStage.process_batch` (nemo_curator/stages/base.py):
  - Default implementation now calls `_set_lineage` on each emitted
    child. Stages that override `process_batch` are responsible for
    calling `_set_lineage` themselves.
- Migrate 6 `_uuid` use sites to `task.task_id`:
  - `stages/text/modules/add_id.py:71`
  - `stages/deduplication/semantic/kmeans.py:235, 395`
  - `stages/deduplication/fuzzy/buckets_to_edges.py:83`
  - `stages/deduplication/fuzzy/connected_components.py:176`
  - `stages/deduplication/fuzzy/minhash.py:309`
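
The `FileGroupTask` override described above can be sketched as follows. This is an illustration under stated assumptions: `get_deterministic_hash` below is a hypothetical stand-in (its real signature and digest length are not shown in this PR text), and the two classes are stripped down to the one method of interest.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Optional


def get_deterministic_hash(inputs: list[str]) -> str:
    """Hypothetical stand-in: short hex digest of the joined inputs."""
    joined = "\n".join(inputs)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()[:12]


@dataclass
class Task:
    def get_deterministic_id(self) -> Optional[str]:
        # By default a task has no stable, content-based identity.
        return None


@dataclass
class FileGroupTask(Task):
    data: list[str] = field(default_factory=list)

    def get_deterministic_id(self) -> Optional[str]:
        # Sorting makes the id independent of the order the files were listed in.
        return get_deterministic_hash(sorted(self.data))


a = FileGroupTask(data=["b.jsonl", "a.jsonl"])
b = FileGroupTask(data=["a.jsonl", "b.jsonl"])
same_id = a.get_deterministic_id() == b.get_deterministic_id()
```

Since the id depends only on the group's own file paths, adding or removing unrelated files between runs shifts the group's position but not its id.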

Tests:
- `tests/tasks/test_tasks.py`: updated fanout test to use
  `process_batch` (which exercises the new lineage assignment) and
  assert on `task_id` instead of `_uuid`.
- `tests/tasks/test_lineage.py` (NEW): unit tests covering
  `_set_lineage` (always overwrites, filters empty parent paths,
  accepts string segments), `get_deterministic_id` (default None,
  FileGroupTask content-hash with stable sort), and the default
  `process_batch` lineage assignment.

This PR is a prerequisite for the resumability work in #2033, which
will be rebased on top to use `task.task_id` as the per-task dedup key
instead of a separate `_deterministic_lineage_path_hash` field.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@abhinavg4 abhinavg4 requested review from a team, ayushdg and praateekmahajan as code owners May 27, 2026 23:54
@copy-pr-bot

copy-pr-bot Bot commented May 27, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Contributor

greptile-apps Bot commented May 27, 2026

Greptile Summary

This PR replaces the random per-task _uuid with a deterministic task_id derived from a sha256 hash of the task's lineage path through the pipeline DAG. It also introduces is_source_stage/is_sink_stage role flags on ProcessingStage and a role-assignment step in Pipeline.build().

  • Core lineage mechanism (tasks.py, base.py): _set_lineage builds a _lineage_path by joining parent paths and a child segment with "_", then hashes it into a 32-char hex task_id. The default process_batch calls this for each emitted child; stages that override process_batch must call it themselves.
  • Source stage identity (file_group.py, file_partitioning.py): FileGroupTask overrides get_deterministic_id() to return a content hash of its sorted file paths; FilePartitioningStage is marked is_source_stage = True; Pipeline._assign_source_sink_roles() defaults the first/last stage to source/sink if none is explicit.
  • Migration (add_id.py, deduplication stages): six _uuid references swapped to task_id; three deduplication code paths that bypass the default process_batch (one in ConnectedComponentsStage, two in KMeansReadFitWriteStage) do not yet call _set_lineage on their outputs (noted in previously posted comments).

Confidence Score: 4/5

Generally safe to merge; the core lineage hash is correct and the migration is mechanical. One structural defect in Pipeline.build() could produce silently wrong source/sink role assignments if stage objects are reused across builds.

The core lineage hash and migration are mechanically correct. _assign_source_sink_roles mutates stage instances in-place during build(), so a stage reused across two pipelines at different positions will have the wrong source/sink role in the second pipeline, silently producing incorrect lineage assignments.

nemo_curator/pipeline/pipeline.py — the in-place mutation in _assign_source_sink_roles
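
The stage-reuse hazard flagged here can be reproduced with a small sketch. It assumes that `build()` defaults the first and last stage only when no stage is already marked, and that it sets the flags on the stage instances. `Stage` and `Pipeline` below are hypothetical stand-ins, not the real classes, and the "raise on multiple explicit marks" check is left out.

```python
class Stage:
    # Class-level defaults, mirroring the flags added on ProcessingStage.
    is_source_stage = False
    is_sink_stage = False

    def __init__(self, name: str) -> None:
        self.name = name


class Pipeline:
    def __init__(self, stages: list[Stage]) -> None:
        self.stages = stages

    def build(self) -> None:
        # Default the first/last stage only when nothing is marked already.
        if not any(s.is_source_stage for s in self.stages):
            self.stages[0].is_source_stage = True  # in-place instance mutation
        if not any(s.is_sink_stage for s in self.stages):
            self.stages[-1].is_sink_stage = True


reader, shared, writer = Stage("reader"), Stage("shared"), Stage("writer")

# First pipeline: `shared` comes first, so build() marks it as the source.
Pipeline([shared, writer]).build()

# Second pipeline reuses `shared` in the middle. Its stale flag looks like
# an explicit mark, so `reader` never becomes the source.
Pipeline([reader, shared, writer]).build()
```

One way to avoid this, if the hazard applies, would be to compute roles per build without writing them back onto shared stage objects.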

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/tasks/tasks.py | Drops _uuid, adds deterministic _set_lineage / _lineage_path contract. Core logic is sound; separator ambiguity is a latent design hazard for future subclasses. |
| nemo_curator/stages/base.py | Adds is_source_stage/is_sink_stage class attrs and calls _set_lineage in the default process_batch. Clean except that the class-level flags are instance-mutated by Pipeline.build(), causing role pollution across builds. |
| nemo_curator/pipeline/pipeline.py | Adds _assign_source_sink_roles() called inside build(). Mutates stage instances in-place, which silently corrupts role assignments when stages are shared across pipeline builds. |
| nemo_curator/tasks/file_group.py | Adds get_deterministic_id() returning a stable hash of sorted file paths. Clean implementation; correctly uses hex output (no underscores) that is safe with the _ separator. |
| nemo_curator/stages/deduplication/fuzzy/connected_components.py | Migrates _uuid to task_id for output filename. Overrides process_batch without calling _set_lineage, leaving the returned task with empty _lineage_path (previously flagged). |
| nemo_curator/stages/deduplication/semantic/kmeans.py | Migrates _uuid to task_id for filenames. Both _process_batch_single_pass and _predict_write_pass create output _EmptyTask objects without calling _set_lineage (previously flagged). |
| nemo_curator/stages/text/modules/add_id.py | Straightforward swap of _uuid to task_id for document ID prefix. Safe because task_id is always set by _set_lineage before AddId processes a task in a pipeline. |
| tests/tasks/test_lineage.py | New test file with comprehensive coverage of _set_lineage, get_deterministic_id, default process_batch, source stage segment selection, and cross-run determinism. Well-structured. |
| tests/pipelines/test_pipeline_roles.py | New tests for _assign_source_sink_roles covering defaults, explicit overrides, and error cases. Does not cover the stage-reuse mutation scenario. |
| tests/tasks/test_tasks.py | Updated fanout test to use process_batch path. The assertion on line 747 has a dead branch that can never be true (previously flagged). |

Sequence Diagram

sequenceDiagram
    participant P as Pipeline.build()
    participant S as ProcessingStage (default)
    participant T as Task._set_lineage
    participant FG as FileGroupTask.get_deterministic_id

    P->>P: _assign_source_sink_roles()
    Note over P: mutates is_source_stage / is_sink_stage on stage instances

    Note over S: process_batch(tasks)
    S->>S: process(task) to children

    loop for each child
        alt "is_source_stage and child.get_deterministic_id() != None"
            S->>FG: get_deterministic_id()
            FG-->>S: content hash (12-char hex)
            S->>T: _set_lineage([parent._lineage_path], content_hash)
        else
            S->>T: _set_lineage([parent._lineage_path], positional_index)
        end
        T->>T: "_lineage_path = join(parent_paths + segment)"
        T->>T: "task_id = sha256(_lineage_path)[:32]"
    end

    S-->>P: children with deterministic task_ids

Reviews (2): Last reviewed commit: "Add is_source_stage / is_sink_stage flag..." | Re-trigger Greptile

Comment thread tests/tasks/test_tasks.py Outdated
assert len(set(task_ids)) == 3, f"Expected unique task_id per task, got {task_ids}"
# Each child has a non-empty lineage path with the parent's path as prefix.
for i, t in enumerate(output):
    assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
Contributor

P2 The first branch of this assertion (f"_{i}") can never be true. The parent task starts with _lineage_path = "", which is filtered out by _set_lineage's empty-string guard, so children always get _lineage_path = str(i) (e.g. "0", not "_0"). The dead branch obscures the actual invariant being tested.

Suggested change
- assert t._lineage_path == f"_{i}" or t._lineage_path == str(i), t._lineage_path
+ assert t._lineage_path == str(i), t._lineage_path

abhinavg4 and others added 2 commits May 28, 2026 11:03
Makes PR-A self-contained: `get_deterministic_id()` is now actually
used. The default `process_batch` calls it for stages flagged
`is_source_stage=True` and falls back to the positional index otherwise.

- Add `is_source_stage: bool = False` and `is_sink_stage: bool = False`
  ClassVars on `ProcessingStage`.
- `Pipeline.build()` now auto-assigns the first stage as source and the
  last as sink if no stage is explicitly marked. Raises on multiple
  explicit marks. The sink flag itself has no behavior in this PR — it
  exists for the resumability layer in a follow-up.
- Default `process_batch` uses `child.get_deterministic_id() or i` as
  the lineage segment when the stage is a source. Content-based ids
  make `task_id` stable across input reordering for stages whose Task
  subclass implements `get_deterministic_id` (currently `FileGroupTask`).
- Mark `FilePartitioningStage` as `is_source_stage = True` so the
  default reader path gets content-based ids out of the box.
- Tests: `TestSourceStageSegment` (3 tests covering content-hash use,
  positional fallback, and non-source-stage passthrough);
  `TestSourceSinkRoleAssignment` in new `tests/pipelines/test_pipeline_roles.py`
  (defaults, explicit overrides, multi-mark validation).
- Also: move `StagePerfStats` import to `TYPE_CHECKING` block in
  tasks.py (ruff TC001).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
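
The segment selection described in the commit above (`child.get_deterministic_id() or i` for source stages, the positional index otherwise) might look roughly like this. `assign_segments` and the `content_id` attribute are hypothetical names introduced for the sketch, and the path is kept unhashed for readability.

```python
from typing import Optional


class Task:
    def __init__(self, content_id: Optional[str] = None) -> None:
        self.content_id = content_id  # hypothetical attribute for this sketch
        self.lineage_path = ""

    def get_deterministic_id(self) -> Optional[str]:
        return self.content_id


def assign_segments(parent_path: str, children: list[Task], is_source_stage: bool) -> None:
    for i, child in enumerate(children):
        if is_source_stage:
            # Prefer the content-based id; fall back to the position when
            # the task returns None (or an empty string).
            segment = child.get_deterministic_id() or i
        else:
            segment = i
        parts = [p for p in (parent_path, str(segment)) if p]
        child.lineage_path = "_".join(parts)


source_children = [Task(content_id="abc123"), Task()]
assign_segments("", source_children, is_source_stage=True)

plain_children = [Task(content_id="abc123"), Task()]
assign_segments("5", plain_children, is_source_stage=False)
```

In this sketch a non-source stage ignores the content id entirely, so only source-stage emissions stay stable when their inputs are reordered.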
task_id is now framework-controlled — it is assigned exclusively by
`_set_lineage` at each stage boundary. Users no longer have any way
to set it.

- Make `Task.task_id` `init=False, default=""` (was a required `__init__` arg).
- Drop EmptyTask's `task_id="empty"` literal.
- Strip `task_id=...` kwargs from all 78 Task constructors across
  nemo_curator/stages/.
- Strip `task_id=...` from all test fixtures (107 files).
- Delete ~40 test assertions like `assert result.task_id == "test_task"`
  that exercised the old user-set-task_id-passes-through behavior.
- Drop now-dead helper params (`task_id` arg to `_concatenate`, conftest
  helpers, etc.) and dead loop counters that only existed to format task_ids.

Output filenames that previously derived from `task_id` (e.g. `f"{task_id}.parquet"`)
still work — they now use the lineage-derived sha256-32 hash, which is
deterministic across runs.
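
For readers unfamiliar with the dataclass mechanics, a minimal sketch of what `init=False, default=""` means for callers. The `Task` below is a stand-in with a single illustrative extra field, not the real class.

```python
from dataclasses import dataclass, field


@dataclass
class Task:
    dataset_name: str
    # Excluded from __init__: only framework code assigns it afterwards.
    task_id: str = field(init=False, default="")


task = Task(dataset_name="demo")

try:
    Task(dataset_name="demo", task_id="file_group_3")
    rejected = False
except TypeError:
    # The generated __init__ no longer accepts a task_id keyword.
    rejected = True
```

This is why every `task_id=...` keyword had to be stripped from constructors and fixtures: leaving one behind fails at construction time rather than being silently ignored.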
abhinavg4 added a commit that referenced this pull request May 28, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
Contributor

@praateekmahajan praateekmahajan left a comment

Mostly LGTM, left some feedback re tests and clarifying questions

original_file = segments_sorted[0].get("original_file", "unknown")

- combined = self._concatenate(original_file, segments_sorted, task.task_id, task.dataset_name)
+ combined = self._concatenate(original_file, segments_sorted, task.dataset_name)
Contributor

@mohammadaaftabv can you / someone from audio review if this is fine? We're removing task_id

  def extract_and_write(self) -> list[FileGroupTask]:
      self._check_actor_obj()
-     current_band_min, current_band_max = self._current_band_range
+     _current_band_min, _current_band_max = self._current_band_range

Comment on lines +50 to +57
def get_deterministic_id(self) -> str:
    """Content-based id derived from the sorted file paths. Stable
    across runs even if the source stage emits the file group at a
    different position (e.g. because new files were added or removed
    between runs)."""
    from nemo_curator.stages.text.io.writer.utils import get_deterministic_hash

    return get_deterministic_hash(sorted(self.data))
Contributor

Nice!
nit: move get_deterministic_hash out to somewhere in utils outside text? That way the text dependencies need not be resolved for a non-text modality.

Contributor Author

Yup I agree.

Comment thread nemo_curator/stages/file_partitioning.py Outdated
Comment thread tests/pipelines/test_pipeline_roles.py Outdated
Comment on lines +42 to +58
def test_defaults_first_stage_to_source(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s0.is_source_stage is True
    assert s1.is_source_stage is False
    assert s2.is_source_stage is False

def test_defaults_last_stage_to_sink(self) -> None:
    s0 = _NoopStage(name="s0")
    s1 = _NoopStage(name="s1")
    s2 = _NoopStage(name="s2")
    Pipeline(name="t", stages=[s0, s1, s2]).build()
    assert s2.is_sink_stage is True
    assert s0.is_sink_stage is False
    assert s1.is_sink_stage is False
Contributor

nit: AI tends to overdo the unit tests and writes too many lines. Can we consolidate these into a single test, maybe within test_pipelines.py where we have class TestPipelineBuild, with a test named test_default_first_source_last_sink_stage(self) that checks both of these things?

That way any future changes to pipeline.build(..) also live there

Contributor Author

Yup probably we can do that. @oyilmaz-nvidia what do you think?

Comment thread nemo_curator/tasks/tasks.py Outdated
# limitations under the License.

- import uuid
+ from __future__ import annotations
Contributor

why do we need future?

Contributor Author

I think it was because we had int | str or something similar in the type annotations. Probably an AI artifact to support older Python versions too. Can we remove it?

Comment thread nemo_curator/tasks/tasks.py Outdated
t = _new()
# Empty strings in parent paths are stripped (EmptyTask's default
# _lineage_path is "").
t._set_lineage(["", "5", ""], 3)
Contributor

When can this happen?

Contributor Author

It cannot. First empty is due to an Empty Task, and the second cannot occur.

Contributor Author

Wait, unless get_deterministic_hash gives "" as the output. I think this is unrelated, but it is a bug: we need to ensure that get_deterministic_hash cannot return an empty string. @oyilmaz-nvidia I might be wrong, but does that sound correct to you?

assert t._lineage_path == "root_abc123"


class TestGetDeterministicId:
Contributor

Can this test live inside tasks/test_file_group_tasks.py, where we already test the file group task? It could go under TestFileGroupTask as def test_deterministic_ids(..):, in which we test a few different asserts at once.

Comment thread tests/tasks/test_lineage.py Outdated
return None # always filter out


class TestDefaultProcessBatchAssignsLineage:
Contributor

These tests are good, but can we add a new test method here https://github.com/NVIDIA-NeMo/Curator/blob/main/tests/backends/test_integration.py#L53

Where we do test_lineage() / test_task_ids()

abhinavg4 added 2 commits June 1, 2026 15:40
…ng source flag

- Rename `child_segment`/`segment` -> `current_task_id_suffix` (this task's
  own segment of the lineage path). Clearer per review.
- Move `get_deterministic_hash` to `nemo_curator/utils/hash_utils.py` so
  non-text modalities (FileGroupTask, dedup) don't pull in text deps. The
  text `writer/utils.py` re-exports it for backward compat.
- Drop `is_source_stage = True` from FilePartitioningStage; rely on
  Pipeline.build()'s "first stage is source" default, so users who put a
  different stage first (e.g. URL generation) get the right behavior.
- Fix dead-branch assertion in test_tasks.py (parent lineage is always ""
  -> filtered, so children are str(i), never "_i").
…t hash imports, test reorg

- task_id is now the readable underscore-joined lineage path itself (no
  sha256). Easier to debug; collapses the redundant `_lineage_path` field.
  `_set_lineage(parent_task_ids, current_task_id_suffix)`.
- Drop `from __future__ import annotations` from tasks.py; restore the
  runtime StagePerfStats import (project requires py>=3.11, so `str | int`
  is native).
- get_deterministic_hash lives only in nemo_curator/utils/hash_utils.py;
  all call sites import it directly (no writer/utils re-export). Fix the
  3 writer tests that mocked it via the writer_utils namespace.
- Test reorg per review:
  * get_deterministic_hash tests -> tests/utils/test_hash_utils.py
  * FileGroupTask id test -> tests/tasks/test_file_group_tasks.py
    (consolidated into one test_deterministic_ids)
  * role-assignment tests -> tests/pipelines/test_pipelines.py
    TestPipelineBuild (7 -> 3); delete test_pipeline_roles.py
  * add test_task_id_lineage to backends/test_integration.py
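
After this change, `task_id` is the readable lineage path itself. A minimal sketch of the reworked `_set_lineage(parent_task_ids, current_task_id_suffix)`, assuming it still filters empty parent ids and joins with `_` as the hashed version did. The class is a stand-in, not the real `Task`.

```python
from typing import Union


class Task:
    def __init__(self) -> None:
        self.task_id = ""

    def _set_lineage(self, parent_task_ids: list[str], current_task_id_suffix: Union[int, str]) -> None:
        # Assumed behavior: skip empty parent ids, then append this task's
        # own segment (a positional index or a content-based string).
        parts = [p for p in parent_task_ids if p]
        parts.append(str(current_task_id_suffix))
        self.task_id = "_".join(parts)


root = Task()
root._set_lineage([], 0)

child = Task()
child._set_lineage([root.task_id], "abc123")  # e.g. a content hash

grandchild = Task()
grandchild._set_lineage([child.task_id], 2)
```

Here `grandchild.task_id` is `"0_abc123_2"`, so the full ancestry can be read straight off the id while debugging.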
abhinavg4 added a commit that referenced this pull request Jun 1, 2026
Rebased on top of PR-A (#2036) which renamed _deterministic_lineage_path_hash
→ task_id and dropped user-set task_id. Adjusts the resumability adapter,
client, and tests to the new field names + reformats per ruff/black.
abhinavg4 added 2 commits June 1, 2026 16:08
Patch the consuming module's name directly with a string target instead
of importing the module as megatron_mod just to use patch.object.
Move all task_id/lineage assignment into a single place —
BaseStageAdapter._post_process_task_ids — instead of the default
ProcessingStage.process_batch. Every backend adapter subclasses
BaseStageAdapter, so this runs for every stage on every backend and makes
no difference whether a stage defines `process` or overrides
`process_batch`. This closes the gap where overriding `process_batch`
produced empty task_ids and restores the old "every task always has an id"
guarantee.

- stages/base.py: process_batch reverts to the plain per-task loop (no
  lineage). is_source/is_sink flags + get_deterministic_id stay (declarations).
- backends/base.py: _post_process_task_ids assigns ids unconditionally —
  source content-id (rooted), 1:N, positional M:M, and a random uuid for
  ambiguous batch fan-out so task_id is never empty. None results dropped.
- tasks.py: _EmptyTask.task_id defaults to "0", the implicit lineage root.
- pipeline.py: assign_root_lineage (moved here) roots user initial_tasks at
  "0" → "0_0", "0_1", … ; run() calls it.
- tests: process_batch no longer assigns ids (test_tasks asserts that);
  lineage behavior moved to tests/backends/test_task_id_postprocess.py;
  test_lineage keeps the _set_lineage/root/get_deterministic_id primitives.
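
The rooting step mentioned for `pipeline.py` can be pictured as below. This is only a sketch: the real `assign_root_lineage` signature is not shown in this thread, so the function shape, the `ROOT_TASK_ID` constant, and the stand-in `Task` are assumptions.

```python
class Task:
    def __init__(self, task_id: str = "") -> None:
        self.task_id = task_id


ROOT_TASK_ID = "0"  # the implicit lineage root (the _EmptyTask default)


def assign_root_lineage(initial_tasks: list[Task]) -> list[Task]:
    # Root each user-supplied task under "0" by its position in the list.
    for index, task in enumerate(initial_tasks):
        task.task_id = f"{ROOT_TASK_ID}_{index}"
    return initial_tasks


tasks = assign_root_lineage([Task(), Task(), Task()])
ids = [t.task_id for t in tasks]
```

With three initial tasks this yields `"0_0"`, `"0_1"`, `"0_2"`, matching the pattern given in the commit message.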
@abhinavg4 abhinavg4 force-pushed the abhinavg/task-id-cleanup branch from 8fd345a to ac66f5f Compare June 2, 2026 02:05
abhinavg4 and others added 4 commits June 1, 2026 19:18
…ness

_post_process_task_ids conflated two independent things: the input→output
mapping (which determines an output's parent) and whether the stage is a
source (which determines the segment: content id vs index). The old
`if is_source` branch hard-coded "all outputs are children of tasks[0]",
which is only valid for 1→N — a source stage can also be N→N (each input
→ one partition), where each output must descend from its positional
parent. Now the mapping shape (1→N vs positional M:M vs ambiguous) picks
the parent and the content-id-vs-index choice is applied within each.
…id-move

- Ambiguous batch fan-out (lineage can't be derived) now assigns
  "r"+uuid; documented in Task.task_id that an "r" prefix means
  non-deterministic / lineage-not-tracked.
- assign_root_lineage: keep positional roots (NOT content-hash) to avoid
  double content-hashing when a source stage is also present; commented.
- Fix tests that assumed the old per-stage task_id / _uuid behavior
  (validated against a baseline run of main in the same container — these
  were the only PR regressions; all other suite failures are pre-existing
  env issues: image codecs, pdfium, video MOTION_VECTORS, ray version):
  * test_merge_file_prefixes: rename over-stripped helper arg task_id->prefix
  * writer jsonl/parquet/megatron: uuid call_count 2->1 (no more _uuid)
  * add_id: assign distinct lineage ids to the two batches (adapter does this)
  * drop task_id-suffix assertions in sortformer/dedup/aesthetic/nsfw/
    convert/nemotron_cc/multimodal_reader (stages no longer set task_id)
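
The separation described in the two commits above (the mapping shape picks the parent, source-ness picks the segment, and ambiguous fan-out falls back to `"r"` plus a uuid) could be sketched like this. The function name, the shape-detection rules, and the use of the output position as the index segment are assumptions made for illustration, not the adapter's actual code.

```python
import uuid
from typing import Optional


class Task:
    def __init__(self, task_id: str = "", content_id: Optional[str] = None) -> None:
        self.task_id = task_id
        self.content_id = content_id  # hypothetical attribute for this sketch

    def get_deterministic_id(self) -> Optional[str]:
        return self.content_id


def post_process_task_ids(inputs: list, outputs: list, is_source: bool) -> list:
    # Step 1: the input -> output mapping shape decides each output's parent.
    if len(inputs) == 1:
        parents = [inputs[0]] * len(outputs)  # 1 -> N
    elif len(outputs) == len(inputs):
        parents = list(inputs)  # positional M:M
    else:
        parents = [None] * len(outputs)  # ambiguous batch fan-out

    kept = []
    for i, (parent, out) in enumerate(zip(parents, outputs)):
        if out is None:
            continue  # None results are dropped
        if parent is None:
            # Lineage cannot be derived: non-deterministic, "r"-prefixed id.
            out.task_id = "r" + uuid.uuid4().hex
        else:
            # Step 2: source-ness decides the segment (content id vs index).
            content_id = out.get_deterministic_id() if is_source else None
            out.task_id = f"{parent.task_id}_{content_id or i}"
        kept.append(out)
    return kept


fan_out = post_process_task_ids([Task("0")], [Task(content_id="abc"), Task()], is_source=True)
positional = post_process_task_ids([Task("0_0"), Task("0_1")], [Task(), None], is_source=False)
ambiguous = post_process_task_ids([Task("0_0"), Task("0_1")], [Task(), Task(), Task()], is_source=False)
```

Keeping the two decisions independent is what lets an N→N source stage give each output its positional parent while still using a content id as the segment.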
…ntract

- test_kmeans output-filename test: stop asserting output_task.task_id ==
  filename. KMeans is an aggregating stage so its terminal _EmptyTask output
  ids are the non-deterministic "r<uuid>" fallback; the written FILES remain
  deterministic ("0_<file_hash>_<subgroup>.parquet"). Assert that filename
  pattern + centroid partitioning instead.
- process_batch docstring: state explicitly that task_id is framework-owned
  and stages must not set it (no escape hatch).
@abhinavg4
Contributor Author

/ok to test 75b9298

@copy-pr-bot copy-pr-bot Bot deployed to nemo-ci June 2, 2026 03:40 Active
@copy-pr-bot copy-pr-bot Bot deployed to public June 2, 2026 03:47 Active
3 participants