Skip to content

added audio turns merging logic#2031

Open
Ssofja wants to merge 1 commit into
NVIDIA-NeMo:mainfrom
Ssofja:merging_audios
Open

added audio turns merging logic#2031
Ssofja wants to merge 1 commit into
NVIDIA-NeMo:mainfrom
Ssofja:merging_audios

Conversation

@Ssofja
Copy link
Copy Markdown
Contributor

@Ssofja Ssofja commented May 26, 2026

Description

Add MergeConversationSDPStage -- a new ProcessingStage[AudioTask, AudioTask] that merges per-turn TTS outputs into complete multi-speaker conversations using SDP-style silence-stripping and manifest overlaps.

This stage consumes a batch of AudioTask objects (one per conversation turn, all sharing the same conversation_id) and produces a single AudioTask containing:

  • Per-speaker single-channel WAVs (full conversation duration)
  • Multi-channel WAV (channel N = speaker N)
  • Mixed mono WAV (sum of all channels, peak-normalized)
  • Per-speaker RTTM and combined RTTM
  • Per-speaker word-level CTM and combined CTM (when MFA alignment data is available)
  • Segment-level seglst JSON derived from RTTM timestamps

Key features:

  • Configurable inter-turn pause duration with optional randomization
  • Intra-turn silence stripping guided by RTTM segments
  • Support for positive overlaps (speakers talking simultaneously) and negative overlaps (pauses)
  • MFA fallback flag propagation for downstream quality assessment
  • Speaker reference metadata passthrough

New files

  • nemo_curator/stages/audio/merging/__init__.py
  • nemo_curator/stages/audio/merging/merge_conversation.py
  • tests/stages/audio/merging/__init__.py
  • tests/stages/audio/merging/test_merge_conversation.py

Usage

from nemo_curator.stages.audio.merging import MergeConversationSDPStage
from nemo_curator.tasks import AudioTask

# Create the merge stage
merge_stage = MergeConversationSDPStage(
    output_conversations_dir="/output/conversations",
    max_pause_duration=2.0,       # cap inter-turn pauses at 2s
    max_intra_turn_pause=1.0,     # preserve intra-turn pauses up to 1s
    randomize_pauses=False,       # set True to randomize pause lengths
    seglst_offset=0.1,            # padding around segment boundaries
)

# Lifecycle
merge_stage.setup()

# Each batch contains all turns of ONE conversation
turn_tasks = [
    AudioTask(
        data={
            "audio_filepath": "/path/to/turn_0.wav",
            "rttm_filepath": "/path/to/turn_0.rttm",
            "ctm_filepath": "/path/to/turn_0.ctm",      # optional, from MFA
            "speaker": "Alice",
            "conversation_id": "conv_001",
            "turn_index": 0,
            "overlap": 0,
            "text": "Hello, how are you?",
        },
        task_id="t0",
        dataset_name="my_dataset",
    ),
    AudioTask(
        data={
            "audio_filepath": "/path/to/turn_1.wav",
            "rttm_filepath": "/path/to/turn_1.rttm",
            "ctm_filepath": "/path/to/turn_1.ctm",
            "speaker": "Bob",
            "conversation_id": "conv_001",
            "turn_index": 1,
            "overlap": -0.5,  # negative = 0.5s pause after previous turn
            "text": "I'm doing well, thanks!",
        },
        task_id="t1",
        dataset_name="my_dataset",
    ),
]

# Merge all turns into a single conversation output
results = merge_stage.process_batch(turn_tasks)
# results[0].data contains:
#   "audio_filepath"      -> ".../conv_001/mixed.wav"
#   "rttm_filepath"       -> ".../conv_001/all.rttm"
#   "ctm_filepath"        -> ".../conv_001/all.ctm"
#   "seglst_filepath"     -> ".../conv_001/segments.seglst.json"
#   "duration"            -> 3.5  (merged duration in seconds)
#   "num_speakers"        -> 2
#   "speaker_references"  -> {"Alice": {...}, "Bob": {...}}

merge_stage.teardown()

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

Signed-off-by: Ssofja <sofiakostandian@gmail.com>
@Ssofja Ssofja requested a review from a team as a code owner May 26, 2026 18:51
@Ssofja Ssofja requested review from oyilmaz-nvidia and removed request for a team May 26, 2026 18:51
@copy-pr-bot
Copy link
Copy Markdown

copy-pr-bot Bot commented May 26, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented May 26, 2026

Greptile Summary

This PR adds MergeConversationSDPStage, a new ProcessingStage[AudioTask, AudioTask] that merges per-turn TTS outputs into complete multi-speaker conversations using SDP-style silence-stripping and manifest overlaps. It is a self-contained addition with no changes to existing files.

  • Produces per-speaker WAVs, a multi-channel WAV, a peak-normalized mixed mono WAV, per-speaker and combined RTTM/CTM files, and a seglst JSON from a batch of AudioTask turn objects sharing the same conversation_id.
  • Supports configurable inter-turn pause duration with optional randomization, intra-turn silence stripping guided by RTTM segments, and both positive (simultaneous speech) and negative (explicit pause) overlap values.

Confidence Score: 3/5

The core merging logic is largely sound but two correctness gaps in error-path handling mean consumers can see unexpected exceptions or receive a non-existent file path in the output manifest.

The exception re-raise in _merge_conversation means process_batch throws on any filesystem error instead of returning the documented empty list, breaking any caller that catches nothing. Separately, seglst_filepath is always written into the result dict even when the seglst file was never created, unlike rttm_filepath and ctm_filepath which guard with .exists(). Both gaps affect the normal output contract of the stage.

nemo_curator/stages/audio/merging/merge_conversation.py — specifically the exception handling in _merge_conversation and the seglst_filepath field construction in the output dict.

Important Files Changed

Filename Overview
nemo_curator/stages/audio/merging/merge_conversation.py Core merging stage: two correctness issues — exceptions re-raised from _merge_conversation bypass the documented empty-list fallback in process_batch, and seglst_filepath is always populated even when generation fails, unlike rttm/ctm. Also contains an unreachable guard in _compute_timeline and unsanitized speaker-name filenames.
nemo_curator/stages/audio/merging/init.py Simple re-export module; correctly exposes MergeConversationSDPStage.
tests/stages/audio/merging/test_merge_conversation.py Good coverage of parsing helpers, timeline computation, overlaps, MFA flag, and output files; but setup() is never called before process_batch, leaving the lifecycle contract untested.
tests/stages/audio/merging/init.py Empty package marker file; no issues.

Sequence Diagram

sequenceDiagram
    participant Caller
    participant PB as process_batch
    participant MC as _merge_conversation
    participant MA as _merge_audio_files
    participant CT as _compute_timeline
    participant RTTM as _merge_rttm_files
    participant CTM as _merge_ctm_files
    participant SG as _generate_seglst

    Caller->>PB: [AudioTask, ...]
    PB->>PB: sort by turn_index
    PB->>MC: (conversation_id, sorted_turns)
    MC->>MA: (conv_dir, turns)
    MA-->>MC: actual_overlaps[]
    MC->>CT: (turns, actual_overlaps)
    CT-->>MC: timeline[]
    MC->>RTTM: (conv_dir, id, turns, timeline)
    RTTM-->>MC: per_turn_merged_segments[]
    MC->>CTM: (conv_dir, id, turns, timeline)
    MC->>SG: (path, id, turns, segments, duration)
    MC-->>PB: merged_entry dict
    PB-->>Caller: [AudioTask(merged_entry)]
Loading

Comments Outside Diff (1)

  1. tests/stages/audio/merging/test_merge_conversation.py, line 1025 (link)

    P2 setup() never called before process_batch in any test

    All test classes call _build_stage(tmp_path) then immediately invoke stage.process_batch(...) without first calling stage.setup(). The tests pass incidentally because _merge_conversation creates conv_dir via conv_dir.mkdir(parents=True, exist_ok=True), which also creates output_conversations_dir. This means the lifecycle contract (setup → process_batch → teardown) is never exercised, and any future initialization logic added to setup() will be silently skipped in tests.

Reviews (1): Last reviewed commit: "added audio turns merging logic" | Re-trigger Greptile

Comment on lines +455 to +457
except (OSError, RuntimeError):
logger.exception(f"Error merging conversation {conversation_id}")
raise
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 _merge_conversation re-raises, contradicting process_batch contract

_merge_conversation catches OSError/RuntimeError only to log and re-raise them. process_batch never wraps the call in a try/except, so any filesystem error (e.g., permission denied when creating conv_dir, or a RuntimeError from _merge_audio_files) propagates uncaught to the caller. The docstring for process_batch promises "or an empty list on failure", so callers relying on that contract will see an unexpected exception instead of [].

if (conv_dir / "all.ctm").exists()
else ""
),
"seglst_filepath": str(seglst_path),
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 seglst_filepath is always written into the result even when generation failed

rttm_filepath and ctm_filepath guard their values with .exists() and fall back to "", but seglst_filepath unconditionally stores str(seglst_path) regardless of whether the _generate_seglst call succeeded. If the seglst step raises (the exception is caught just above), downstream consumers get a path that doesn't exist on disk.

Suggested change
"seglst_filepath": str(seglst_path),
"seglst_filepath": str(seglst_path) if seglst_path.exists() else "",

Comment on lines +341 to +342
if current_time_offset <= offset_before and local_offset > 0:
current_time_offset = offset_before + local_offset
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Unreachable guard after += update

After current_time_offset += local_offset (line 340), current_time_offset equals offset_before + local_offset. The immediately following check current_time_offset <= offset_before is therefore equivalent to local_offset <= 0, which contradicts the and local_offset > 0 in the same condition. The branch body can never execute; the intended safety net has no effect.

timeline: list[_TurnTimeline],
) -> list[list[tuple[float, float]]]:
"""Merge per-turn RTTMs into per-speaker and combined RTTM files.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Speaker names used verbatim as filenames without sanitization

spk values coming from AudioTask.data["speaker"] are written directly to disk as {spk}.wav, {spk}.rttm, and {spk}.ctm. A name containing /, \, or .. would create files in unexpected directories or silently overwrite unrelated files. Consider sanitizing speaker names (e.g., re.sub(r'[^\w\-]', '_', spk)) before using them as path components.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant