Montreal forced aligner #1977
Signed-off-by: Ssofja <sofiakostandian@gmail.com>
Greptile Summary
This PR adds
Confidence Score: 5/5
The new stage is self-contained and well-tested; the core data path (batch alignment, TextGrid parsing, RTTM/CTM conversion) behaves correctly for the standard case. All new code paths are covered by unit tests that mock the MFA subprocess. Logic for tier selection, silence filtering, interval merging, and fallback output is straightforward and verified. The change is additive with no modifications to existing stages.
nemo_curator/stages/audio/alignment/mfa_alignment.py: the distributed-settings behaviour (model path resolution and history-file lifecycle in
Important Files Changed
Sequence Diagram
sequenceDiagram
participant F as Framework
participant S as MFAAlignmentStage
participant T as TempDir (corpus)
participant MFA as mfa align (subprocess)
participant FS as Output Filesystem
F->>S: setup_on_node() [optional, Xenna only]
S->>FS: Copy models to node-local storage
F->>S: setup()
S->>FS: mkdir textgrids/, rttms/, ctms/
F->>S: process_batch(tasks)
loop for each task
S->>S: validate_input, dedup stems
S->>S: _get_audio_duration (if missing)
end
S->>T: Create symlinks (.wav) + .txt transcript files
S->>MFA: mfa align corpus_dir dict model tg_out_dir
MFA-->>T: "Write *.TextGrid files"
S->>S: "rglob(*.TextGrid) find produced files"
alt TextGrid produced
S->>FS: Write .rttm (merged speech intervals)
S->>FS: Write .ctm (word timings)
S->>S: "task.data[textgrid/rttm/ctm_filepath] = paths"
else MFA silently dropped file
S->>FS: Write duration-based fallback .rttm
S->>FS: Write uniform-split fallback .ctm
S->>S: "task.data[mfa_skipped] = True"
end
S-->>F: return list[AudioTask] with enriched data
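The "merged speech intervals" step in the diagram could look roughly like the sketch below. It is not the PR's code: `merge_intervals` and its `max_gap` argument are hypothetical names (the PR description only mentions a `max_gap_for_merge` setting), and intervals are assumed to be `(start, end)` pairs in seconds.

```python
def merge_intervals(
    intervals: list[tuple[float, float]], max_gap: float
) -> list[tuple[float, float]]:
    """Merge intervals whose gap to the previous one is at most `max_gap` seconds."""
    merged: list[tuple[float, float]] = []
    for start, end in sorted(intervals):
        if merged and start - merged[-1][1] <= max_gap:
            # Close enough to the previous interval: extend it.
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            # Gap is too large (or this is the first interval): start a new one.
            merged.append((start, end))
    return merged
```

Sorting first keeps the function correct even if the word intervals arrive out of order.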
Reviews (3): Last reviewed commit: "add fixes based on PR comments"
| if not self.output_dir: | ||
| msg = "output_dir is required for MFAAlignmentStage" | ||
| raise ValueError(msg) | ||
| self._effective_num_jobs = self.num_jobs or os.cpu_count() |
os.cpu_count() can return None, producing -j None
os.cpu_count() returns None in some containerized or virtualized environments. When num_jobs=0 (the default) and this happens, self._effective_num_jobs becomes None, which is then stringified to "None" and passed to MFA as -j None. MFA will reject that value and fail with a confusing error unrelated to alignment. A safe default of 1 ensures MFA is always given a valid integer.
| self._effective_num_jobs = self.num_jobs or os.cpu_count() | |
| self._effective_num_jobs = self.num_jobs or os.cpu_count() or 1 |
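For reference, the suggested fallback chain can be checked in isolation. This is a minimal sketch: `resolve_num_jobs` is a hypothetical helper name, and it assumes (as in the snippet above) that `num_jobs=0` means "use all CPUs".

```python
import os


def resolve_num_jobs(num_jobs: int = 0) -> int:
    """Return a positive job count that is safe to pass as a `-j` value."""
    # os.cpu_count() may return None when the CPU count cannot be determined,
    # so 1 is used as the last-resort default.
    return num_jobs or os.cpu_count() or 1
```

An explicit non-zero `num_jobs` always wins; the `or 1` only applies when both earlier values are falsy.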
| if file_stem in stem_to_task: | ||
| original_stem = file_stem | ||
| file_stem = f"{file_stem}_{uuid.uuid4().hex[:8]}" | ||
| logger.warning( | ||
| f"Duplicate stem '{original_stem}' — renamed to " | ||
| f"'{file_stem}' to avoid silent data loss" | ||
| ) | ||
| task.data["_mfa_stem"] = file_stem |
_mfa_stem internal key leaks into output manifest
task.data["_mfa_stem"] is set on duplicate-renamed tasks but never consumed by any subsequent processing. It will appear as a surprise key in the output JSONL manifest for some (but not all) tasks, breaking any downstream schema validation that expects a fixed set of keys.
| if file_stem in stem_to_task: | |
| original_stem = file_stem | |
| file_stem = f"{file_stem}_{uuid.uuid4().hex[:8]}" | |
| logger.warning( | |
| f"Duplicate stem '{original_stem}' — renamed to " | |
| f"'{file_stem}' to avoid silent data loss" | |
| ) | |
| task.data["_mfa_stem"] = file_stem | |
| if file_stem in stem_to_task: | |
| original_stem = file_stem | |
| file_stem = f"{file_stem}_{uuid.uuid4().hex[:8]}" | |
| logger.warning( | |
| f"Duplicate stem '{original_stem}' — renamed to " | |
| f"'{file_stem}' to avoid silent data loss" | |
| ) |
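One way to keep the renamed stem out of the manifest is to track it only in the stem-to-item mapping. The sketch below is an assumption about how that could look, not the PR's code: `assign_unique_stems` is a hypothetical helper that works on plain paths instead of `AudioTask` objects.

```python
import uuid
from pathlib import Path


def assign_unique_stems(audio_paths: list[str]) -> dict[str, str]:
    """Map a unique corpus stem to each audio path, renaming collisions."""
    stem_to_path: dict[str, str] = {}
    for path in audio_paths:
        stem = Path(path).stem
        if stem in stem_to_path:
            # Same rename scheme as the snippet above: append a short random suffix.
            stem = f"{stem}_{uuid.uuid4().hex[:8]}"
        # The renamed stem lives only in this mapping, never in the task data.
        stem_to_path[stem] = path
    return stem_to_path
```

Because the mapping is keyed by the final stem, later steps can look up the original item without any extra key on the task.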
| tier_name = "words" | ||
| tier = ( | ||
| tg.getTier(tier_name) | ||
| if tier_name in tg.tierNames | ||
| else tg.getTier(tg.tierNames[0]) | ||
| ) |
Fallback tier selection can silently pick a phone-level tier instead of words
When the TextGrid has no tier named "words" the code falls back to tg.tierNames[0]. MFA TextGrids typically have both a "words" tier and a "phones" tier; if ordering is not guaranteed, the fallback may parse phone-level intervals as word intervals, producing nonsense RTTM/CTM output with no error or warning.
| tier_name = "words" | |
| tier = ( | |
| tg.getTier(tier_name) | |
| if tier_name in tg.tierNames | |
| else tg.getTier(tg.tierNames[0]) | |
| ) | |
| tier_name = "words" | |
| if tier_name not in tg.tierNames: | |
| logger.warning( | |
| f"No 'words' tier in {textgrid_path}; " | |
| f"available tiers: {tg.tierNames}. Falling back to first tier." | |
| ) | |
| tier = ( | |
| tg.getTier(tier_name) | |
| if tier_name in tg.tierNames | |
| else tg.getTier(tg.tierNames[0]) | |
| ) |
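The same selection logic can be expressed as a small pure function, which makes it easy to unit-test without a TextGrid file. This is a sketch under assumptions: `pick_word_tier` is a hypothetical name, and it takes the list of tier names directly instead of a praatio object.

```python
import logging

logger = logging.getLogger(__name__)


def pick_word_tier(tier_names: list[str], preferred: str = "words") -> str:
    """Return the preferred tier name, or the first tier with a warning."""
    if not tier_names:
        raise ValueError("TextGrid has no tiers")
    if preferred in tier_names:
        return preferred
    # Fall back, but make the fallback visible in the logs.
    logger.warning(
        "No '%s' tier found; available tiers: %s. Falling back to first tier.",
        preferred,
        tier_names,
    )
    return tier_names[0]
```

Raising on an empty tier list avoids an `IndexError` from `tier_names[0]` in the fallback branch.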
| history_file = Path(self._mfa_root) / "command_history.yaml" | ||
| if history_file.exists(): | ||
| try: | ||
| history_file.unlink() | ||
| except Exception: # noqa: BLE001 | ||
| pass |
command_history.yaml deleted from shared NFS storage when copy_models_to_local=False
When local model copying is disabled, self._mfa_root points at the shared NFS/Lustre directory. Every batch invocation tries to unlink() the shared history file. In a distributed run this creates concurrent deletion attempts against the shared filesystem, and MFA itself may be writing the file at the same time — exactly the race condition copy_models_to_local is meant to prevent.
| history_file = Path(self._mfa_root) / "command_history.yaml" | |
| if history_file.exists(): | |
| try: | |
| history_file.unlink() | |
| except Exception: # noqa: BLE001 | |
| pass | |
| if self.copy_models_to_local: | |
| history_file = Path(self._mfa_root) / "command_history.yaml" | |
| if history_file.exists(): | |
| try: | |
| history_file.unlink() | |
| except Exception: # noqa: BLE001 | |
| pass |
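A slightly tighter variant of the guarded cleanup is sketched below. `clear_history` and its `is_node_local` flag are hypothetical names; the caller is assumed to know whether the MFA root is a per-node copy.

```python
import contextlib
from pathlib import Path


def clear_history(mfa_root: str, is_node_local: bool) -> bool:
    """Delete command_history.yaml only when the MFA root is node-local.

    Returns True if the file was removed, False otherwise.
    """
    if not is_node_local:
        # Never touch the history file on shared storage.
        return False
    history_file = Path(mfa_root) / "command_history.yaml"
    # Suppress only OSError (missing file, permissions) rather than all exceptions.
    with contextlib.suppress(OSError):
        history_file.unlink()
        return True
    return False
```

Catching `OSError` instead of a bare `Exception` also removes the need for the `noqa: BLE001` marker.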
| acoustic_model: str = "english_us_arpa" | ||
| dictionary: str = "english_us_arpa" | ||
| g2p_model: str = "english_us_arpa" | ||
| output_dir: str = "" |
| output_dir: str = "" | |
| output_dir: str |
Make this a required field instead of checking it in __post_init__.
| mfa_root_dir: str = "" | ||
| local_mfa_base_dir: str = "" |
| mfa_root_dir: str = "" | |
| local_mfa_base_dir: str = "" | |
| mfa_root_dir: str | None = None | |
| local_mfa_base_dir: str | None = None |
| if not self.output_dir: | ||
| msg = "output_dir is required for MFAAlignmentStage" | ||
| raise ValueError(msg) | ||
| self._effective_num_jobs = self.num_jobs or os.cpu_count() |
I haven't read through the whole PR yet but we probably wouldn't want this? Let's let the pipeline and executor work out this type of logic instead.
| # See the License for the specific language governing permissions and | ||
| # limitations under the License. | ||
|
|
||
| # modality: audio |
Not needed. (Also will be ignored since it is not at the top of the file.)
| # Using uv (recommended) | ||
| uv sync --extra audio_alignment | ||
|
|
||
| # Using pip | ||
| pip install "nemo-curator[audio_alignment]" |
| # Using uv (recommended) | |
| uv sync --extra audio_alignment | |
| # Using pip | |
| pip install "nemo-curator[audio_alignment]" | |
| # Using uv (recommended) | |
| uv sync --extra audio_alignment |
Let's not include pip.
|
|
||
|
|
||
| def _create_executor(backend: str) -> object: | ||
| import importlib |
| "nemo_curator[video_cuda12]", | ||
| ] | ||
|
|
||
| audio_alignment = [ |
Is there a reason this can't be in audio_common?
Signed-off-by: Ssofja <sofiakostandian@gmail.com>
| def _is_node_local_mfa_root(self) -> bool: | ||
| """True when MFA root is the per-node local copy (safe to mutate).""" | ||
| if self.copy_models_to_local: | ||
| return True | ||
| try: | ||
| mfa_root = Path(self._mfa_root).resolve() | ||
| local_mfa = ( | ||
| Path(self._effective_local_base) | ||
| / f"mfa_models_{socket.gethostname()}" | ||
| ).resolve() | ||
| except OSError: | ||
| return False | ||
| return mfa_root == local_mfa or local_mfa in mfa_root.parents |
_is_node_local_mfa_root gives a false positive when local copy was never created
The guard at line 526 immediately returns True whenever copy_models_to_local=True, but setup() (lines 273-274) silently falls back to the shared root when the expected local directory doesn't exist. In a standalone run (no Xenna, so setup_on_node() is never called) with the default copy_models_to_local=True, the method incorrectly signals that _mfa_root is a safe-to-mutate local path. Every _run_mfa_align invocation then deletes command_history.yaml from the shared NFS/Lustre directory, exactly the race condition this guard was meant to prevent. The early-return on copy_models_to_local should be removed; the resolved-path comparison below is sufficient and correct.
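The resolved-path comparison on its own can be sketched as follows. `is_under` is a hypothetical helper name; it compares resolved paths rather than string prefixes, so a sibling directory with a similar name is not mistaken for the local root.

```python
from pathlib import Path


def is_under(candidate: str, local_root: str) -> bool:
    """True when `candidate` is `local_root` itself or lives inside it."""
    try:
        cand = Path(candidate).resolve()
        root = Path(local_root).resolve()
    except OSError:
        # If either path cannot be resolved, treat it as not node-local.
        return False
    return cand == root or root in cand.parents
```

With no early return on a configuration flag, the result depends only on where the MFA root actually points.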
| for corpus_stem, task in stem_to_task.items(): | ||
| audio_path = Path(task.data[self.audio_filepath_key]) | ||
| corpus_wav = corpus_path / f"{corpus_stem}.wav" | ||
| if not corpus_wav.exists() and not corpus_wav.is_symlink(): | ||
| try: | ||
| corpus_wav.symlink_to(audio_path.resolve()) | ||
| except OSError: | ||
| shutil.copy2(audio_path, corpus_wav) |
Non-WAV inputs create a misnamed symlink, causing a cryptic MFA failure
The corpus preparation always names the symlink/copy {stem}.wav regardless of the actual file extension. If a task's audio file is .flac, .mp3, or any other format, Kaldi reads the file expecting a RIFF WAV header and fails. The resulting RuntimeError("mfa align failed") contains only MFA's stderr output and gives no indication that a format mismatch is the root cause. Adding a guard like if audio_path.suffix.lower() != ".wav": raise ValueError(...) before creating the symlink would surface the problem immediately with a clear message.
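A minimal version of that guard is sketched here. `require_wav` is a hypothetical helper name, and the check is based on the file extension only, so it does not verify the actual file contents.

```python
from pathlib import Path


def require_wav(audio_path: str) -> Path:
    """Return the path if it has a .wav extension, otherwise raise ValueError."""
    path = Path(audio_path)
    if path.suffix.lower() != ".wav":
        # Fail early with a message that names the real cause.
        raise ValueError(
            f"Expected a .wav file for alignment, got '{path.suffix}' for {path}"
        )
    return path
```

Calling this before the symlink is created would surface the format mismatch per file instead of as a generic alignment failure.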
|
|
||
| # ------------------------------------------------------------------ | ||
| # Schema | ||
| # ------------------------------------------------------------------ |
Can you remove all these Claude comments?
| ) -> None: | ||
| """Import praatio and verify MFA root is available.""" | ||
| try: | ||
| from praatio import textgrid as tg_mod |
| self._ctm_dir.mkdir(parents=True, exist_ok=True) | ||
|
|
||
| def teardown(self) -> None: | ||
| pass |
Can remove this since it is empty.
| class TestMFAAlignmentStage: | ||
| """Test suite for MFAAlignmentStage.""" | ||
|
|
||
| def test_stage_properties(self, tmp_path: Path) -> None: |
| def test_output_dir_required(self) -> None: | ||
| with pytest.raises(TypeError, match="output_dir"): | ||
| MFAAlignmentStage() # type: ignore[call-arg] | ||
|
|
||
| def test_process_raises_not_implemented(self, tmp_path: Path) -> None: | ||
| stage = _make_stage(tmp_path) | ||
| with pytest.raises(NotImplementedError, match="only supports process_batch"): | ||
| stage.process(AudioTask(data={"audio_filepath": "/a.wav", "text": "hi"})) | ||
|
|
||
| def test_setup_raises_without_praatio(self, tmp_path: Path) -> None: | ||
| import builtins |
These tests can be deleted.
Signed-off-by: Ssofja <sofiakostandian@gmail.com>
Hi, tagging @mohammadaaftabv and @oyilmaz-nvidia for reviews too, thanks!
Description
Add a new MFA (Montreal Forced Aligner) forced-alignment stage to NeMo Curator's audio pipeline.
MFAAlignmentStage is a ProcessingStage[AudioTask, AudioTask] that runs MFA in batch mode, producing word-level TextGrid alignments with optional RTTM (speech activity) and CTM (word timing) output. It operates via process_batch: it prepares a temporary MFA corpus of symlinked WAVs and transcript files, runs a single mfa align subprocess, then converts the resulting TextGrid files. Node-level isolation copies MFA models to local storage to avoid NFS race conditions in distributed settings.
New files:
- nemo_curator/stages/audio/alignment/__init__.py: subpackage init
- nemo_curator/stages/audio/alignment/mfa_alignment.py: core MFAAlignmentStage (dataclass-based)
- tests/stages/audio/alignment/test_mfa_alignment.py: 28 unit tests
- tutorials/audio/alignment/: tutorial with CLI (pipeline.py), Hydra runner (run.py), YAML config, and README
Modified files:
- nemo_curator/stages/audio/__init__.py: register MFAAlignmentStage import and __all__ entry
- pyproject.toml: add audio_alignment optional extra (praatio>=6.0, soundfile>=0.12)
Key features:
- mfa align subprocess per batch for efficiency
- ... (max_gap_for_merge)
- ... (setup_on_node)
- ... xenna_stage_spec
Usage
Or via CLI:
    python tutorials/audio/alignment/pipeline.py \
        --input-manifest /data/manifest.jsonl \
        --output-dir /data/aligned \
        --acoustic-model english_us_arpa \
        --dictionary english_us_arpa
Or via Hydra:
    python tutorials/audio/alignment/run.py \
        --config-path=. --config-name=pipeline \
        output_dir=/data/aligned
Checklist