added audio turns merging logic#2031
Conversation
Signed-off-by: Ssofja <sofiakostandian@gmail.com>
Greptile SummaryThis PR adds
Confidence Score: 3/5The core merging logic is largely sound but two correctness gaps in error-path handling mean consumers can see unexpected exceptions or receive a non-existent file path in the output manifest. The exception re-raise in _merge_conversation means process_batch throws on any filesystem error instead of returning the documented empty list, breaking any caller that catches nothing. Separately, seglst_filepath is always written into the result dict even when the seglst file was never created, unlike rttm_filepath and ctm_filepath which guard with .exists(). Both gaps affect the normal output contract of the stage. nemo_curator/stages/audio/merging/merge_conversation.py — specifically the exception handling in _merge_conversation and the seglst_filepath field construction in the output dict. Important Files Changed
Sequence DiagramsequenceDiagram
participant Caller
participant PB as process_batch
participant MC as _merge_conversation
participant MA as _merge_audio_files
participant CT as _compute_timeline
participant RTTM as _merge_rttm_files
participant CTM as _merge_ctm_files
participant SG as _generate_seglst
Caller->>PB: [AudioTask, ...]
PB->>PB: sort by turn_index
PB->>MC: (conversation_id, sorted_turns)
MC->>MA: (conv_dir, turns)
MA-->>MC: actual_overlaps[]
MC->>CT: (turns, actual_overlaps)
CT-->>MC: timeline[]
MC->>RTTM: (conv_dir, id, turns, timeline)
RTTM-->>MC: per_turn_merged_segments[]
MC->>CTM: (conv_dir, id, turns, timeline)
MC->>SG: (path, id, turns, segments, duration)
MC-->>PB: merged_entry dict
PB-->>Caller: [AudioTask(merged_entry)]
|
| except (OSError, RuntimeError): | ||
| logger.exception(f"Error merging conversation {conversation_id}") | ||
| raise |
There was a problem hiding this comment.
_merge_conversation re-raises, contradicting process_batch contract
_merge_conversation catches OSError/RuntimeError only to log and re-raise them. process_batch never wraps the call in a try/except, so any filesystem error (e.g., permission denied when creating conv_dir, or a RuntimeError from _merge_audio_files) propagates uncaught to the caller. The docstring for process_batch promises "or an empty list on failure", so callers relying on that contract will see an unexpected exception instead of [].
| if (conv_dir / "all.ctm").exists() | ||
| else "" | ||
| ), | ||
| "seglst_filepath": str(seglst_path), |
There was a problem hiding this comment.
seglst_filepath is always written into the result even when generation failed
rttm_filepath and ctm_filepath guard their values with .exists() and fall back to "", but seglst_filepath unconditionally stores str(seglst_path) regardless of whether the _generate_seglst call succeeded. If the seglst step raises (the exception is caught just above), downstream consumers get a path that doesn't exist on disk.
| "seglst_filepath": str(seglst_path), | |
| "seglst_filepath": str(seglst_path) if seglst_path.exists() else "", |
| if current_time_offset <= offset_before and local_offset > 0: | ||
| current_time_offset = offset_before + local_offset |
There was a problem hiding this comment.
Unreachable guard after
+= update
After current_time_offset += local_offset (line 340), current_time_offset equals offset_before + local_offset. The immediately following check current_time_offset <= offset_before is therefore equivalent to local_offset <= 0, which contradicts the and local_offset > 0 in the same condition. The branch body can never execute; the intended safety net has no effect.
| timeline: list[_TurnTimeline], | ||
| ) -> list[list[tuple[float, float]]]: | ||
| """Merge per-turn RTTMs into per-speaker and combined RTTM files. | ||
|
|
There was a problem hiding this comment.
Speaker names used verbatim as filenames without sanitization
spk values coming from AudioTask.data["speaker"] are written directly to disk as {spk}.wav, {spk}.rttm, and {spk}.ctm. A name containing /, \, or .. would create files in unexpected directories or silently overwrite unrelated files. Consider sanitizing speaker names (e.g., re.sub(r'[^\w\-]', '_', spk)) before using them as path components.
Description
Add
MergeConversationSDPStage-- a newProcessingStage[AudioTask, AudioTask]that merges per-turn TTS outputs into complete multi-speaker conversations using SDP-style silence-stripping and manifest overlaps.This stage consumes a batch of
AudioTaskobjects (one per conversation turn, all sharing the sameconversation_id) and produces a singleAudioTaskcontaining:Key features:
New files
nemo_curator/stages/audio/merging/__init__.pynemo_curator/stages/audio/merging/merge_conversation.pytests/stages/audio/merging/__init__.pytests/stages/audio/merging/test_merge_conversation.pyUsage
Checklist