Montreal forced aligner#1977

Open
Ssofja wants to merge 4 commits into
NVIDIA-NeMo:mainfrom
Ssofja:montreal_forced_aligner


@Ssofja Ssofja commented May 13, 2026

Description

Add a new MFA (Montreal Forced Aligner) forced-alignment stage to NeMo Curator's audio pipeline.

MFAAlignmentStage is a ProcessingStage[AudioTask, AudioTask] that runs MFA in batch mode, producing word-level TextGrid alignments with optional RTTM (speech activity) and CTM (word timing) output. It operates via process_batch: it prepares a temporary MFA corpus of symlinked WAVs and transcript files, runs a single mfa align subprocess, then converts the resulting TextGrid files. Node-level isolation copies MFA models to local storage to avoid NFS race conditions in distributed settings.

New files:

  • nemo_curator/stages/audio/alignment/__init__.py — subpackage init
  • nemo_curator/stages/audio/alignment/mfa_alignment.py — core MFAAlignmentStage (dataclass-based)
  • tests/stages/audio/alignment/test_mfa_alignment.py — 28 unit tests
  • tutorials/audio/alignment/ — tutorial with CLI (pipeline.py), Hydra runner (run.py), YAML config, and README

Modified files:

  • nemo_curator/stages/audio/__init__.py — register MFAAlignmentStage import and __all__ entry
  • pyproject.toml — add audio_alignment optional extra (praatio>=6.0, soundfile>=0.12)

Key features:

  • Batch alignment via a single mfa align subprocess per batch for efficiency
  • TextGrid to RTTM conversion with configurable speech interval merging (max_gap_for_merge)
  • TextGrid to CTM conversion for word-level timing
  • Graceful fallback for files MFA silently drops (produces duration-based RTTM/CTM)
  • Duplicate stem detection with automatic renaming to prevent silent data loss
  • G2P model warning when the specified model is not found on disk
  • Node-local MFA model copying for distributed execution (setup_on_node)
  • Single-worker-per-node scheduling via xenna_stage_spec
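The speech-interval merging behind the TextGrid to RTTM conversion can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `merge_speech_intervals` and `to_rttm_lines` are hypothetical helper names, and only the `max_gap_for_merge` parameter name comes from the description above. The RTTM lines follow the common ten-field SPEAKER record layout.

```python
from typing import Iterable

Interval = tuple[float, float]  # (start, end) in seconds


def merge_speech_intervals(
    intervals: Iterable[Interval], max_gap_for_merge: float
) -> list[Interval]:
    """Merge intervals whose silence gap is at most max_gap_for_merge seconds."""
    merged: list[Interval] = []
    for start, end in sorted(intervals):
        if merged and start - merged[-1][1] <= max_gap_for_merge:
            # Gap is small enough: extend the previous speech segment.
            prev_start, prev_end = merged[-1]
            merged[-1] = (prev_start, max(prev_end, end))
        else:
            merged.append((start, end))
    return merged


def to_rttm_lines(
    file_id: str, intervals: Iterable[Interval], speaker: str = "speech"
) -> list[str]:
    """Render intervals as RTTM SPEAKER records (onset and duration in seconds)."""
    return [
        f"SPEAKER {file_id} 1 {start:.3f} {end - start:.3f} <NA> <NA> {speaker} <NA> <NA>"
        for start, end in intervals
    ]
```

With a 0.3 s threshold, word intervals at 0.0-1.0 s and 1.2-2.0 s collapse into one speech segment, while a word starting at 3.0 s opens a new one.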

Usage

from nemo_curator.stages.audio.alignment import MFAAlignmentStage

stage = MFAAlignmentStage(
    output_dir="/data/aligned",
    acoustic_model="english_us_arpa",
    dictionary="english_us_arpa",
    g2p_model="english_us_arpa",
    create_rttm=True,
    create_ctm=True,
)
pipeline.add_stage(stage.with_(batch_size=256))

Or via CLI:

python tutorials/audio/alignment/pipeline.py \
    --input-manifest /data/manifest.jsonl \
    --output-dir /data/aligned \
    --acoustic-model english_us_arpa \
    --dictionary english_us_arpa

Or via Hydra:

python tutorials/audio/alignment/run.py \
    --config-path=. --config-name=pipeline \
    output_dir=/data/aligned

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@Ssofja Ssofja requested review from a team as code owners May 13, 2026 13:54
@Ssofja Ssofja requested review from weijiac0619 and removed request for a team May 13, 2026 13:54

copy-pr-bot Bot commented May 13, 2026

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.


greptile-apps Bot commented May 13, 2026

Greptile Summary

This PR adds MFAAlignmentStage, a new ProcessingStage that wraps the Montreal Forced Aligner (MFA) to produce word-level TextGrid, RTTM, and CTM alignment outputs from a batch of AudioTask entries in a single mfa align subprocess call.

  • Core stage (mfa_alignment.py): dataclass-based, implements process_batch only; handles duplicate audio stems, graceful fallback for MFA-dropped files, improved tier selection that refuses to parse phone-level tiers as words, and node-local model copying for distributed NFS safety via setup_on_node().
  • Dependencies: praatio>=6.0 is added to audio_common (rather than a separate audio_alignment extra as described in the PR text), making it available to all audio extra users including all.
  • Tests & tutorials: 28 unit tests with subprocess mocking, plus a CLI pipeline, Hydra runner, and detailed README covering distributed execution and troubleshooting.

Confidence Score: 5/5

The new stage is self-contained and well-tested; the core data path (batch alignment, TextGrid parsing, RTTM/CTM conversion) behaves correctly for the standard case.

All new code paths are covered by unit tests that mock the MFA subprocess. Logic for tier selection, silence filtering, interval merging, and fallback output is straightforward and verified. The change is additive with no modifications to existing stages.

nemo_curator/stages/audio/alignment/mfa_alignment.py — the distributed-settings behaviour (model path resolution and history-file lifecycle in _is_node_local_mfa_root / _run_mfa_align) benefits from careful review before running on a multi-node cluster.

Important Files Changed

| Filename | Overview |
| --- | --- |
| nemo_curator/stages/audio/alignment/mfa_alignment.py | Core MFA stage implementation; logically complete with good tier-selection and fallback handling, though the _is_node_local_mfa_root early-return on copy_models_to_local can mis-identify a shared NFS root as local in standalone (non-Xenna) runs, and non-WAV symlinks are created with a .wav extension regardless of source format. |
| tests/stages/audio/alignment/test_mfa_alignment.py | 28 well-structured unit tests covering success, fallback, duplicate stem, custom keys, silence filtering, phone-tier rejection, and shared-root guard; mock helper _align_textgrid_output_dir now uses cmd.index("align") rather than a hardcoded index, making it robust to multi-word mfa_command strings. |
| pyproject.toml | Adds praatio>=6.0 to audio_common; this is picked up transitively by audio_cuda12, audio_cpu, and all, so all audio users get the dependency without needing a separate extra. |
| tutorials/audio/alignment/pipeline.py | CLI tutorial pipeline; --mfa-root-dir defaults to "" (falsy, treated as None via or) which works correctly, and backend selection is clean. |
| tutorials/audio/alignment/run.py | Hydra runner; dynamic executor loading via importlib is safe and the backend validation guard prevents silent failures on unknown backends. |

Sequence Diagram

sequenceDiagram
    participant F as Framework
    participant S as MFAAlignmentStage
    participant T as TempDir (corpus)
    participant MFA as mfa align (subprocess)
    participant FS as Output Filesystem

    F->>S: setup_on_node() [optional, Xenna only]
    S->>FS: Copy models to node-local storage
    F->>S: setup()
    S->>FS: mkdir textgrids/, rttms/, ctms/

    F->>S: process_batch(tasks)
    loop for each task
        S->>S: validate_input, dedup stems
        S->>S: _get_audio_duration (if missing)
    end
    S->>T: Create symlinks (.wav) + .txt transcript files
    S->>MFA: mfa align corpus_dir dict model tg_out_dir
    MFA-->>T: "Write *.TextGrid files"
    S->>S: "rglob(*.TextGrid) find produced files"
    alt TextGrid produced
        S->>FS: Write .rttm (merged speech intervals)
        S->>FS: Write .ctm (word timings)
        S->>S: "task.data[textgrid/rttm/ctm_filepath] = paths"
    else MFA silently dropped file
        S->>FS: Write duration-based fallback .rttm
        S->>FS: Write uniform-split fallback .ctm
        S->>S: "task.data[mfa_skipped] = True"
    end
    S-->>F: return list[AudioTask] with enriched data
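
The "uniform-split fallback .ctm" branch in the diagram could look something like the sketch below. It assumes the fallback simply divides the known audio duration evenly across the transcript words; `fallback_ctm_lines` is a hypothetical name, and the five-field CTM line (file, channel, start, duration, word) is the conventional layout rather than something confirmed from the PR.

```python
def fallback_ctm_lines(file_id: str, text: str, duration: float) -> list[str]:
    """Spread the transcript words evenly over the audio duration.

    Used only as a placeholder when no real alignment is available, so the
    timings are approximate by construction.
    """
    words = text.split()
    if not words or duration <= 0:
        return []
    step = duration / len(words)  # every word gets the same share of time
    return [
        f"{file_id} 1 {index * step:.3f} {step:.3f} {word}"
        for index, word in enumerate(words)
    ]
```

Downstream consumers would presumably check the mfa_skipped flag before trusting these timings.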

Reviews (3): Last reviewed commit: "add fixes based on PR comments"

if not self.output_dir:
    msg = "output_dir is required for MFAAlignmentStage"
    raise ValueError(msg)
self._effective_num_jobs = self.num_jobs or os.cpu_count()

P1 os.cpu_count() can return None, producing -j None

os.cpu_count() returns None in some containerized or virtualized environments. When num_jobs=0 (the default) and this happens, self._effective_num_jobs becomes None, which is then stringified to "None" and passed to MFA as -j None. MFA will reject that value and fail with a confusing error unrelated to alignment. A safe default of 1 ensures MFA is always given a valid integer.

Suggested change
- self._effective_num_jobs = self.num_jobs or os.cpu_count()
+ self._effective_num_jobs = self.num_jobs or os.cpu_count() or 1
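
A small standalone sketch of the same fallback chain, for illustration only. `resolve_num_jobs` is a hypothetical helper, and the injectable `cpu_count` argument exists purely so the `None` case can be exercised without a special container.

```python
import os
from typing import Callable, Optional


def resolve_num_jobs(
    num_jobs: int = 0,
    cpu_count: Callable[[], Optional[int]] = os.cpu_count,
) -> int:
    """Return a positive job count: explicit value, else detected CPUs, else 1."""
    # os.cpu_count() is documented to return None when the count is unknown,
    # so the trailing "or 1" guarantees an integer reaches the -j flag.
    return num_jobs or cpu_count() or 1
```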

Comment on lines +267 to +274
if file_stem in stem_to_task:
    original_stem = file_stem
    file_stem = f"{file_stem}_{uuid.uuid4().hex[:8]}"
    logger.warning(
        f"Duplicate stem '{original_stem}' — renamed to "
        f"'{file_stem}' to avoid silent data loss"
    )
    task.data["_mfa_stem"] = file_stem

P2 _mfa_stem internal key leaks into output manifest

task.data["_mfa_stem"] is set on duplicate-renamed tasks but never consumed by any subsequent processing. It will appear as a surprise key in the output JSONL manifest for some (but not all) tasks, breaking any downstream schema validation that expects a fixed set of keys.

Suggested change
  if file_stem in stem_to_task:
      original_stem = file_stem
      file_stem = f"{file_stem}_{uuid.uuid4().hex[:8]}"
      logger.warning(
          f"Duplicate stem '{original_stem}' — renamed to "
          f"'{file_stem}' to avoid silent data loss"
      )
-     task.data["_mfa_stem"] = file_stem
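
One way to keep the renamed stem out of the manifest is to track it in a side mapping that lives only for the duration of the batch. The sketch below assumes that approach; `assign_unique_stems` is a hypothetical helper that works on plain path strings rather than AudioTask objects.

```python
import uuid
from pathlib import Path


def assign_unique_stems(audio_paths: list[str]) -> dict[str, str]:
    """Map a unique corpus stem to each audio path, renaming duplicates.

    The mapping is local to the batch, so nothing is written back into the
    per-task data that ends up in the output manifest.
    """
    stem_to_path: dict[str, str] = {}
    for audio_path in audio_paths:
        base_stem = Path(audio_path).stem
        stem = base_stem
        while stem in stem_to_path:
            # Append a short random suffix until the stem is unique.
            stem = f"{base_stem}_{uuid.uuid4().hex[:8]}"
        stem_to_path[stem] = audio_path
    return stem_to_path
```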

Comment on lines +484 to +489
tier_name = "words"
tier = (
    tg.getTier(tier_name)
    if tier_name in tg.tierNames
    else tg.getTier(tg.tierNames[0])
)

P2 Fallback tier selection can silently pick a phone-level tier instead of words

When the TextGrid has no tier named "words" the code falls back to tg.tierNames[0]. MFA TextGrids typically have both a "words" tier and a "phones" tier; if ordering is not guaranteed, the fallback may parse phone-level intervals as word intervals, producing nonsense RTTM/CTM output with no error or warning.

Suggested change
  tier_name = "words"
+ if tier_name not in tg.tierNames:
+     logger.warning(
+         f"No 'words' tier in {textgrid_path}; "
+         f"available tiers: {tg.tierNames}. Falling back to first tier."
+     )
  tier = (
      tg.getTier(tier_name)
      if tier_name in tg.tierNames
      else tg.getTier(tg.tierNames[0])
  )
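
A stricter alternative to warning and falling back is to refuse phone-level tiers outright. The sketch below works on a plain list of tier names so it runs without praatio; `select_word_tier` and the `PHONE_TIER_NAMES` set are assumptions made for illustration, not names from the PR.

```python
from typing import Sequence

# Assumed names for phone-level tiers; adjust to whatever the aligner emits.
PHONE_TIER_NAMES = {"phones", "phone"}


def select_word_tier(tier_names: Sequence[str]) -> str:
    """Pick the tier to read word intervals from, never a phone-level tier."""
    if "words" in tier_names:
        return "words"
    candidates = [name for name in tier_names if name.lower() not in PHONE_TIER_NAMES]
    if not candidates:
        raise ValueError(
            f"No word-level tier found; available tiers: {list(tier_names)}"
        )
    # Falling back is still a guess, so callers may want to log it.
    return candidates[0]
```

Raising here turns silently wrong RTTM/CTM output into an explicit per-file failure that the fallback path can handle.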

Comment on lines +394 to +399
history_file = Path(self._mfa_root) / "command_history.yaml"
if history_file.exists():
    try:
        history_file.unlink()
    except Exception:  # noqa: BLE001
        pass

P2 command_history.yaml deleted from shared NFS storage when copy_models_to_local=False

When local model copying is disabled, self._mfa_root points at the shared NFS/Lustre directory. Every batch invocation tries to unlink() the shared history file. In a distributed run this creates concurrent deletion attempts against the shared filesystem, and MFA itself may be writing the file at the same time — exactly the race condition copy_models_to_local is meant to prevent.

Suggested change
- history_file = Path(self._mfa_root) / "command_history.yaml"
- if history_file.exists():
-     try:
-         history_file.unlink()
-     except Exception:  # noqa: BLE001
-         pass
+ if self.copy_models_to_local:
+     history_file = Path(self._mfa_root) / "command_history.yaml"
+     if history_file.exists():
+         try:
+             history_file.unlink()
+         except Exception:  # noqa: BLE001
+             pass

acoustic_model: str = "english_us_arpa"
dictionary: str = "english_us_arpa"
g2p_model: str = "english_us_arpa"
output_dir: str = ""

Suggested change
- output_dir: str = ""
+ output_dir: str

instead of checking it in __post_init__.

Comment on lines +120 to +121
mfa_root_dir: str = ""
local_mfa_base_dir: str = ""

Suggested change
- mfa_root_dir: str = ""
- local_mfa_base_dir: str = ""
+ mfa_root_dir: str | None = None
+ local_mfa_base_dir: str | None = None

if not self.output_dir:
    msg = "output_dir is required for MFAAlignmentStage"
    raise ValueError(msg)
self._effective_num_jobs = self.num_jobs or os.cpu_count()

I haven't read through the whole PR yet but we probably wouldn't want this? Let's let the pipeline and executor work out this type of logic instead.

# See the License for the specific language governing permissions and
# limitations under the License.

# modality: audio

Not needed. (Also will be ignored since it is not at the top of the file.)

Comment thread tutorials/audio/alignment/pipeline.yaml
Comment thread tutorials/audio/alignment/README.md Outdated
Comment on lines +23 to +27
# Using uv (recommended)
uv sync --extra audio_alignment

# Using pip
pip install "nemo-curator[audio_alignment]"

Suggested change
  # Using uv (recommended)
  uv sync --extra audio_alignment
-
- # Using pip
- pip install "nemo-curator[audio_alignment]"

let's not include pip.

Comment thread tutorials/audio/alignment/run.py Outdated


def _create_executor(backend: str) -> object:
    import importlib

Top-level import?

Comment thread pyproject.toml Outdated
"nemo_curator[video_cuda12]",
]

audio_alignment = [

Is there a reason this can't be in audio_common?

Signed-off-by: Ssofja <sofiakostandian@gmail.com>
Comment on lines +524 to +536
def _is_node_local_mfa_root(self) -> bool:
    """True when MFA root is the per-node local copy (safe to mutate)."""
    if self.copy_models_to_local:
        return True
    try:
        mfa_root = Path(self._mfa_root).resolve()
        local_mfa = (
            Path(self._effective_local_base)
            / f"mfa_models_{socket.gethostname()}"
        ).resolve()
    except OSError:
        return False
    return mfa_root == local_mfa or local_mfa in mfa_root.parents

P1 _is_node_local_mfa_root gives a false positive when local copy was never created

The guard at line 526 immediately returns True whenever copy_models_to_local=True, but setup() (lines 273-274) silently falls back to the shared root when the expected local directory doesn't exist. In a standalone run (no Xenna, so setup_on_node() is never called) with the default copy_models_to_local=True, the method incorrectly signals that _mfa_root is a safe-to-mutate local path. Every _run_mfa_align invocation then deletes command_history.yaml from the shared NFS/Lustre directory, exactly the race condition this guard was meant to prevent. The early-return on copy_models_to_local should be removed; the resolved-path comparison below is sufficient and correct.
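
Dropping the early return leaves only the resolved-path comparison, which can be sketched as a free function. `is_node_local_root` and its explicit `hostname` parameter are hypothetical (the parameter is there for testability); the `mfa_models_<hostname>` directory naming is taken from the snippet above.

```python
import socket
from pathlib import Path
from typing import Optional


def is_node_local_root(
    mfa_root: str, local_base: str, hostname: Optional[str] = None
) -> bool:
    """True only when mfa_root is, or lives under, the per-node local copy."""
    host = hostname or socket.gethostname()
    try:
        root = Path(mfa_root).resolve()
        local = (Path(local_base) / f"mfa_models_{host}").resolve()
    except OSError:
        # If the paths cannot be resolved, treat the root as shared (unsafe).
        return False
    return root == local or local in root.parents
```

Because the decision depends on where the root actually points rather than on the copy_models_to_local flag, a run that silently fell back to the shared root is reported as not node-local.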

Comment on lines +293 to +300
for corpus_stem, task in stem_to_task.items():
    audio_path = Path(task.data[self.audio_filepath_key])
    corpus_wav = corpus_path / f"{corpus_stem}.wav"
    if not corpus_wav.exists() and not corpus_wav.is_symlink():
        try:
            corpus_wav.symlink_to(audio_path.resolve())
        except OSError:
            shutil.copy2(audio_path, corpus_wav)

P1 Non-WAV inputs create a misnamed symlink, causing a cryptic MFA failure

The corpus preparation always names the symlink/copy {stem}.wav regardless of the actual file extension. If a task's audio file is .flac, .mp3, or any other format, Kaldi reads the file expecting a RIFF WAV header and fails. The resulting RuntimeError("mfa align failed") contains only MFA's stderr output and gives no indication that a format mismatch is the root cause. Adding a guard like if audio_path.suffix.lower() != ".wav": raise ValueError(...) before creating the symlink would surface the problem immediately with a clear message.
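
The suggested guard might look like the sketch below. `require_wav` is a hypothetical helper and the error wording is illustrative; it checks only the file extension, as the comment proposes, and does not inspect the file header.

```python
from pathlib import Path


def require_wav(audio_path: str) -> Path:
    """Fail fast with a clear message when the input is not a .wav file."""
    path = Path(audio_path)
    suffix = path.suffix.lower()
    if suffix != ".wav":
        shown = suffix or "<no extension>"
        raise ValueError(
            f"MFA corpus preparation expects WAV input, got {shown} for {path}; "
            "convert the file to WAV before alignment."
        )
    return path
```

Calling this before the symlink is created surfaces the format mismatch per file instead of as a generic "mfa align failed" for the whole batch.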


# ------------------------------------------------------------------
# Schema
# ------------------------------------------------------------------

Can you remove all these Claude comments.

) -> None:
    """Import praatio and verify MFA root is available."""
    try:
        from praatio import textgrid as tg_mod

Top-level import?

    self._ctm_dir.mkdir(parents=True, exist_ok=True)

def teardown(self) -> None:
    pass

Can remove this since it is empty.

class TestMFAAlignmentStage:
    """Test suite for MFAAlignmentStage."""

    def test_stage_properties(self, tmp_path: Path) -> None:

Can be deleted.

Comment on lines +160 to +170
def test_output_dir_required(self) -> None:
    with pytest.raises(TypeError, match="output_dir"):
        MFAAlignmentStage()  # type: ignore[call-arg]

def test_process_raises_not_implemented(self, tmp_path: Path) -> None:
    stage = _make_stage(tmp_path)
    with pytest.raises(NotImplementedError, match="only supports process_batch"):
        stage.process(AudioTask(data={"audio_filepath": "/a.wav", "text": "hi"}))

def test_setup_raises_without_praatio(self, tmp_path: Path) -> None:
    import builtins

These tests can be deleted.

Signed-off-by: Ssofja <sofiakostandian@gmail.com>
@sarahyurick

Hi, tagging @mohammadaaftabv and @oyilmaz-nvidia for reviews too, thanks!
