diff --git a/js/hang/src/catalog/audio.ts b/js/hang/src/catalog/audio.ts index e57710bf2..dee81c3eb 100644 --- a/js/hang/src/catalog/audio.ts +++ b/js/hang/src/catalog/audio.ts @@ -15,7 +15,7 @@ export const AudioConfigSchema = z.object({ codec: z.string(), // Container format for timestamp encoding - // Defaults to "legacy" when not specified in catalog (backward compatibility) + // Defaults to "native" when not specified in catalog (backward compatibility) container: ContainerSchema.default(DEFAULT_CONTAINER), // The description is used for some codecs. @@ -32,6 +32,12 @@ export const AudioConfigSchema = z.object({ // The bitrate of the audio in bits per second // TODO: Support up to Number.MAX_SAFE_INTEGER bitrate: u53Schema.optional(), + + // Init segment (ftyp+moov) for CMAF/fMP4 containers. + // This is the initialization segment needed for MSE playback. + // Stored as base64-encoded bytes. If not provided, init segments + // will be sent over the data track (legacy behavior). + initSegment: z.string().optional(), // base64-encoded }); export const AudioSchema = z diff --git a/js/hang/src/catalog/container.ts b/js/hang/src/catalog/container.ts index 05b0e81db..6ec563e9f 100644 --- a/js/hang/src/catalog/container.ts +++ b/js/hang/src/catalog/container.ts @@ -3,16 +3,16 @@ import { z } from "zod"; /** * Container format for frame timestamp encoding. * - * - "legacy": Uses QUIC VarInt encoding (1-8 bytes, variable length) + * - "native": Uses QUIC VarInt encoding (1-8 bytes, variable length) * - "raw": Uses fixed u64 encoding (8 bytes, big-endian) - * - "fmp4": Fragmented MP4 container (future) + * - "cmaf": Fragmented MP4 container (future) */ -export const ContainerSchema = z.enum(["legacy", "raw", "fmp4"]); +export const ContainerSchema = z.enum(["native", "raw", "cmaf"]); export type Container = z.infer; /** * Default container format when not specified. - * Set to legacy for backward compatibility. + * Set to native for backward compatibility. */ -export const DEFAULT_CONTAINER: Container = "legacy"; +export const DEFAULT_CONTAINER: Container = "native"; diff --git a/js/hang/src/catalog/video.ts b/js/hang/src/catalog/video.ts index b8b77883f..fbb460de5 100644 --- a/js/hang/src/catalog/video.ts +++ b/js/hang/src/catalog/video.ts @@ -14,7 +14,7 @@ export const VideoConfigSchema = z.object({ codec: z.string(), // Container format for timestamp encoding - // Defaults to "legacy" when not specified in catalog (backward compatibility) + // Defaults to "native" when not specified in catalog (backward compatibility) container: ContainerSchema.default(DEFAULT_CONTAINER), // The description is used for some codecs. @@ -43,6 +43,12 @@ export const VideoConfigSchema = z.object({ // If true, the decoder will optimize for latency. // Default: true optimizeForLatency: z.boolean().optional(), + + // Init segment (ftyp+moov) for CMAF/fMP4 containers. + // This is the initialization segment needed for MSE playback. + // Stored as base64-encoded bytes. If not provided, init segments + // will be sent over the data track (legacy behavior). 
+ initSegment: z.string().optional(), // base64-encoded }); // Mirrors VideoDecoderConfig diff --git a/js/hang/src/container/codec.ts b/js/hang/src/container/codec.ts index ca60754a4..2eb097e2d 100644 --- a/js/hang/src/container/codec.ts +++ b/js/hang/src/container/codec.ts @@ -11,12 +11,14 @@ import { DEFAULT_CONTAINER } from "../catalog"; */ export function encodeTimestamp(timestamp: Time.Micro, container: Catalog.Container = DEFAULT_CONTAINER): Uint8Array { switch (container) { - case "legacy": + case "native": return encodeVarInt(timestamp); case "raw": return encodeU64(timestamp); - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": { + // CMAF fragments contain timestamps in moof atoms, no header needed + return new Uint8Array(0); + } } } @@ -32,7 +34,7 @@ export function decodeTimestamp( container: Catalog.Container = DEFAULT_CONTAINER, ): [Time.Micro, Uint8Array] { switch (container) { - case "legacy": { + case "native": { const [value, remaining] = decodeVarInt(buffer); return [value as Time.Micro, remaining]; } @@ -40,8 +42,9 @@ export function decodeTimestamp( const [value, remaining] = decodeU64(buffer); return [value as Time.Micro, remaining]; } - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": { + return [0 as Time.Micro, buffer]; + } } } @@ -54,12 +57,12 @@ export function decodeTimestamp( */ export function getTimestampSize(container: Catalog.Container = DEFAULT_CONTAINER): number { switch (container) { - case "legacy": + case "native": return 8; // VarInt maximum size case "raw": return 8; // u64 fixed size - case "fmp4": - throw new Error("fmp4 container not yet implemented"); + case "cmaf": + return 8; // VarInt maximum size (same as native) } } diff --git a/js/hang/src/frame.ts b/js/hang/src/frame.ts index a0fd86219..c9936411b 100644 --- a/js/hang/src/frame.ts +++ b/js/hang/src/frame.ts @@ -20,6 +20,16 @@ export function encode(source: Uint8Array | Source, timestamp: Time.Micro, conta // Encode timestamp using the specified container format const timestampBytes = Container.encodeTimestamp(timestamp, container); + // For CMAF, timestampBytes will be empty, so we just return the source + if (container === "cmaf") { + if (source instanceof Uint8Array) { + return source; + } + const data = new Uint8Array(source.byteLength); + source.copyTo(data); + return data; + } + // Allocate buffer for timestamp + payload const payloadSize = source instanceof Uint8Array ? source.byteLength : source.byteLength; const data = new Uint8Array(timestampBytes.byteLength + payloadSize); @@ -112,19 +122,18 @@ export class Consumer { async #run() { // Start fetching groups in the background + for (;;) { const consumer = await this.#track.nextGroup(); - if (!consumer) break; + if (!consumer) { + break; + } - // To improve TTV, we always start with the first group. - // For higher latencies we might need to figure something else out, as its racey. if (this.#active === undefined) { this.#active = consumer.sequence; } if (consumer.sequence < this.#active) { - console.warn(`skipping old group: ${consumer.sequence} < ${this.#active}`); - // Skip old groups. 
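Taken together, the schema and codec changes above pin down how many header bytes precede each frame payload per container. A minimal round-trip sketch, assuming the helpers exported from container/codec.ts and the Time type from @moq/lite:

```ts
import type { Time } from "@moq/lite";
import { decodeTimestamp, encodeTimestamp } from "./container/codec";

const ts = 1_500_000 as Time.Micro;

const native = encodeTimestamp(ts, "native"); // QUIC VarInt, 1-8 bytes
const raw = encodeTimestamp(ts, "raw"); // fixed big-endian u64, 8 bytes
const cmaf = encodeTimestamp(ts, "cmaf"); // empty: timing lives in the moof
console.assert(cmaf.byteLength === 0);

// Decoding mirrors this: "cmaf" hands the buffer back untouched with a zero
// timestamp, since MSE recovers timing from the fragment itself. Note that
// getTimestampSize() still reports an 8-byte maximum for "cmaf" even though
// no header bytes are ever written.
const [decoded] = decodeTimestamp(native, "native");
console.assert(decoded === ts);
```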
consumer.close(); continue; } @@ -150,7 +159,9 @@ export class Consumer { for (;;) { const next = await group.consumer.readFrame(); - if (!next) break; + if (!next) { + break; + } const { data, timestamp } = decode(next, this.#container); const frame = { @@ -223,8 +234,6 @@ export class Consumer { if (this.#active !== undefined && first.consumer.sequence <= this.#active) { this.#groups.shift(); - console.warn(`skipping slow group: ${first.consumer.sequence} < ${this.#groups[0]?.consumer.sequence}`); - first.consumer.close(); first.frames.length = 0; } @@ -246,7 +255,9 @@ export class Consumer { this.#groups[0].consumer.sequence <= this.#active ) { const frame = this.#groups[0].frames.shift(); - if (frame) return frame; + if (frame) { + return frame; + } // Check if the group is done and then remove it. if (this.#active > this.#groups[0].consumer.sequence) { @@ -261,7 +272,9 @@ export class Consumer { const wait = new Promise((resolve) => { this.#notify = resolve; - }).then(() => true); + }).then(() => { + return true; + }); if (!(await Promise.race([wait, this.#signals.closed]))) { this.#notify = undefined; diff --git a/js/hang/src/util/mp4-mime.ts b/js/hang/src/util/mp4-mime.ts new file mode 100644 index 000000000..18e59345d --- /dev/null +++ b/js/hang/src/util/mp4-mime.ts @@ -0,0 +1,52 @@ +import type * as Catalog from "../catalog"; + +/** + * Builds an MP4 MIME type string for MediaSource from a codec string. + * + * @param codec - The codec string from the catalog (e.g., "avc1.42E01E", "mp4a.40.2") + * @param type - "video" or "audio" + * @returns MP4 MIME type string (e.g., "video/mp4; codecs=\"avc1.42E01E\"") + */ +function buildMp4MimeType(codec: string, type: "video" | "audio"): string { + // For MP4 containers, we use the standard MIME type format + // Most codecs are already in the correct format for MSE + return `${type}/mp4; codecs="${codec}"`; +} + +/** + * Checks if a MIME type is supported by MediaSource. + * + * @param mimeType - The MIME type to check + * @returns true if supported, false otherwise + */ +export function isMimeTypeSupported(mimeType: string): boolean { + return MediaSource.isTypeSupported(mimeType); +} + +/** + * Builds and validates an MP4 MIME type for video from catalog config. + * + * @param config - Video configuration from catalog + * @returns MP4 MIME type string or undefined if not supported + */ +export function buildMp4VideoMimeType(config: Catalog.VideoConfig): string | undefined { + const mimeType = buildMp4MimeType(config.codec, "video"); + if (isMimeTypeSupported(mimeType)) { + return mimeType; + } + return undefined; +} + +/** + * Builds and validates an MP4 MIME type for audio from catalog config. 
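+ * @example
+ * // Illustrative config values; support is probed via MediaSource.isTypeSupported.
+ * buildMp4AudioMimeType({ codec: "mp4a.40.2" } as Catalog.AudioConfig);
+ * // → 'audio/mp4; codecs="mp4a.40.2"', or undefined when unsupported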
+ * + * @param config - Audio configuration from catalog + * @returns MP4 MIME type string or undefined if not supported + */ +export function buildMp4AudioMimeType(config: Catalog.AudioConfig): string | undefined { + const mimeType = buildMp4MimeType(config.codec, "audio"); + if (isMimeTypeSupported(mimeType)) { + return mimeType; + } + return undefined; +} diff --git a/js/hang/src/watch/audio/emitter.ts b/js/hang/src/watch/audio/emitter.ts index 79be52da9..2bdbb9870 100644 --- a/js/hang/src/watch/audio/emitter.ts +++ b/js/hang/src/watch/audio/emitter.ts @@ -46,7 +46,8 @@ export class Emitter { }); this.#signals.effect((effect) => { - const enabled = !effect.get(this.paused) && !effect.get(this.muted); + const paused = effect.get(this.paused); + const enabled = !paused; this.source.enabled.set(enabled); }); @@ -56,7 +57,44 @@ export class Emitter { this.muted.set(volume === 0); }); + // Handle MSE path (HTMLAudioElement) vs WebCodecs path (AudioWorklet) this.#signals.effect((effect) => { + const mseAudio = effect.get(this.source.mseAudioElement); + if (mseAudio) { + // MSE path: control HTMLAudioElement directly + effect.effect(() => { + const volume = effect.get(this.volume); + const muted = effect.get(this.muted); + const paused = effect.get(this.paused); + mseAudio.volume = volume; + mseAudio.muted = muted; + + // Control play/pause state + if (paused && !mseAudio.paused) { + mseAudio.pause(); + } else if (!paused && mseAudio.paused) { + // Resume if paused - try to play even if readyState is low + const tryPlay = () => { + if (!paused && mseAudio.paused) { + mseAudio + .play() + .catch((err) => console.error("[Audio Emitter] Failed to resume audio:", err)); + } + }; + + // Try to play if we have metadata (HAVE_METADATA = 1), browser will start when ready + if (mseAudio.readyState >= HTMLMediaElement.HAVE_METADATA) { + tryPlay(); + } else { + // Wait for loadedmetadata event if not ready yet + mseAudio.addEventListener("loadedmetadata", tryPlay, { once: true }); + } + } + }); + return; + } + + // WebCodecs path: use AudioWorklet with GainNode const root = effect.get(this.source.root); if (!root) return; @@ -76,9 +114,10 @@ export class Emitter { }); }); + // Only apply gain transitions for WebCodecs path (when gain node exists) this.#signals.effect((effect) => { const gain = effect.get(this.#gain); - if (!gain) return; + if (!gain) return; // MSE path doesn't use gain node // Cancel any scheduled transitions on change. effect.cleanup(() => gain.gain.cancelScheduledValues(gain.context.currentTime)); diff --git a/js/hang/src/watch/audio/source.ts b/js/hang/src/watch/audio/source.ts index fad996840..588c76f6e 100644 --- a/js/hang/src/watch/audio/source.ts +++ b/js/hang/src/watch/audio/source.ts @@ -5,6 +5,8 @@ import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; import * as Hex from "../../util/hex"; import * as libav from "../../util/libav"; +import type { SourceMSE } from "../source-mse"; +import type * as Video from "../video"; import type * as Render from "./render"; // We want some extra overhead to avoid starving the render worklet. 
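The emitter change above splits output control into two regimes: `enabled` now tracks only `paused`, while muting is applied downstream (`element.muted` on the MSE path, the gain node on the WebCodecs path), so muting no longer tears down the source pipeline. The MSE side reduces to standard HTMLMediaElement handling — a sketch of the pattern used above:

```ts
// Sketch of the MSE-side control flow from the emitter effect above.
function applyState(el: HTMLMediaElement, volume: number, muted: boolean, paused: boolean) {
    el.volume = volume;
    el.muted = muted;
    if (paused && !el.paused) {
        el.pause();
    } else if (!paused && el.paused) {
        const tryPlay = () => {
            el.play().catch(() => {
                // Autoplay may be blocked; the emitter logs and moves on.
            });
        };
        if (el.readyState >= HTMLMediaElement.HAVE_METADATA) {
            tryPlay();
        } else {
            // Defer until metadata exists; play() would otherwise race MSE setup.
            el.addEventListener("loadedmetadata", tryPlay, { once: true });
        }
    }
}
```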
@@ -41,6 +43,10 @@ export class Source { // Downcast to AudioNode so it matches Publish.Audio readonly root = this.#worklet as Getter; + // For MSE path, expose the HTMLAudioElement for direct control + #mseAudioElement = new Signal(undefined); + readonly mseAudioElement = this.#mseAudioElement as Getter; + #sampleRate = new Signal(undefined); readonly sampleRate: Getter = this.#sampleRate; @@ -58,6 +64,13 @@ export class Source { #signals = new Effect(); + // Track active audio track subscription to prevent double subscription + // Similar to video's #active pattern - tracks the current running subscription + #activeAudioTrack?: Effect; + + // Reference to video source for coordination + video?: Video.Source; + constructor( broadcast: Getter, catalog: Getter, @@ -95,6 +108,12 @@ export class Source { const config = effect.get(this.config); if (!config) return; + // Don't create worklet for MSE (cmaf) - browser handles playback directly + // The worklet is only needed for WebCodecs path + if (config.container === "cmaf") { + return; + } + const sampleRate = config.sampleRate; const channelCount = config.numberOfChannels; @@ -149,26 +168,164 @@ export class Source { #runDecoder(effect: Effect): void { const enabled = effect.get(this.enabled); - if (!enabled) return; + const config = effect.get(this.config); + + // For CMAF, always initialize (even if disabled) to add SourceBuffer before video starts + // For non-CMAF, only initialize if enabled + if (config?.container !== "cmaf" && !enabled) { + return; + } const catalog = effect.get(this.catalog); - if (!catalog) return; + if (!catalog) { + return; + } const broadcast = effect.get(this.broadcast); - if (!broadcast) return; + if (!broadcast) { + return; + } - const config = effect.get(this.config); - if (!config) return; + if (!config) { + return; + } const active = effect.get(this.active); - if (!active) return; + if (!active) { + return; + } + + // For CMAF, watch video.mseSource reactively so we re-run when video creates a new SourceMSE + // This ensures audio re-initializes when video resolution changes + // IMPORTANT: effect.get() must be called unconditionally for the effect to track the signal + if (config.container === "cmaf" && this.video) { + // Always call effect.get() unconditionally - the effect system will track this signal + // and re-run #runDecoder when mseSource changes (e.g., new SourceMSE created on resolution change) + const mseSource = effect.get(this.video.mseSource); + // If mseSource is not available yet, wait for it (effect will re-run when it's set) + if (!mseSource) { + return; + } + } + + // Close previous subscription if exists + if (this.#activeAudioTrack) { + this.#activeAudioTrack.close(); + } + + // Route to MSE for CMAF, WebCodecs for native/raw + // For CMAF, ALWAYS initialize MSE (even if disabled) to add SourceBuffer + // This ensures MediaSource has both SourceBuffers before video starts appending + // The SourceBuffer will be added, but fragments won't be appended if disabled + if (config.container === "cmaf") { + // Always initialize for CMAF - SourceBuffer must be added before video starts + // Create a new effect for this subscription (like video does with #pending) + const trackEffect = new Effect(); + this.#activeAudioTrack = trackEffect; + effect.cleanup(() => trackEffect.close()); + this.#runMSEPath(trackEffect, broadcast, active, config, catalog); + } else { + // For non-CMAF, only run if enabled + if (enabled) { + // Create a new effect for this subscription (like video does with #pending) + 
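The IMPORTANT comment above is the load-bearing detail of this effect system: a dependency exists only if the signal was read on the effect's latest run. A standalone sketch of the rule, assuming the @moq/signals semantics those comments describe:

```ts
import { Effect, type Getter } from "@moq/signals";

function watchMse<T>(signals: Effect, mseSource: Getter<T | undefined>): void {
    signals.effect((effect) => {
        // Read the signal on every run, even while it is still undefined...
        const mse = effect.get(mseSource);
        if (!mse) return; // ...so a later set() re-triggers this effect.

        // If the read were skipped behind an earlier return, a resolution
        // switch (which creates a fresh SourceMSE and sets the signal) would
        // never wake this effect, leaving audio attached to the dead instance.
    });
}
```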
const trackEffect = new Effect(); + this.#activeAudioTrack = trackEffect; + effect.cleanup(() => trackEffect.close()); + this.#runWebCodecsPath(trackEffect, broadcast, active, config, catalog); + } + } + } + + #runMSEPath( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + ): void { + // Use the unified SourceMSE from video - it manages both video and audio SourceBuffers + // Use a reactive effect to always get the latest SourceMSE instance + effect.cleanup(() => { + // Clear tracking when effect is cleaned up + if (this.#activeAudioTrack === effect) { + this.#activeAudioTrack = undefined; + } + }); + + effect.spawn(async () => { + // Wait for video's MSE source to be available + // Video creates it asynchronously, and may recreate it when restarting + let videoMseSource: SourceMSE | undefined; + if (this.video?.mseSource) { + // Wait up to 2 seconds for video MSE source to be available + const maxWait = 2000; + const startTime = Date.now(); + while (!videoMseSource && Date.now() - startTime < maxWait) { + videoMseSource = effect.get(this.video.mseSource); + if (!videoMseSource) { + await new Promise((resolve) => setTimeout(resolve, 50)); // Check more frequently + } + } + } + + if (!videoMseSource) { + console.error("[Audio Source] Video MSE source not available, falling back to WebCodecs"); + this.#runWebCodecsPath(effect, broadcast, name, config, catalog); + return; + } + + // Expose video element as "audioElement" for compatibility with emitter + this.#signals.effect((eff) => { + const videoElement = videoMseSource.videoElement ? eff.get(videoMseSource.videoElement) : undefined; + eff.set(this.#mseAudioElement, videoElement as HTMLAudioElement | undefined); + }); + + // Forward stats + this.#signals.effect((eff) => { + eff.set(this.#stats, { bytesReceived: 0 }); + }); + + // Check if audio is enabled + const isEnabled = effect.get(this.enabled); + + // Only subscribe to track and initialize SourceBuffer if enabled + if (!isEnabled) { + return; + } + + // Wait for MediaSource to be ready + const maxWait = 5000; + const startTime = Date.now(); + while (Date.now() - startTime < maxWait) { + const ms = videoMseSource.mediaSource ? effect.get(videoMseSource.mediaSource) : undefined; + if (ms && typeof ms === "object" && "readyState" in ms && (ms as MediaSource).readyState === "open") { + break; + } + await new Promise((resolve) => setTimeout(resolve, 50)); + } + + // Initialize audio SourceBuffer and run track + try { + await videoMseSource.initializeAudio(config); + await videoMseSource.runAudioTrack(effect, broadcast, name, config, catalog, this.enabled); + } catch (error) { + console.warn("[Audio Source] Failed to initialize audio:", error); + } + }); + } - const sub = broadcast.subscribe(active, catalog.priority); + #runWebCodecsPath( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + ): void { + const sub = broadcast.subscribe(name, catalog.priority); effect.cleanup(() => sub.close()); // Create consumer with slightly less latency than the render worklet to avoid underflowing. 
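#runMSEPath above open-codes two bounded polls: up to 2 s for video's SourceMSE and up to 5 s for the MediaSource to reach "open". Both implement the same shape; a hypothetical helper (pollUntil is not part of the codebase) for reference:

```ts
// Hypothetical utility capturing the bounded polling loops above.
async function pollUntil<T>(
    read: () => T | undefined,
    timeoutMs: number,
    intervalMs = 50,
): Promise<T | undefined> {
    const deadline = Date.now() + timeoutMs;
    for (;;) {
        const value = read();
        if (value !== undefined) return value;
        if (Date.now() >= deadline) return undefined;
        await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
}

// e.g. const mseSource = await pollUntil(() => effect.get(this.video!.mseSource), 2000);
```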
- // Container defaults to "legacy" via Zod schema for backward compatibility - console.log(`[Audio Subscriber] Using container format: ${config.container}`); + // Container defaults to "native" via Zod schema for backward compatibility const consumer = new Frame.Consumer(sub, { latency: Math.max(this.latency.peek() - JITTER_UNDERHEAD, 0) as Time.Milli, container: config.container, @@ -244,6 +401,9 @@ export class Source { } close() { + // Close active audio track subscription + this.#activeAudioTrack?.close(); + this.#activeAudioTrack = undefined; this.#signals.close(); } } diff --git a/js/hang/src/watch/broadcast.ts b/js/hang/src/watch/broadcast.ts index c8fe30a1a..5e045811d 100644 --- a/js/hang/src/watch/broadcast.ts +++ b/js/hang/src/watch/broadcast.ts @@ -6,6 +6,7 @@ import * as Audio from "./audio"; import { Chat, type ChatProps } from "./chat"; import * as Location from "./location"; import { Preview, type PreviewProps } from "./preview"; +import type { SourceMSE } from "./source-mse"; import * as User from "./user"; import * as Video from "./video"; @@ -62,8 +63,14 @@ export class Broadcast { this.path = Signal.from(props?.path); this.enabled = Signal.from(props?.enabled ?? false); this.reload = Signal.from(props?.reload ?? true); - this.audio = new Audio.Source(this.#broadcast, this.#catalog, props?.audio); + + // Create video first so audio can use its MediaSource this.video = new Video.Source(this.#broadcast, this.#catalog, props?.video); + + // Create audio and pass video reference for coordination + this.audio = new Audio.Source(this.#broadcast, this.#catalog, props?.audio); + this.audio.video = this.video; // Pass video reference for coordination + this.location = new Location.Root(this.#broadcast, this.#catalog, props?.location); this.chat = new Chat(this.#broadcast, this.#catalog, props?.chat); this.preview = new Preview(this.#broadcast, this.#catalog, props?.preview); @@ -72,6 +79,7 @@ export class Broadcast { this.signals.effect(this.#runReload.bind(this)); this.signals.effect(this.#runBroadcast.bind(this)); this.signals.effect(this.#runCatalog.bind(this)); + this.signals.effect(this.#restartAudioOnVideoChange.bind(this)); } #runReload(effect: Effect): void { @@ -156,6 +164,48 @@ export class Broadcast { } } + // Track the previous SourceMSE instance to detect changes + #previousMseSource?: SourceMSE; + + // Restart audio when video resolution/track changes + // This ensures audio re-subscribes with the new SourceMSE instance + // NOTE: This is now redundant since audio.#runDecoder watches video.mseSource directly, + // but keeping it as a backup mechanism + #restartAudioOnVideoChange(effect: Effect): void { + const mseSource = effect.get(this.video.mseSource); + if (!mseSource) { + this.#previousMseSource = undefined; + return; + } + + // Check if SourceMSE instance changed (new instance = resolution change) + if (mseSource === this.#previousMseSource) { + return; // Same instance, no change + } + + const wasInitialized = this.#previousMseSource !== undefined; + this.#previousMseSource = mseSource; + + // Skip on initial setup (when previous is undefined) + if (!wasInitialized) { + return; + } + + // Only restart audio if it's enabled and using CMAF (MSE path) + const audioEnabled = effect.get(this.audio.enabled); + const audioConfig = effect.get(this.audio.config); + if (!audioEnabled || audioConfig?.container !== "cmaf") { + return; + } + + // Restart audio by toggling enabled (mimics pause/unpause that fixes the issue) + // NOTE: This is now redundant since 
audio.#runDecoder watches video.mseSource directly + this.audio.enabled.set(false); + queueMicrotask(() => { + this.audio.enabled.set(true); + }); + } + close() { this.signals.close(); diff --git a/js/hang/src/watch/source-mse.ts b/js/hang/src/watch/source-mse.ts new file mode 100644 index 000000000..b2e6d018a --- /dev/null +++ b/js/hang/src/watch/source-mse.ts @@ -0,0 +1,828 @@ +import type * as Moq from "@moq/lite"; +import type { Time } from "@moq/lite"; +import { Effect, type Getter, Signal } from "@moq/signals"; +import type * as Catalog from "../catalog"; +import * as Frame from "../frame"; +import { PRIORITY } from "../publish/priority"; +import * as Mime from "../util/mp4-mime"; + +// The types in VideoDecoderConfig that cause a hard reload. +type RequiredDecoderConfig = Omit & + Partial>; + +type BufferStatus = { state: "empty" | "filled" }; + +type SyncStatus = { + state: "ready" | "wait"; + bufferDuration?: number; +}; + +export interface VideoStats { + frameCount: number; + timestamp: number; + bytesReceived: number; +} + +/** + * MSE-based video source for CMAF/fMP4 fragments. + * Uses Media Source Extensions to handle complete moof+mdat fragments. + */ +export class SourceMSE { + #video?: HTMLVideoElement; + #mediaSource?: MediaSource; + #videoSourceBuffer?: SourceBuffer; + #audioSourceBuffer?: SourceBuffer; + #audioSourceBufferSetup = false; + #audioInitSegmentAppended = false; + + readonly mediaSource = new Signal(undefined); + readonly videoElement = new Signal(undefined); + + #videoAppendQueue: Uint8Array[] = []; + #audioAppendQueue: Uint8Array[] = []; + static readonly MAX_QUEUE_SIZE = 10; + + frame = new Signal(undefined); + latency: Signal; + display = new Signal<{ width: number; height: number } | undefined>(undefined); + flip = new Signal(undefined); + + bufferStatus = new Signal({ state: "empty" }); + syncStatus = new Signal({ state: "ready" }); + + #stats = new Signal(undefined); + + #signals = new Effect(); + #frameCallbackId?: number; + + constructor(latency: Signal) { + this.latency = latency; + } + + #isBufferUpdating(): boolean { + if (!this.#mediaSource) return false; + const buffers = this.#mediaSource.sourceBuffers; + for (let i = 0; i < buffers.length; i++) { + if (buffers[i].updating) { + return true; + } + } + return false; + } + + async initializeVideo(config: RequiredDecoderConfig): Promise { + const mimeType = Mime.buildMp4VideoMimeType(config); + if (!mimeType) { + throw new Error(`Unsupported codec for MSE: ${config.codec}`); + } + + this.#video = document.createElement("video"); + this.#video.style.display = "none"; + this.#video.playsInline = true; + this.#video.muted = false; + document.body.appendChild(this.#video); + + this.videoElement.set(this.#video); + + this.#mediaSource = new MediaSource(); + this.mediaSource.set(this.#mediaSource); + + const url = URL.createObjectURL(this.#mediaSource); + this.#video.src = url; + await new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error("MediaSource sourceopen timeout")); + }, 5000); + + this.#mediaSource?.addEventListener( + "sourceopen", + () => { + clearTimeout(timeout); + if (this.#mediaSource) { + this.mediaSource.set(this.#mediaSource); + } + try { + this.#videoSourceBuffer = this.#mediaSource?.addSourceBuffer(mimeType); + if (!this.#videoSourceBuffer) { + reject(new Error("Failed to create video SourceBuffer")); + return; + } + this.#setupVideoSourceBuffer(); + resolve(); + } catch (error) { + console.error("[MSE] Error creating video SourceBuffer:", error); 
+ reject(error); + } + }, + { once: true }, + ); + + this.#mediaSource?.addEventListener( + "error", + (e) => { + clearTimeout(timeout); + console.error("[MSE] MediaSource error event:", e); + reject(new Error(`MediaSource error: ${e}`)); + }, + { once: true }, + ); + }); + + this.#startFrameCapture(); + } + + async initializeAudio(config: Catalog.AudioConfig): Promise { + if (this.#audioSourceBuffer && this.#audioSourceBufferSetup) { + return; + } + + const mimeType = Mime.buildMp4AudioMimeType(config); + if (!mimeType) { + throw new Error(`Unsupported codec for MSE: ${config.codec}`); + } + + // Wait for signal propagation, then get MediaSource + await new Promise((resolve) => setTimeout(resolve, 10)); + let mediaSource = this.mediaSource.peek(); + + if (!mediaSource && this.#mediaSource) { + mediaSource = this.#mediaSource; + } + + if (mediaSource && mediaSource.readyState === "open") { + this.#mediaSource = mediaSource; + } else { + await new Promise((resolve, reject) => { + const maxWait = 5000; + const startTime = Date.now(); + const checkInterval = 50; + + const timeout = setTimeout(() => { + const waited = ((Date.now() - startTime) / 1000).toFixed(1); + reject( + new Error( + `MediaSource not ready after ${waited}s (current state: ${mediaSource?.readyState || "not created"})`, + ), + ); + }, maxWait); + + const checkReady = () => { + mediaSource = this.mediaSource.peek(); + + if (!mediaSource && this.#mediaSource) { + mediaSource = this.#mediaSource; + } + + if (mediaSource && mediaSource.readyState === "open") { + clearTimeout(timeout); + this.#mediaSource = mediaSource; + resolve(); + return; + } + + const elapsed = Date.now() - startTime; + + if (mediaSource && mediaSource.readyState === "closed") { + if (this.#mediaSource === mediaSource) { + this.#mediaSource = undefined; + } + } + + if (elapsed < maxWait) { + setTimeout(checkReady, checkInterval); + } else { + clearTimeout(timeout); + const waited = (elapsed / 1000).toFixed(1); + const finalSignalState = this.mediaSource.peek()?.readyState || "not set"; + const finalPrivateState = this.#mediaSource?.readyState || "not set"; + reject( + new Error( + `MediaSource not ready after ${waited}s (signal: ${finalSignalState}, private: ${finalPrivateState})`, + ), + ); + } + }; + + checkReady(); + }); + } + + mediaSource = this.mediaSource.peek() || this.#mediaSource; + if (!mediaSource || mediaSource.readyState !== "open") { + throw new Error(`MediaSource not ready (state: ${mediaSource?.readyState || "not created"})`); + } + + this.#mediaSource = mediaSource; + + const existingAudioBuffer = this.#resolveExistingAudioSourceBuffer(Array.from(this.#mediaSource.sourceBuffers)); + if (existingAudioBuffer) { + this.#setAudioSourceBuffer(existingAudioBuffer); + return; + } + if (this.#videoSourceBuffer?.updating) { + await new Promise((resolve) => { + if (!this.#videoSourceBuffer) { + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + resolve(); + }, + { once: true }, + ); + }); + } + + const audioBufferAfterWait = this.#resolveExistingAudioSourceBuffer( + Array.from(this.#mediaSource.sourceBuffers), + ); + if (audioBufferAfterWait) { + this.#setAudioSourceBuffer(audioBufferAfterWait); + return; + } + + if (this.#mediaSource.readyState !== "open") { + throw new Error( + `MediaSource readyState changed to "${this.#mediaSource.readyState}" before adding audio SourceBuffer`, + ); + } + + mediaSource = this.mediaSource.peek() || this.#mediaSource; + if (!mediaSource) { + throw new Error("MediaSource 
is not available"); + } + + this.#mediaSource = mediaSource; + + if (this.#videoSourceBuffer?.updating) { + await new Promise((resolve) => { + const timeout = setTimeout(() => { + resolve(); + }, 500); + + if (!this.#videoSourceBuffer) { + clearTimeout(timeout); + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); + } + + const sourceBuffers = Array.from(mediaSource.sourceBuffers); + if (!MediaSource.isTypeSupported(mimeType)) { + throw new Error(`Audio MIME type not supported: ${mimeType}`); + } + + try { + if (sourceBuffers.length >= 2) { + console.warn("[MSE] MediaSource already has 2 SourceBuffers, cannot add audio"); + throw new Error("MediaSource already has maximum SourceBuffers"); + } + + this.#audioSourceBuffer = mediaSource.addSourceBuffer(mimeType); + if (!this.#audioSourceBuffer) { + throw new Error("Failed to create audio SourceBuffer"); + } + this.#setAudioSourceBuffer(this.#audioSourceBuffer); + } catch (error) { + if (error instanceof DOMException && error.name === "QuotaExceededError") { + const sourceBuffers = Array.from(mediaSource.sourceBuffers); + const readyState = mediaSource.readyState; + + if (readyState !== "open") { + throw new Error(`MediaSource readyState is "${readyState}", cannot add SourceBuffers`); + } + + const existingAudioBuffer = this.#resolveExistingAudioSourceBuffer(sourceBuffers); + if (existingAudioBuffer) { + this.#setAudioSourceBuffer(existingAudioBuffer); + return; + } + + if (sourceBuffers.length === 1 && this.#videoSourceBuffer) { + if (this.#videoSourceBuffer.updating) { + await new Promise((resolve) => { + const timeout = setTimeout(() => resolve(), 200); + if (!this.#videoSourceBuffer) { + clearTimeout(timeout); + resolve(); + return; + } + this.#videoSourceBuffer.addEventListener( + "updateend", + () => { + clearTimeout(timeout); + resolve(); + }, + { once: true }, + ); + }); + } else { + await new Promise((resolve) => setTimeout(resolve, 10)); + } + + const currentSourceBuffers = Array.from(mediaSource.sourceBuffers); + if (currentSourceBuffers.length >= 2) { + const retryAudioBuffer = this.#resolveExistingAudioSourceBuffer(currentSourceBuffers); + if (retryAudioBuffer) { + this.#setAudioSourceBuffer(retryAudioBuffer); + return; + } + } + + try { + if (mediaSource.readyState !== "open") { + throw new Error(`MediaSource readyState is "${mediaSource.readyState}"`); + } + this.#audioSourceBuffer = mediaSource.addSourceBuffer(mimeType); + if (!this.#audioSourceBuffer) { + throw new Error("Failed to create audio SourceBuffer"); + } + this.#setAudioSourceBuffer(this.#audioSourceBuffer); + return; + } catch (retryError) { + console.warn("[MSE] Retry failed, allowing video-only playback", retryError); + return; + } + } + + console.warn("[MSE] QuotaExceededError but couldn't find audio SourceBuffer in MediaSource", { + sourceBufferCount: sourceBuffers.length, + readyState: mediaSource.readyState, + hasVideoSourceBuffer: !!this.#videoSourceBuffer, + hasAudioSourceBuffer: !!this.#audioSourceBuffer, + }); + } + console.error("[MSE] Error adding audio SourceBuffer:", error); + throw error; + } + } + + #setupVideoSourceBuffer(): void { + if (!this.#videoSourceBuffer) return; + + const SEEK_HYSTERESIS = 0.1; + this.#videoSourceBuffer.addEventListener("updateend", () => { + const video = this.#video; + const sourceBuffer = this.#videoSourceBuffer; + if (video && sourceBuffer && sourceBuffer.buffered.length > 0) { + const buffered = 
sourceBuffer.buffered; + const start = buffered.start(0); + const end = buffered.end(0); + + if ( + video.currentTime + SEEK_HYSTERESIS < start || + video.currentTime >= end - SEEK_HYSTERESIS || + Number.isNaN(video.currentTime) + ) { + video.currentTime = start; + } + + if (video.paused && video.readyState >= HTMLMediaElement.HAVE_METADATA) { + video.play().catch((err) => { + console.warn("[MSE] Autoplay blocked:", err); + }); + } + } + + this.#processVideoQueue(); + }); + + this.#videoSourceBuffer.addEventListener("error", (e) => { + console.error("[MSE] Video SourceBuffer error:", e); + }); + } + + #setupAudioSourceBuffer(): void { + if (!this.#audioSourceBuffer || this.#audioSourceBufferSetup) return; + + this.#audioSourceBuffer.addEventListener("updateend", () => { + this.#processAudioQueue(); + }); + + this.#audioSourceBuffer.addEventListener("error", (e) => { + console.error("[MSE] Audio SourceBuffer error:", e); + }); + + this.#audioSourceBufferSetup = true; + } + + #startFrameCapture(): void { + if (!this.#video) return; + + const captureFrame = () => { + if (!this.#video) return; + + try { + const frame = new VideoFrame(this.#video, { + timestamp: this.#video.currentTime * 1_000_000, + }); + + this.#stats.update((current) => ({ + frameCount: (current?.frameCount ?? 0) + 1, + timestamp: frame.timestamp, + bytesReceived: current?.bytesReceived ?? 0, + })); + + this.frame.update((prev) => { + prev?.close(); + return frame; + }); + + if (this.#video.videoWidth && this.#video.videoHeight) { + this.display.set({ + width: this.#video.videoWidth, + height: this.#video.videoHeight, + }); + } + + if (this.#video.readyState >= HTMLMediaElement.HAVE_CURRENT_DATA) { + this.bufferStatus.set({ state: "filled" }); + if (this.#video.paused && this.#video.readyState >= HTMLMediaElement.HAVE_CURRENT_DATA) { + this.#video.play().catch(() => { + // Ignore autoplay errors + }); + } + } + } catch (error) { + console.error("Error capturing frame:", error); + } + + if (this.#video.requestVideoFrameCallback) { + this.#frameCallbackId = this.#video.requestVideoFrameCallback(captureFrame); + } else { + this.#frameCallbackId = requestAnimationFrame(captureFrame) as unknown as number; + } + }; + + if (this.#video.requestVideoFrameCallback) { + this.#frameCallbackId = this.#video.requestVideoFrameCallback(captureFrame); + } else { + this.#frameCallbackId = requestAnimationFrame(captureFrame) as unknown as number; + } + } + + async appendVideoFragment(fragment: Uint8Array): Promise { + if (!this.#videoSourceBuffer || !this.#mediaSource) { + throw new Error("Video SourceBuffer not initialized"); + } + + if (this.#videoAppendQueue.length >= SourceMSE.MAX_QUEUE_SIZE) { + const discarded = this.#videoAppendQueue.shift(); + console.warn( + `[MSE] Video queue full (${SourceMSE.MAX_QUEUE_SIZE}), discarding oldest fragment (${discarded?.byteLength ?? 0} bytes)`, + ); + } + + const copy = new Uint8Array(fragment); + this.#videoAppendQueue.push(copy); + this.#processVideoQueue(); + } + + async appendAudioFragment(fragment: Uint8Array): Promise { + if (!this.#audioSourceBuffer || !this.#mediaSource) { + return; + } + + if (this.#mediaSource.readyState === "closed") { + return; + } + + if (this.#audioAppendQueue.length >= SourceMSE.MAX_QUEUE_SIZE) { + const discarded = this.#audioAppendQueue.shift(); + console.warn( + `[MSE] Audio queue full (${SourceMSE.MAX_QUEUE_SIZE}), discarding oldest fragment (${discarded?.byteLength ?? 
0} bytes)`, + ); + } + + const copy = new Uint8Array(fragment); + this.#audioAppendQueue.push(copy); + this.#processAudioQueue(); + } + + /** + * Decode base64 init segment string to Uint8Array. + */ + #decodeInitSegment(base64: string): Uint8Array { + const binaryString = atob(base64); + const initSegment = new Uint8Array(binaryString.length); + for (let i = 0; i < binaryString.length; i++) { + initSegment[i] = binaryString.charCodeAt(i); + } + return initSegment; + } + + #setAudioSourceBuffer(buffer: SourceBuffer): void { + this.#audioSourceBuffer = buffer; + if (!this.#audioSourceBufferSetup) { + this.#setupAudioSourceBuffer(); + } + } + + #resolveExistingAudioSourceBuffer(sourceBuffers: SourceBuffer[]): SourceBuffer | undefined { + if (this.#audioSourceBuffer && sourceBuffers.includes(this.#audioSourceBuffer)) { + return this.#audioSourceBuffer; + } + + if (sourceBuffers.length < 2) { + return undefined; + } + + if (this.#videoSourceBuffer) { + const otherBuffer = sourceBuffers.find((sb) => sb !== this.#videoSourceBuffer); + if (otherBuffer) { + return otherBuffer; + } + } else { + return sourceBuffers[1]; + } + + throw new Error("MediaSource already has maximum SourceBuffers and cannot identify audio SourceBuffer"); + } + + /** + * Append an init segment to a SourceBuffer and wait for completion. + * Handles both synchronous errors and async errors via events. + */ + async #appendInitSegment( + sourceBuffer: SourceBuffer, + initSegment: Uint8Array, + trackType: "video" | "audio", + ): Promise { + await new Promise((resolve, reject) => { + const onUpdateEnd = () => { + resolve(); + }; + + const onError = (e: Event) => { + const error = e as ErrorEvent; + console.error(`[MSE] ${trackType} SourceBuffer error appending init segment:`, error); + reject(new Error(`${trackType} SourceBuffer error: ${error.message || "unknown error"}`)); + }; + + sourceBuffer.addEventListener("updateend", onUpdateEnd, { once: true }); + sourceBuffer.addEventListener("error", onError, { once: true }); + + try { + sourceBuffer.appendBuffer(initSegment as BufferSource); + } catch (error) { + sourceBuffer.removeEventListener("updateend", onUpdateEnd); + sourceBuffer.removeEventListener("error", onError); + console.error(`[MSE] Error calling appendBuffer on ${trackType} init segment:`, error); + reject(error); + } + }); + } + + #processVideoQueue(): void { + if (!this.#videoSourceBuffer || this.#videoSourceBuffer.updating || this.#videoAppendQueue.length === 0) { + return; + } + + if (this.#mediaSource?.readyState !== "open") { + return; + } + + if (this.#isBufferUpdating()) { + return; + } + + const fragment = this.#videoAppendQueue.shift(); + if (!fragment) return; + + try { + this.#videoSourceBuffer.appendBuffer(fragment as BufferSource); + this.#stats.update((current) => { + const newCount = (current?.frameCount ?? 0) + 1; + return { + frameCount: newCount, + timestamp: current?.timestamp ?? 0, + bytesReceived: (current?.bytesReceived ?? 
0) + fragment.byteLength, + }; + }); + } catch (error) { + if (error instanceof DOMException && error.name === "QuotaExceededError") { + console.warn("[MSE] QuotaExceededError - browser will manage buffer automatically"); + this.#videoAppendQueue.unshift(fragment); + } else { + console.error("[MSE] Error appending video fragment:", error); + } + } + } + + #processAudioQueue(): void { + if (!this.#audioSourceBuffer || this.#audioSourceBuffer.updating || this.#audioAppendQueue.length === 0) { + return; + } + + if (this.#mediaSource?.readyState !== "open") { + return; + } + + if (this.#isBufferUpdating()) { + return; + } + + const fragment = this.#audioAppendQueue.shift(); + if (!fragment) return; + + try { + this.#audioSourceBuffer.appendBuffer(fragment as BufferSource); + } catch (error) { + if (error instanceof DOMException && error.name === "QuotaExceededError") { + console.warn("[MSE] QuotaExceededError for audio - browser will manage buffer automatically"); + this.#audioAppendQueue.unshift(fragment); + } else { + console.error("[MSE] Error appending audio fragment:", error); + } + } + } + + async appendFragment(fragment: Uint8Array): Promise { + return this.appendVideoFragment(fragment); + } + + async runTrack( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: RequiredDecoderConfig, + ): Promise { + await this.initializeVideo(config); + + // Wait for audio SourceBuffer initialization to avoid Chrome quota race + for (let i = 0; i < 10; i++) { + if (this.#audioSourceBuffer || (this.#mediaSource && this.#mediaSource.sourceBuffers.length >= 2)) { + break; + } + await new Promise((resolve) => setTimeout(resolve, 100)); + } + + const sub = broadcast.subscribe(name, PRIORITY.video); + console.log(`[MSE] Subscribing to video track: ${name}`); + effect.cleanup(() => sub.close()); + + const consumer = new Frame.Consumer(sub, { + latency: this.latency, + container: "cmaf", + }); + effect.cleanup(() => consumer.close()); + + if (!config.initSegment) { + throw new Error("Init segment is required in catalog for CMAF playback"); + } + + if (!this.#videoSourceBuffer) { + throw new Error("Video SourceBuffer not available"); + } + + const initSegment = this.#decodeInitSegment(config.initSegment); + await this.#appendInitSegment(this.#videoSourceBuffer, initSegment, "video"); + effect.spawn(async () => { + for (;;) { + const frame = await Promise.race([consumer.decode(), effect.cancel]); + if (!frame) { + break; + } + + await this.appendVideoFragment(frame.data); + } + }); + } + + async runAudioTrack( + effect: Effect, + broadcast: Moq.Broadcast, + name: string, + config: Catalog.AudioConfig, + catalog: Catalog.Audio, + enabled?: Getter, + ): Promise { + if (!this.#audioSourceBuffer) { + return; + } + + const isEnabled = enabled ? 
effect.get(enabled) : true; + if (!isEnabled) { + return; + } + + if (!config.initSegment) { + throw new Error("Init segment is required in catalog for CMAF playback"); + } + + if (!this.#audioInitSegmentAppended) { + const initSegment = this.#decodeInitSegment(config.initSegment); + await this.#appendInitSegment(this.#audioSourceBuffer, initSegment, "audio"); + this.#audioInitSegmentAppended = true; + } + + const sub = broadcast.subscribe(name, catalog.priority); + console.log(`[MSE] Subscribing to audio track: ${name}`); + effect.cleanup(() => sub.close()); + + const consumer = new Frame.Consumer(sub, { + latency: this.latency, + container: "cmaf", + }); + effect.cleanup(() => consumer.close()); + + effect.spawn(async () => { + for (;;) { + const frame = await Promise.race([consumer.decode(), effect.cancel]); + if (!frame) { + break; + } + + if (this.#mediaSource?.readyState === "closed") { + break; + } + + if (this.#mediaSource?.readyState === "open") { + await this.appendAudioFragment(frame.data); + } + } + }); + } + + close(): void { + this.#videoAppendQueue = []; + this.#audioAppendQueue = []; + this.#audioSourceBufferSetup = false; + this.#audioInitSegmentAppended = false; + + const audioSourceBuffer = this.#audioSourceBuffer; + const videoSourceBuffer = this.#videoSourceBuffer; + const mediaSource = this.#mediaSource; + + this.#audioSourceBuffer = undefined; + + this.mediaSource.set(undefined); + + if (this.#frameCallbackId !== undefined) { + if (this.#video?.requestVideoFrameCallback) { + this.#video.cancelVideoFrameCallback(this.#frameCallbackId); + } else { + cancelAnimationFrame(this.#frameCallbackId); + } + } + + this.frame.update((prev) => { + prev?.close(); + return undefined; + }); + + if (videoSourceBuffer && mediaSource) { + try { + if (videoSourceBuffer.updating) { + videoSourceBuffer.abort(); + } + } catch (error) { + console.error("Error closing video SourceBuffer:", error); + } + } + + if (audioSourceBuffer && mediaSource) { + try { + if (audioSourceBuffer.updating) { + audioSourceBuffer.abort(); + } + } catch (error) { + console.error("Error closing audio SourceBuffer:", error); + } + } + + if (this.#mediaSource) { + try { + if (this.#mediaSource.readyState === "open") { + this.#mediaSource.endOfStream(); + } + URL.revokeObjectURL(this.#video?.src || ""); + } catch (error) { + console.error("Error closing MediaSource:", error); + } + } + + if (this.#video) { + this.#video.pause(); + this.#video.src = ""; + this.#video.remove(); + } + + this.#signals.close(); + } + + get stats() { + return this.#stats; + } +} diff --git a/js/hang/src/watch/video/source.ts b/js/hang/src/watch/video/source.ts index 406a600bb..9d608073a 100644 --- a/js/hang/src/watch/video/source.ts +++ b/js/hang/src/watch/video/source.ts @@ -5,6 +5,7 @@ import type * as Catalog from "../../catalog"; import * as Frame from "../../frame"; import { PRIORITY } from "../../publish/priority"; import * as Hex from "../../util/hex"; +import type { SourceMSE } from "../source-mse"; export type SourceProps = { enabled?: boolean | Signal; @@ -27,7 +28,9 @@ export type Target = { // The types in VideoDecoderConfig that cause a hard reload. // ex. codedWidth/Height are optional and can be changed in-band, so we don't want to trigger a reload. // This way we can keep the current subscription active. -type RequiredDecoderConfig = Omit; +// Note: We keep codedWidth/Height as optional for logging, but set them to undefined to avoid reloads. 
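Back in SourceMSE, the init segment travels base64-encoded in the catalog, and MSE is strict about ordering. A sketch of the decode step (mirroring #decodeInitSegment above — atob is appropriate here since the payload is binary-in-base64, not UTF-8 text):

```ts
function decodeInitSegment(base64: string): Uint8Array {
    const binary = atob(base64);
    const bytes = new Uint8Array(binary.length);
    for (let i = 0; i < binary.length; i++) {
        bytes[i] = binary.charCodeAt(i);
    }
    return bytes;
}

// Ordering contract enforced by runTrack/runAudioTrack above: the ftyp+moov
// init segment must reach a SourceBuffer before any moof+mdat fragment, and
// only once per track (tracked via #audioInitSegmentAppended for audio).
```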
+type RequiredDecoderConfig = Omit & + Partial>; type BufferStatus = { state: "empty" | "filled" }; @@ -97,6 +100,14 @@ export class Source { #signals = new Effect(); + // Expose MediaSource for audio to use + #mseMediaSource = new Signal(undefined); + readonly mseMediaSource = this.#mseMediaSource as Getter; + + // Expose mseSource instance for audio to access coordination methods + #mseSource = new Signal(undefined); + readonly mseSource = this.#mseSource as Getter; + constructor( broadcast: Signal, catalog: Signal, @@ -192,12 +203,78 @@ export class Source { } #runTrack(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + // Route to MSE for CMAF, WebCodecs for native/raw + if (config.container === "cmaf") { + this.#runMSEPath(effect, broadcast, name, config); + } else { + this.#runWebCodecsPath(effect, broadcast, name, config); + } + } + + #runMSEPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { + // Import MSE source dynamically to avoid loading if not needed + effect.spawn(async () => { + const { SourceMSE } = await import("../source-mse.js"); + const mseSource = new SourceMSE(this.latency); + effect.cleanup(() => mseSource.close()); + + // IMPORTANT: Set mseSource immediately so audio can track it + // This must be set synchronously, not in an effect, so the signal updates immediately + this.#mseSource.set(mseSource); + + // Forward signals using effects + this.#signals.effect((eff) => { + const frame = eff.get(mseSource.frame); + eff.set(this.frame, frame); + }); + + this.#signals.effect((eff) => { + const display = eff.get(mseSource.display); + eff.set(this.display, display); + }); + + this.#signals.effect((eff) => { + const status = eff.get(mseSource.bufferStatus); + eff.set(this.bufferStatus, status, { state: "empty" }); + }); + + this.#signals.effect((eff) => { + const status = eff.get(mseSource.syncStatus); + eff.set(this.syncStatus, status, { state: "ready" }); + }); + + this.#signals.effect((eff) => { + const mediaSource = eff.get(mseSource.mediaSource); + eff.set(this.#mseMediaSource, mediaSource); + }); + + this.#signals.effect((eff) => { + const stats = eff.get(mseSource.stats); + eff.set(this.#stats, stats); + }); + // Run MSE track + try { + await mseSource.runTrack(effect, broadcast, name, config); + } catch (error) { + console.error("MSE path error, falling back to WebCodecs:", error); + // Fallback to WebCodecs + this.#mseSource.set(undefined); + this.#runWebCodecsPath(effect, broadcast, name, config); + } + }); + + // Clean up mseSource when the effect closes (track switches) + effect.cleanup(() => { + this.#mseSource.set(undefined); + }); + } + + #runWebCodecsPath(effect: Effect, broadcast: Moq.Broadcast, name: string, config: RequiredDecoderConfig): void { const sub = broadcast.subscribe(name, PRIORITY.video); // TODO use priority from catalog effect.cleanup(() => sub.close()); // Create consumer that reorders groups/frames up to the provided latency. 
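The six mirroring effects in the video #runMSEPath all share one shape — read a SourceMSE signal, write the corresponding Source signal. A hypothetical helper (not in the codebase) makes the bridge explicit:

```ts
import { Effect, type Getter, type Signal } from "@moq/signals";

// Mirror `from` into `to` for as long as `signals` lives.
function forward<T>(signals: Effect, from: Getter<T>, to: Signal<T>): void {
    signals.effect((eff) => {
        eff.set(to, eff.get(from));
    });
}

// e.g. forward(this.#signals, mseSource.display, this.display);
```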
- // Container defaults to "legacy" via Zod schema for backward compatibility - console.log(`[Video Subscriber] Using container format: ${config.container}`); + // Container defaults to "native" via Zod schema for backward compatibility const consumer = new Frame.Consumer(sub, { latency: this.latency, container: config.container, diff --git a/js/lite/src/lite/connection.ts b/js/lite/src/lite/connection.ts index a7ce8c922..c5bb3f07d 100644 --- a/js/lite/src/lite/connection.ts +++ b/js/lite/src/lite/connection.ts @@ -131,7 +131,7 @@ export class Connection implements Established { // TODO use the session info } } finally { - console.warn("session stream closed"); + console.debug("session stream closed"); } } diff --git a/justfile b/justfile index 391f3c9e4..db23ac87e 100644 --- a/justfile +++ b/justfile @@ -151,7 +151,7 @@ pub name url="http://localhost:4443/anon" prefix="" *args: publish --url "{{url}}" --name "{{prefix}}{{name}}" {{args}} fmp4 # Generate and ingest an HLS stream from a video file. -pub-hls name relay="http://localhost:4443/anon": +pub-hls name passthrough='' relay="http://localhost:4443/anon": #!/usr/bin/env bash set -euo pipefail @@ -178,7 +178,7 @@ pub-hls name relay="http://localhost:4443/anon": -c:v:1 libx264 -profile:v:1 high -level:v:1 4.1 -pix_fmt:v:1 yuv420p -tag:v:1 avc1 \ -b:v:1 300k -maxrate:v:1 330k -bufsize:v:1 600k \ -c:a aac -b:a 128k \ - -f hls -hls_time 2 -hls_list_size 12 \ + -f hls -hls_time 2 -hls_list_size 6 \ -hls_flags independent_segments+delete_segments \ -hls_segment_type fmp4 \ -master_pl_name master.m3u8 \ @@ -204,18 +204,58 @@ pub-hls name relay="http://localhost:4443/anon": exit 1 fi - echo ">>> Starting HLS ingest from disk: $OUT_DIR/master.m3u8" + # Wait for individual playlists to be generated (they're referenced in master.m3u8) + # Give ffmpeg a bit more time to generate the variant playlists + echo ">>> Waiting for variant playlists..." + sleep 2 + for i in {1..20}; do + # Check if at least one variant playlist exists + if [ -f "$OUT_DIR/v0/stream.m3u8" ] || [ -f "$OUT_DIR/v720/stream.m3u8" ] || [ -f "$OUT_DIR/v144/stream.m3u8" ] || [ -f "$OUT_DIR/vaudio/stream.m3u8" ]; then + break + fi + sleep 0.5 + done + + # Check if passthrough flag is provided (boolean parameter) + if [ -n "{{passthrough}}" ]; then + echo ">>> Starting HLS ingest from disk with passthrough mode: $OUT_DIR/master.m3u8" + PASSTHROUGH_FLAG="--passthrough" + else + echo ">>> Starting HLS ingest from disk (non-passthrough mode): $OUT_DIR/master.m3u8" + PASSTHROUGH_FLAG="" + fi # Trap to clean up ffmpeg on exit + CLEANUP_CALLED=false cleanup() { + if [ "$CLEANUP_CALLED" = "true" ]; then + return + fi + CLEANUP_CALLED=true echo "Shutting down..." kill $FFMPEG_PID 2>/dev/null || true - exit 0 + # Wait a bit for ffmpeg to finish + sleep 0.5 + # Force kill if still running + kill -9 $FFMPEG_PID 2>/dev/null || true } - trap cleanup SIGINT SIGTERM + trap cleanup SIGINT SIGTERM EXIT # Run hang to ingest from local files - cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" + if [ -n "$PASSTHROUGH_FLAG" ]; then + echo ">>> Running with --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" --passthrough + else + echo ">>> Running without --passthrough flag" + cargo run --bin hang -- publish --url "{{relay}}" --name "{{name}}" hls --playlist "$OUT_DIR/master.m3u8" + fi + EXIT_CODE=$? 
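+ # Note: with `set -euo pipefail`, a non-zero cargo exit aborts the recipe and
+ # fires the EXIT trap before this assignment runs, so EXIT_CODE is only
+ # captured here on success.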
+ + # Cleanup after cargo run completes (success or failure) + cleanup + + # Exit with the same code as cargo run + exit $EXIT_CODE # Publish a video using H.264 Annex B format to the localhost relay server pub-h264 name url="http://localhost:4443/anon" *args: diff --git a/rs/hang-cli/src/publish.rs b/rs/hang-cli/src/publish.rs index 56668d17e..7c22c3c0a 100644 --- a/rs/hang-cli/src/publish.rs +++ b/rs/hang-cli/src/publish.rs @@ -16,6 +16,9 @@ pub enum PublishFormat { /// URL or file path of an HLS playlist to ingest. #[arg(long)] playlist: String, + /// Enable passthrough mode to transport complete CMAF fragments (moof+mdat) without decomposing. + #[arg(long)] + passthrough: bool, }, } @@ -45,12 +48,17 @@ impl Publish { let stream = Decoder::new(broadcast.clone(), format); PublishDecoder::Decoder(Box::new(stream)) } - PublishFormat::Hls { playlist } => { + PublishFormat::Hls { playlist, passthrough } => { + tracing::info!( + passthrough = *passthrough, + "HLS publish preserving original container format." + ); let hls = hang::import::Hls::new( broadcast.clone(), hang::import::HlsConfig { playlist: playlist.clone(), client: None, + passthrough: *passthrough, }, )?; PublishDecoder::Hls(Box::new(hls)) diff --git a/rs/hang/Cargo.toml b/rs/hang/Cargo.toml index ca7a12fd2..8596aceb2 100644 --- a/rs/hang/Cargo.toml +++ b/rs/hang/Cargo.toml @@ -32,7 +32,7 @@ reqwest = { version = "0.12", default-features = false, features = [ scuffle-h265 = "0.2.2" serde = { workspace = true } serde_json = "1" -serde_with = { version = "3", features = ["hex"] } +serde_with = { version = "3", features = ["hex", "base64"] } thiserror = "2" tokio = { workspace = true, features = ["macros", "fs"] } tracing = "0.1" diff --git a/rs/hang/examples/video.rs b/rs/hang/examples/video.rs index e86a47ebb..6cb6df231 100644 --- a/rs/hang/examples/video.rs +++ b/rs/hang/examples/video.rs @@ -69,6 +69,8 @@ fn create_track(broadcast: &mut moq_lite::BroadcastProducer) -> hang::TrackProdu display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; // Create a map of video renditions diff --git a/rs/hang/src/catalog/audio/mod.rs b/rs/hang/src/catalog/audio/mod.rs index e1e95a794..e708103b9 100644 --- a/rs/hang/src/catalog/audio/mod.rs +++ b/rs/hang/src/catalog/audio/mod.rs @@ -9,7 +9,9 @@ use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{DisplayFromStr, hex::Hex}; +use serde_with::{DisplayFromStr, base64::Base64, hex::Hex}; + +use crate::catalog::container::Container; /// Information about an audio track in the catalog. /// @@ -60,4 +62,18 @@ pub struct AudioConfig { #[serde(default)] #[serde_as(as = "Option")] pub description: Option, + + /// Container format for frame encoding. + /// Defaults to "native" for backward compatibility. + pub container: Container, + + /// Init segment (ftyp+moov) for CMAF/fMP4 containers. + /// + /// This is the initialization segment needed for MSE playback. + /// Stored as base64-encoded bytes and embedded in the catalog (as suggested + /// in feedback). Init segments should not be sent over data tracks or at the + /// start of each group. 
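+ /// Subscribers must append these bytes to the MSE SourceBuffer before
+ /// appending any moof+mdat fragment for the track.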
+ #[serde(default)] + #[serde_as(as = "Option")] + pub init_segment: Option, } diff --git a/rs/hang/src/catalog/container.rs b/rs/hang/src/catalog/container.rs new file mode 100644 index 000000000..cecee5d57 --- /dev/null +++ b/rs/hang/src/catalog/container.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +/// Container format for frame timestamp encoding and frame payload structure. +/// +/// - "native": Uses QUIC VarInt encoding (1-8 bytes, variable length), raw frame payloads +/// - "raw": Uses fixed u64 encoding (8 bytes, big-endian), raw frame payloads +/// - "cmaf": Fragmented MP4 container - frames contain complete moof+mdat fragments +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Default)] +#[serde(rename_all = "camelCase")] +pub enum Container { + #[serde(rename = "native")] + #[default] + Native, + #[serde(rename = "raw")] + Raw, + #[serde(rename = "cmaf")] + Cmaf, +} diff --git a/rs/hang/src/catalog/mod.rs b/rs/hang/src/catalog/mod.rs index ea48eee01..906feb350 100644 --- a/rs/hang/src/catalog/mod.rs +++ b/rs/hang/src/catalog/mod.rs @@ -6,6 +6,7 @@ mod audio; mod chat; +mod container; mod preview; mod root; mod user; @@ -13,6 +14,7 @@ mod video; pub use audio::*; pub use chat::*; +pub use container::*; pub use preview::*; pub use root::*; pub use user::*; diff --git a/rs/hang/src/catalog/root.rs b/rs/hang/src/catalog/root.rs index 6d7ef0c0e..10721d91c 100644 --- a/rs/hang/src/catalog/root.rs +++ b/rs/hang/src/catalog/root.rs @@ -208,6 +208,22 @@ impl Drop for CatalogGuard<'_> { // TODO decide if this should return an error, or be impossible to fail let frame = self.catalog.to_string().expect("invalid catalog"); + + // Log the catalog JSON to verify container field is included + if let Some(video) = &self.catalog.video { + for (name, config) in &video.renditions { + tracing::info!(track = name, container = ?config.container, "publishing catalog with container"); + } + } + if let Some(audio) = &self.catalog.audio { + for (name, config) in &audio.renditions { + tracing::info!(track = name, container = ?config.container, "publishing catalog with container"); + } + } + + // Log the full catalog JSON to debug serialization + tracing::debug!(catalog_json = %frame, "publishing catalog JSON"); + group.write_frame(frame); group.close(); } @@ -272,7 +288,7 @@ impl From for CatalogConsumer { mod test { use std::collections::BTreeMap; - use crate::catalog::{AudioCodec::Opus, AudioConfig, H264, VideoConfig}; + use crate::catalog::{AudioCodec::Opus, AudioConfig, Container, H264, VideoConfig}; use super::*; @@ -286,7 +302,8 @@ mod test { "codedWidth": 1280, "codedHeight": 720, "bitrate": 6000000, - "framerate": 30.0 + "framerate": 30.0, + "container": "native" } }, "priority": 1 @@ -297,7 +314,8 @@ mod test { "codec": "opus", "sampleRate": 48000, "numberOfChannels": 2, - "bitrate": 128000 + "bitrate": 128000, + "container": "native" } }, "priority": 2 @@ -326,6 +344,8 @@ mod test { bitrate: Some(6_000_000), framerate: Some(30.0), optimize_for_latency: None, + container: Container::Native, + init_segment: None, }, ); @@ -338,6 +358,8 @@ mod test { channel_count: 2, bitrate: Some(128_000), description: None, + container: Container::Native, + init_segment: None, }, ); diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index 750a2501e..054a722e5 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -14,7 +14,9 @@ use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use 
diff --git a/rs/hang/src/catalog/video/mod.rs b/rs/hang/src/catalog/video/mod.rs index 750a2501e..054a722e5 100644 --- a/rs/hang/src/catalog/video/mod.rs +++ b/rs/hang/src/catalog/video/mod.rs @@ -14,7 +14,9 @@ use std::collections::BTreeMap; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use serde_with::{DisplayFromStr, hex::Hex}; +use serde_with::{DisplayFromStr, base64::Base64, hex::Hex}; + +use crate::catalog::container::Container; /// Information about a video track in the catalog. /// @@ -109,4 +111,22 @@ pub struct VideoConfig { /// Default: true #[serde(default)] pub optimize_for_latency: Option<bool>, + + /// Container format for frame encoding. + /// Defaults to "native" for backward compatibility. + #[serde(default)] + pub container: Container, + + /// Init segment (ftyp+moov) for CMAF/fMP4 containers. + /// + /// This is the initialization segment needed for MSE playback. + /// Stored as base64-encoded bytes and embedded in the catalog, so init + /// segments do not need to be sent over the data tracks or at the start + /// of each group. + /// + /// Note: A future optimization could build init segments from the description + /// field (e.g., avcC box for H.264) along with other catalog metadata, but + /// for now we store the complete init segment for simplicity and correctness. + #[serde(default)] + #[serde_as(as = "Option<Base64>")] + pub init_segment: Option<Bytes>, } diff --git a/rs/hang/src/import/aac.rs b/rs/hang/src/import/aac.rs index 065282374..fe79f652e 100644 --- a/rs/hang/src/import/aac.rs +++ b/rs/hang/src/import/aac.rs @@ -107,6 +107,8 @@ impl Aac { channel_count, bitrate: None, description: None, + container: hang::catalog::Container::Native, + init_segment: None, }; tracing::debug!(name = ?track.name, ?config, "starting track"); diff --git a/rs/hang/src/import/avc3.rs b/rs/hang/src/import/avc3.rs index 9171332ed..fbe6376db 100644 --- a/rs/hang/src/import/avc3.rs +++ b/rs/hang/src/import/avc3.rs @@ -62,6 +62,8 @@ impl Avc3 { display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; if let Some(old) = &self.config diff --git a/rs/hang/src/import/fmp4.rs b/rs/hang/src/import/fmp4.rs index bfbb889e3..fee3298dc 100644 --- a/rs/hang/src/import/fmp4.rs +++ b/rs/hang/src/import/fmp4.rs @@ -1,4 +1,6 @@ -use crate::catalog::{AAC, AV1, AudioCodec, AudioConfig, CatalogProducer, H264, H265, VP9, VideoCodec, VideoConfig}; +use crate::catalog::{ + AAC, AV1, AudioCodec, AudioConfig, CatalogProducer, Container, H264, H265, VP9, VideoCodec, VideoConfig, +}; use crate::{self as hang, Timestamp}; use anyhow::Context; use bytes::{Buf, Bytes, BytesMut}; @@ -38,12 +40,28 @@ pub struct Fmp4 { // The timestamp of the last keyframe for each track last_keyframe: HashMap<u32, Timestamp>, + // Track if we've sent the first frame for each track (needed for passthrough mode) + first_frame_sent: HashMap<u32, bool>, + // The moov atom at the start of the file. moov: Option<Moov>, // The latest moof header moof: Option<Moof>, moof_size: usize, + + /// When true, transport CMAF fragments directly (passthrough mode) + /// When false, decompose fragments into individual samples (current behavior) + passthrough_mode: bool, + + /// When passthrough_mode is enabled, store raw bytes of moof + moof_bytes: Option<Bytes>, + + /// When passthrough_mode is enabled, store raw bytes of ftyp (file type box) + ftyp_bytes: Option<Bytes>, + + /// When passthrough_mode is enabled, store raw bytes of moov (init segment) + moov_bytes: Option<Bytes>, } impl Fmp4 { @@ -58,15 +76,37 @@ impl Fmp4 { catalog, tracks: HashMap::default(), last_keyframe: HashMap::default(), + first_frame_sent: HashMap::default(), moov: None, moof: None, moof_size: 0, + passthrough_mode: false, + moof_bytes: None, + ftyp_bytes: None, + moov_bytes: None, } }
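A hedged usage sketch of the new switch (the `hang::BroadcastProducer` handle and error plumbing are assumed from the caller; `Fmp4::new`, `set_passthrough_mode`, and `decode` are the methods introduced or touched in this diff):

```rust
use bytes::Bytes;

// Sketch only: feed one fetched fMP4 segment through the importer in
// passthrough mode, so moof+mdat fragments are re-emitted verbatim.
fn ingest_segment(broadcast: hang::BroadcastProducer, segment: Bytes) -> anyhow::Result<()> {
    let mut importer = Fmp4::new(broadcast);
    importer.set_passthrough_mode(true);

    // decode() accepts any bytes::Buf; in passthrough mode it also captures
    // the raw ftyp/moov/moof/mdat bytes it parses.
    let mut buf = segment;
    importer.decode(&mut buf)?;
    Ok(())
}
```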
+ /// Set passthrough mode for CMAF fragment transport. + /// + /// When enabled, complete fMP4 fragments (moof+mdat) are transported directly + /// instead of being decomposed into individual samples. + pub fn set_passthrough_mode(&mut self, enabled: bool) { + self.passthrough_mode = enabled; + } + pub fn decode<T: Buf>(&mut self, buf: &mut T) -> anyhow::Result<()> { + // In passthrough mode, capture the raw bytes before parsing so whole atoms + // can be sliced out verbatim. Note: Buf::chunk() returns the first contiguous + // slice, which covers all remaining data for Bytes/BytesMut inputs. + let available_bytes = if self.passthrough_mode && buf.has_remaining() { + let chunk = buf.chunk(); + Some(Bytes::copy_from_slice(chunk)) + } else { + None + }; + let mut cursor = std::io::Cursor::new(buf); let mut position = 0; + let mut bytes_offset = 0; while let Some(atom) = mp4_atom::Any::decode_maybe(&mut cursor)? { // Process the parsed atom.
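The slicing below relies on MP4's framing: each top-level box starts with a 32-bit big-endian size that includes its own 8-byte header, so `[bytes_offset, bytes_offset + size)` is exactly one whole atom. A standalone sketch of that invariant (ignoring the special size values 0 = to-end-of-file and 1 = 64-bit largesize):

```rust
/// Split a buffer of top-level MP4 boxes into (fourcc, whole-atom) slices.
/// Standalone sketch; real parsing in this crate goes through mp4_atom.
fn split_boxes(buf: &[u8]) -> Vec<(&[u8; 4], &[u8])> {
    let mut boxes = Vec::new();
    let mut offset = 0;
    while buf.len() - offset >= 8 {
        // Box size is big-endian and counts the 8-byte header itself.
        let size = u32::from_be_bytes(buf[offset..offset + 4].try_into().unwrap()) as usize;
        if size < 8 || offset + size > buf.len() {
            break; // special/malformed size or truncated input
        }
        let fourcc: &[u8; 4] = buf[offset + 4..offset + 8].try_into().unwrap();
        boxes.push((fourcc, &buf[offset..offset + size]));
        offset += size; // same advance as bytes_offset in decode() above
    }
    boxes
}
```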
@@ -75,9 +115,43 @@ match atom { Any::Ftyp(_) | Any::Styp(_) => { - // Skip + // If passthrough mode, capture raw bytes of ftyp (file type box) + if self.passthrough_mode { + if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + self.ftyp_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + } else { + tracing::warn!( + bytes_offset, + size, + available_len = bytes.len(), + "ftyp bytes out of range" + ); + } + } else { + tracing::warn!("passthrough mode but available_bytes is None when processing ftyp"); + } + } + // Skip ftyp/styp atoms in normal processing } Any::Moov(moov) => { + // If passthrough mode, capture raw bytes of moov (init segment) + if self.passthrough_mode { + if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + self.moov_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + } else { + tracing::warn!( + bytes_offset, + size, + available_len = bytes.len(), + "moov bytes out of range" + ); + } + } else { + tracing::warn!("passthrough mode but available_bytes is None when processing moov"); + } + } // Create the broadcast. self.init(moov)?; } @@ -89,17 +163,51 @@ self.moof = Some(moof); self.moof_size = size; + + // If passthrough mode, extract and store raw bytes of moof + if let Some(ref bytes) = available_bytes + && bytes_offset + size <= bytes.len() + { + self.moof_bytes = Some(bytes.slice(bytes_offset..bytes_offset + size)); + } } Any::Mdat(mdat) => { - // Extract the samples from the mdat atom. - let header_size = size - mdat.data.len(); - self.extract(mdat, header_size)?; + if self.passthrough_mode { + // Transport complete fragment + let moof = self.moof.take().context("missing moof box")?; + let moof_bytes = self.moof_bytes.take().context("missing moof bytes")?; + + // Extract mdat bytes + let mdat_bytes = if let Some(ref bytes) = available_bytes { + if bytes_offset + size <= bytes.len() { + bytes.slice(bytes_offset..bytes_offset + size) + } else { + anyhow::bail!("invalid buffer position for mdat"); + } + } else { + anyhow::bail!("missing available bytes in passthrough mode"); + }; + + // Combine moof + mdat into complete fragment + let mut fragment_bytes = BytesMut::with_capacity(moof_bytes.len() + mdat_bytes.len()); + fragment_bytes.extend_from_slice(&moof_bytes); + fragment_bytes.extend_from_slice(&mdat_bytes); + let fragment = fragment_bytes.freeze(); + + self.transport_fragment(fragment, moof)?; + } else { + // Extract the samples from the mdat atom (existing behavior) + let header_size = size - mdat.data.len(); + self.extract(mdat, header_size)?; + } } _ => { - // Skip unknown atoms - tracing::warn!(?atom, "skipping") + // Skip unknown atoms (e.g., sidx, which is used for segment indexing). + // They are not required for playback, so it is safe to ignore them here. } } + + bytes_offset += size; } // Advance the buffer by the amount of data that was processed. @@ -113,60 +221,135 @@ } fn init(&mut self, moov: Moov) -> anyhow::Result<()> { + let passthrough_mode = self.passthrough_mode; let mut catalog = self.catalog.lock(); + // Track which specific tracks were created in this init call + let mut created_video_tracks = Vec::new(); + let mut created_audio_tracks = Vec::new(); + for trak in &moov.trak { let track_id = trak.tkhd.track_id; let handler = &trak.mdia.hdlr.handler; let track = match handler.as_ref() { b"vide" => { - let config = Self::init_video(trak)?; + let config = Self::init_video_static(trak, passthrough_mode)?; let track = moq::Track { name: self.broadcast.track_name("video"), priority: 1, }; - tracing::debug!(name = ?track.name, ?config, "starting track"); - - let video = catalog.insert_video(track.name.clone(), config); + let video = catalog.insert_video(track.name.clone(), config.clone()); video.priority = 1; + // Record this track name + created_video_tracks.push(track.name.clone()); + let track = track.produce(); self.broadcast.insert_track(track.consumer); - track.producer + hang::TrackProducer::new(track.producer, config.container) } b"soun" => { - let config = Self::init_audio(trak)?; + let config = Self::init_audio_static(trak, passthrough_mode)?; let track = moq::Track { name: self.broadcast.track_name("audio"), priority: 2, }; - tracing::debug!(name = ?track.name, ?config, "starting track"); - - let audio = catalog.insert_audio(track.name.clone(), config); + let audio = catalog.insert_audio(track.name.clone(), config.clone()); audio.priority = 2; + // Record this track name + created_audio_tracks.push(track.name.clone()); + let track = track.produce(); self.broadcast.insert_track(track.consumer); - track.producer + hang::TrackProducer::new(track.producer, config.container) } b"sbtl" => anyhow::bail!("subtitle tracks are not supported"), handler => anyhow::bail!("unknown track type: {:?}", handler), }; - self.tracks.insert(track_id, track.into()); + self.tracks.insert(track_id, track); } + // Verify that the moov atom contains all expected tracks BEFORE moving it + let has_video = moov.trak.iter().any(|t| t.mdia.hdlr.handler.as_ref() ==
b"vide"); + let has_audio = moov.trak.iter().any(|t| t.mdia.hdlr.handler.as_ref() == b"soun"); + self.moov = Some(moov); + // In passthrough mode, store the init segment (ftyp+moov) in the catalog + // instead of sending it over the data tracks. This allows clients to + // reconstruct init segments from the catalog. + // + // Note: Init segments are embedded in the catalog. + // A future optimization could build init segments from the description field + // (e.g., avcC box for H.264) along with other catalog metadata, but for now + // we store the complete init segment for simplicity and correctness. + if passthrough_mode { + if let Some(moov_bytes) = self.moov_bytes.as_ref() { + // Build init segment: ftyp (if available) + moov + let mut init_segment = BytesMut::new(); + if let Some(ref ftyp_bytes) = self.ftyp_bytes { + init_segment.extend_from_slice(ftyp_bytes); + } + init_segment.extend_from_slice(moov_bytes); + let init_segment_bytes = init_segment.freeze(); + + // Verify that the moov atom contains all expected tracks + let expected_video_tracks = catalog.video.as_ref().map(|v| v.renditions.len()).unwrap_or(0); + let expected_audio_tracks = catalog.audio.as_ref().map(|a| a.renditions.len()).unwrap_or(0); + + // Warn if moov doesn't contain expected tracks. + // For HLS, inits are per-track (video-only or audio-only), so skip cross-track warnings. + let video_only = has_video && !has_audio; + let audio_only = has_audio && !has_video; + if expected_video_tracks > 0 && !has_video && !audio_only { + tracing::error!( + "moov atom does not contain video track but video configs exist! This will cause client-side errors." + ); + } + if expected_audio_tracks > 0 && !has_audio && !video_only { + tracing::error!( + "moov atom does not contain audio track but audio configs exist! This will cause client-side errors." + ); + } + + // Store init segment in catalog for the relevant track type + if has_video && let Some(video) = catalog.video.as_mut() { + for track_name in &created_video_tracks { + if let Some(config) = video.renditions.get_mut(track_name) { + config.init_segment = Some(init_segment_bytes.clone()); + } + } + } + + if has_audio && let Some(audio) = catalog.audio.as_mut() { + for track_name in &created_audio_tracks { + if let Some(config) = audio.renditions.get_mut(track_name) { + config.init_segment = Some(init_segment_bytes.clone()); + } + } + } + + // Init has been stored; clear cached moov/ftyp to avoid repeated warnings later. 
+ self.moov_bytes = None; + self.ftyp_bytes = None; + } else { + tracing::warn!( + "passthrough mode enabled but moov_bytes is None - init segment will not be stored in catalog" + ); + } + } + Ok(()) } - fn init_video(trak: &Trak) -> anyhow::Result<VideoConfig> { + fn init_video_static(trak: &Trak, passthrough_mode: bool) -> anyhow::Result<VideoConfig> { let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { @@ -199,10 +382,16 @@ display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } - mp4_atom::Codec::Hev1(hev1) => Self::init_h265(true, &hev1.hvcc, &hev1.visual)?, - mp4_atom::Codec::Hvc1(hvc1) => Self::init_h265(false, &hvc1.hvcc, &hvc1.visual)?, + mp4_atom::Codec::Hev1(hev1) => Self::init_h265_static(true, &hev1.hvcc, &hev1.visual, passthrough_mode)?, + mp4_atom::Codec::Hvc1(hvc1) => Self::init_h265_static(false, &hvc1.hvcc, &hvc1.visual, passthrough_mode)?, mp4_atom::Codec::Vp08(vp08) => VideoConfig { codec: VideoCodec::VP8, description: Default::default(), @@ -214,6 +403,12 @@ display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, }, mp4_atom::Codec::Vp09(vp09) => { // https://github.com/gpac/mp4box.js/blob/325741b592d910297bf609bc7c400fc76101077b/src/box-codecs.js#L238 @@ -240,6 +435,12 @@ optimize_for_latency: None, bitrate: None, framerate: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Av01(av01) => { @@ -272,6 +473,12 @@ optimize_for_latency: None, bitrate: None, framerate: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown), @@ -282,7 +489,12 @@ } // There are two almost identical hvcc atoms in the wild. - fn init_h265(in_band: bool, hvcc: &mp4_atom::Hvcc, visual: &mp4_atom::Visual) -> anyhow::Result<VideoConfig> { + fn init_h265_static( + in_band: bool, + hvcc: &mp4_atom::Hvcc, + visual: &mp4_atom::Visual, + passthrough_mode: bool, + ) -> anyhow::Result<VideoConfig> { let mut description = BytesMut::new(); hvcc.encode_body(&mut description)?; @@ -302,14 +514,20 @@ coded_height: Some(visual.height as _), // TODO: populate these fields bitrate: None, + init_segment: None, framerate: None, display_ratio_width: None, display_ratio_height: None, optimize_for_latency: None, + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, }) } - fn init_audio(trak: &Trak) -> anyhow::Result<AudioConfig> { + fn init_audio_static(trak: &Trak, passthrough_mode: bool) -> anyhow::Result<AudioConfig> { let stsd = &trak.mdia.minf.stbl.stsd; let codec = match stsd.codecs.len() { @@ -338,6 +556,12 @@ channel_count: mp4a.audio.channel_count as _, bitrate: Some(bitrate.into()), description: None, // TODO? + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Opus(opus) => { @@ -347,6 +571,12 @@ channel_count: opus.audio.channel_count as _, bitrate: None, description: None, // TODO? + container: if passthrough_mode { + Container::Cmaf + } else { + Container::Native + }, + init_segment: None, } } mp4_atom::Codec::Unknown(unknown) => anyhow::bail!("unknown codec: {:?}", unknown),
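The `if passthrough_mode { Container::Cmaf } else { Container::Native }` expression above is repeated in every codec arm; a tiny helper (hypothetical, not part of this diff) would keep the mapping in one place:

```rust
// Hypothetical helper, not in this diff: single source of truth for the
// passthrough -> container mapping repeated across the codec arms.
fn container_for(passthrough_mode: bool) -> Container {
    if passthrough_mode {
        Container::Cmaf
    } else {
        Container::Native
    }
}
// e.g. inside init_video_static / init_audio_static:
//     container: container_for(passthrough_mode),
```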
@@ -490,6 +720,106 @@ Ok(()) } + + // Transport a complete CMAF fragment (moof+mdat) directly without decomposing. + fn transport_fragment(&mut self, fragment: Bytes, moof: Moof) -> anyhow::Result<()> { + // The init segment should already have been stored in the catalog + // (which clears moov_bytes) before any fragments arrive. + if self.moov_bytes.is_some() { + tracing::warn!("transporting fragment but moov_bytes is still set - init segment may not have been sent"); + } + + // Ensure moov is available (init segment must be processed first) + let moov = self.moov.as_ref().ok_or_else(|| { + anyhow::anyhow!("missing moov box - init segment must be processed before fragments. Make sure ensure_init_segment() is called first.") + })?; + + // Loop over all of the traf boxes in the moof. + for traf in &moof.traf { + let track_id = traf.tfhd.track_id; + let track = self.tracks.get_mut(&track_id).context("unknown track")?; + + // Find the track information in the moov + let trak = moov + .trak + .iter() + .find(|trak| trak.tkhd.track_id == track_id) + .context("unknown track")?; + + let tfdt = traf.tfdt.as_ref().context("missing tfdt box")?; + let dts = tfdt.base_media_decode_time; + let timescale = trak.mdia.mdhd.timescale as u64; + + // Convert timestamp from track timescale to microseconds + let micros = (dts as u128 * 1_000_000 / timescale as u128) as u64; + let timestamp = hang::Timestamp::from_micros(micros)?; + + // Determine keyframe status (reuse logic from extract()) + let is_keyframe = if trak.mdia.hdlr.handler == b"vide".into() { + // For video, check sample flags in trun entries + let mut is_keyframe = false; + if let Some(trun) = traf.trun.first() + && let Some(entry) = trun.entries.first() + { + let tfhd = &traf.tfhd; + let flags = entry.flags.unwrap_or(tfhd.default_sample_flags.unwrap_or_default()); + // https://chromium.googlesource.com/chromium/src/media/+/master/formats/mp4/track_run_iterator.cc#177 + let keyframe_flag = (flags >> 24) & 0x3 == 0x2; // kSampleDependsOnNoOther + let non_sync = (flags >> 16) & 0x1 == 0x1; // kSampleIsNonSyncSample + is_keyframe = keyframe_flag && !non_sync; + + if is_keyframe { + // Force an audio keyframe on video keyframes + for audio in moov.trak.iter().filter(|t| t.mdia.hdlr.handler == b"soun".into()) { + self.last_keyframe.remove(&audio.tkhd.track_id); + } + } + } + is_keyframe + } else { + // For audio, force keyframe every 10 seconds or at video keyframes + match self.last_keyframe.get(&track_id) { + Some(prev) => timestamp - *prev > Timestamp::from_secs(10).unwrap(), + None => true, + } + }; + + if is_keyframe { + self.last_keyframe.insert(track_id, timestamp); + } + + // In passthrough mode, send fragments directly without init segments + // Init segments are stored in the catalog and reconstructed on the client side + if self.passthrough_mode { + // The first frame must be a keyframe to create the initial group + // After that, we can send fragments based on their actual keyframe status + let is_first_frame = !self.first_frame_sent.get(&track_id).copied().unwrap_or(false); + let should_be_keyframe = is_first_frame || is_keyframe; + + if is_first_frame { + self.first_frame_sent.insert(track_id, true); + } + + let frame = hang::Frame { + timestamp, + keyframe: should_be_keyframe, + payload: fragment.clone().into(), + }; + track.write(frame)?; + } else {
+ // For non-passthrough mode, just write the frame normally + let frame = hang::Frame { + timestamp, + keyframe: is_keyframe, + payload: fragment.clone().into(), + }; + track.write(frame)?; + } } + + Ok(()) + } } impl Drop for Fmp4 { @@ -497,8 +827,6 @@ let mut catalog = self.broadcast.catalog.lock(); for track in self.tracks.values() { - tracing::debug!(name = ?track.info.name, "ending track"); - // We're too lazy to keep track of if this track is for audio or video, so we just remove both. catalog.remove_video(&track.info.name); catalog.remove_audio(&track.info.name); diff --git a/rs/hang/src/import/hev1.rs b/rs/hang/src/import/hev1.rs index 1062a2c5c..bc179c318 100644 --- a/rs/hang/src/import/hev1.rs +++ b/rs/hang/src/import/hev1.rs @@ -62,6 +62,8 @@ impl Hev1 { display_ratio_width: vui_data.display_ratio_width, display_ratio_height: vui_data.display_ratio_height, optimize_for_latency: None, + container: hang::catalog::Container::Native, + init_segment: None, }; if let Some(old) = &self.config diff --git a/rs/hang/src/import/hls.rs b/rs/hang/src/import/hls.rs index 40e73025a..f797e85a2 100644 --- a/rs/hang/src/import/hls.rs +++ b/rs/hang/src/import/hls.rs @@ -31,11 +31,20 @@ pub struct HlsConfig { /// An optional HTTP client to use for fetching the playlist and segments. /// If not provided, a default client will be created. pub client: Option<reqwest::Client>, + + /// Enable passthrough mode for CMAF fragment transport. + /// When enabled, complete fMP4 fragments (moof+mdat) are transported directly + /// instead of being decomposed into individual samples. + pub passthrough: bool, } impl HlsConfig { pub fn new(playlist: String) -> Self { - Self { playlist, client: None } + Self { + playlist, + client: None, + passthrough: false, + } } /// Parse the playlist string into a URL. @@ -86,6 +95,8 @@ pub struct Hls { video: Vec<TrackState>, /// Optional audio track shared across variants. audio: Option<TrackState>, + /// Passthrough mode setting for fMP4 importers. + passthrough: bool, } #[derive(Debug, Clone, Copy)] @@ -120,9 +131,11 @@ impl Hls { .build() .unwrap() }); + let passthrough = cfg.passthrough; Ok(Self { broadcast, video_importers: Vec::new(), + passthrough, audio_importer: None, client, base_url, @@ -150,9 +163,10 @@ impl Hls { let outcome = self.step().await?; let delay = self.refresh_delay(outcome.target_duration, outcome.wrote_segments); - debug!( - wrote = outcome.wrote_segments, - delay = ?delay, + info!( + wrote_segments = outcome.wrote_segments, + target_duration = ?outcome.target_duration, + delay_secs = delay.as_secs_f32(), "HLS ingest step complete" ); @@ -165,6 +179,7 @@ impl Hls { self.ensure_tracks().await?; let mut buffered = 0usize; + const MAX_INIT_SEGMENTS: usize = 3; // Only process a few segments during init to avoid getting ahead of live stream // Prime all discovered video variants. // @@ -174,7 +189,7 @@ for (index, mut track) in video_tracks.into_iter().enumerate() { let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; let count = self - .consume_segments(TrackKind::Video(index), &mut track, &playlist) + .consume_segments_limited(TrackKind::Video(index), &mut track, &playlist, MAX_INIT_SEGMENTS) .await?; buffered += count; self.video.push(track); @@ -183,7 +198,9 @@ // Prime the shared audio track, if any.
if let Some(mut track) = self.audio.take() { let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; - let count = self.consume_segments(TrackKind::Audio, &mut track, &playlist).await?; + let count = self + .consume_segments_limited(TrackKind::Audio, &mut track, &playlist, MAX_INIT_SEGMENTS) + .await?; buffered += count; self.audio = Some(track); } @@ -302,6 +319,50 @@ Ok(()) } + async fn consume_segments_limited( + &mut self, + kind: TrackKind, + track: &mut TrackState, + playlist: &MediaPlaylist, + max_segments: usize, + ) -> anyhow::Result<usize> { + // Calculate segments to process + let next_seq = track.next_sequence.unwrap_or(0); + let playlist_seq = playlist.media_sequence; + let total_segments = playlist.segments.len(); + + let last_playlist_seq = playlist_seq + total_segments as u64; + + let skip = if next_seq > last_playlist_seq { + total_segments + } else if next_seq < playlist_seq { + track.next_sequence = None; + 0 + } else { + (next_seq - playlist_seq) as usize + }; + + let available = total_segments.saturating_sub(skip); + + // Limit how many segments we process + let to_process = available.min(max_segments); + + if to_process > 0 { + let base_seq = playlist_seq + skip as u64; + for (i, segment) in playlist.segments[skip..skip + to_process].iter().enumerate() { + self.push_segment(kind, track, segment, base_seq + i as u64).await?; + } + info!( + ?kind, + processed = to_process, + available = available, + "processed limited segments during init" + ); + } + + Ok(to_process) + } + async fn consume_segments( &mut self, kind: TrackKind, @@ -310,19 +371,63 @@ ) -> anyhow::Result<usize> { self.ensure_init_segment(kind, track, playlist).await?; - // Skip segments we've already seen - let skip = track.next_sequence.unwrap_or(0).saturating_sub(playlist.media_sequence) as usize; - let base_seq = playlist.media_sequence + skip as u64; - for (i, segment) in playlist.segments[skip..].iter().enumerate() { - self.push_segment(kind, track, segment, base_seq + i as u64).await?; - } - let consumed = playlist.segments.len() - skip; + // Calculate how many segments to skip (already processed) + let next_seq = track.next_sequence.unwrap_or(0); + let playlist_seq = playlist.media_sequence; + let total_segments = playlist.segments.len(); + + // Calculate the last sequence number in the playlist + let last_playlist_seq = playlist_seq + total_segments as u64; + + // If we've already processed beyond what's in the playlist, wait for new segments + let skip = if next_seq > last_playlist_seq { + // We're ahead of the playlist - wait for ffmpeg to generate more segments + warn!( + ?kind, + next_sequence = next_seq, + playlist_sequence = playlist_seq, + last_playlist_sequence = last_playlist_seq, + "imported ahead of playlist, waiting for new segments" + ); + total_segments // Skip all segments in playlist + } else if next_seq < playlist_seq { + // We're behind - reset and start from the beginning of the playlist + warn!( + ?kind, + next_sequence = next_seq, + playlist_sequence = playlist_seq, + "next_sequence behind playlist, resetting to start of playlist" + ); + track.next_sequence = None; + 0 + } else { + // Normal case: next_seq is within playlist range + (next_seq - playlist_seq) as usize + }; + + let fresh_segments = total_segments.saturating_sub(skip); - if consumed == 0 { + info!( + ?kind, + playlist_sequence = playlist_seq, + next_sequence = next_seq, + skip = skip, + total_segments = total_segments, + fresh_segments = fresh_segments, + "consuming HLS segments" + ); + + if fresh_segments > 0 { + let base_seq = playlist_seq + skip as u64; + for (i, segment) in playlist.segments[skip..].iter().enumerate() { + self.push_segment(kind, track, segment, base_seq + i as u64).await?; + } + info!(?kind, consumed = fresh_segments, "consumed HLS segments"); + } else { debug!(?kind, "no fresh HLS segments available"); } - Ok(consumed) + Ok(fresh_segments) }
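A standalone worked example of the media-sequence window arithmetic shared by `consume_segments` and `consume_segments_limited`:

```rust
fn main() {
    // Playlist advertises media_sequence = 100 with 5 segments (100..=104);
    // we already imported 100 and 101, so the next expected sequence is 102.
    let playlist_seq: u64 = 100;
    let total_segments: usize = 5;
    let next_seq: u64 = 102;

    // One past the final sequence in the playlist.
    let last_playlist_seq = playlist_seq + total_segments as u64; // 105

    let skip = if next_seq > last_playlist_seq {
        total_segments // ahead of the playlist: nothing fresh yet
    } else if next_seq < playlist_seq {
        0 // fell behind the live window: restart from the playlist head
    } else {
        (next_seq - playlist_seq) as usize // normal case
    };

    let fresh_segments = total_segments.saturating_sub(skip);
    assert_eq!(skip, 2); // 100 and 101 were already pushed
    assert_eq!(fresh_segments, 3); // push 102, 103, 104
}
```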
async fn ensure_init_segment( @@ -369,11 +474,28 @@ let url = resolve_uri(&track.playlist, &segment.uri)?; let mut bytes = self.fetch_bytes(url).await?; + // Ensure the importer is initialized before processing fragments + // Use track.init_ready to avoid borrowing issues + if !track.init_ready { + // Try to ensure init segment is processed + let playlist = self.fetch_media_playlist(track.playlist.clone()).await?; + self.ensure_init_segment(kind, track, &playlist).await?; + } + + // Get importer after ensuring init segment let importer = match kind { TrackKind::Video(index) => self.ensure_video_importer_for(index), TrackKind::Audio => self.ensure_audio_importer(), }; + // Final check after ensuring init segment + if !importer.is_initialized() { + return Err(anyhow::anyhow!( + "importer not initialized for {:?} after ensure_init_segment - init segment processing failed", + kind + )); + } + importer.decode(&mut bytes).context("failed to parse media segment")?; track.next_sequence = Some(sequence + 1); @@ -403,7 +525,8 @@ /// independent while still contributing to the same shared catalog. fn ensure_video_importer_for(&mut self, index: usize) -> &mut Fmp4 { while self.video_importers.len() <= index { - let importer = Fmp4::new(self.broadcast.clone()); + let mut importer = Fmp4::new(self.broadcast.clone()); + importer.set_passthrough_mode(self.passthrough); self.video_importers.push(importer); } @@ -412,8 +535,11 @@ /// Create or retrieve the fMP4 importer for the audio rendition. fn ensure_audio_importer(&mut self) -> &mut Fmp4 { - self.audio_importer - .get_or_insert_with(|| Fmp4::new(self.broadcast.clone())) + self.audio_importer.get_or_insert_with(|| { + let mut imp = Fmp4::new(self.broadcast.clone()); + imp.set_passthrough_mode(self.passthrough); + imp + }) } #[cfg(test)] diff --git a/rs/hang/src/import/opus.rs b/rs/hang/src/import/opus.rs index ed5282ab7..dd744b537 100644 --- a/rs/hang/src/import/opus.rs +++ b/rs/hang/src/import/opus.rs @@ -53,6 +53,8 @@ impl Opus { channel_count, bitrate: None, description: None, + container: hang::catalog::Container::Native, + init_segment: None, }; tracing::debug!(name = ?track.name, ?config, "starting track"); diff --git a/rs/hang/src/model/track.rs b/rs/hang/src/model/track.rs index 3fae96be0..57de1c3bf 100644 --- a/rs/hang/src/model/track.rs +++ b/rs/hang/src/model/track.rs @@ -2,6 +2,7 @@ use std::collections::VecDeque; use std::ops::Deref; use crate::Error; +use crate::catalog::Container; use crate::model::{Frame, GroupConsumer, Timestamp}; use futures::{StreamExt, stream::FuturesUnordered}; @@ -23,15 +24,21 @@ pub struct TrackProducer { pub inner: moq_lite::TrackProducer, group: Option<moq_lite::GroupProducer>, keyframe: Option<Timestamp>, + /// Tracks whether the current group is the init segment group (timestamp 0). + /// We keep this group open so new subscribers can receive the init segment. + is_init_segment_group: bool, + container: Container, }
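The `container` field added here drives the header rule in `write()` below: native/raw frames are prefixed with a timestamp, while CMAF frames carry none because the fragment's tfdt already encodes the decode time. A standalone sketch (hypothetical function, not crate API) of the bytes written per container:

```rust
enum Container { Native, Raw, Cmaf }

// Hypothetical illustration of the header rule write() implements below.
fn timestamp_header_bytes(container: &Container, varint_len: usize) -> usize {
    match container {
        Container::Native => varint_len, // QUIC varint: 1..=8 bytes depending on value
        Container::Raw => 8,             // fixed u64, big-endian
        Container::Cmaf => 0,            // no per-frame header; timestamp lives in the moof
    }
}
```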
impl TrackProducer { /// Create a new TrackProducer wrapping the given moq-lite producer. - pub fn new(inner: moq_lite::TrackProducer) -> Self { + pub fn new(inner: moq_lite::TrackProducer, container: Container) -> Self { Self { inner, group: None, keyframe: None, + is_init_segment_group: false, + container, } } @@ -48,11 +55,22 @@ impl TrackProducer { tracing::trace!(?frame, "write frame"); let mut header = BytesMut::new(); - frame.timestamp.encode(&mut header, lite::Version::Draft02); + if self.container != Container::Cmaf { + frame.timestamp.encode(&mut header, lite::Version::Draft02); + } if frame.keyframe { if let Some(group) = self.group.take() { - group.close(); + // Don't close the init segment group - keep it open for new subscribers + if self.is_init_segment_group { + tracing::debug!("keeping init segment group open for new subscribers"); + // Drop the handle without close(); the group stays open. + drop(group); + } else { + tracing::info!(timestamp = ?frame.timestamp, "closing group and creating new one for keyframe"); + group.close(); + } + self.is_init_segment_group = false; } // Make sure this frame's timestamp doesn't go backwards relative to the last keyframe. @@ -68,7 +86,18 @@ let mut group = match self.group.take() { Some(group) => group, - None if frame.keyframe => self.inner.append_group(), + None if frame.keyframe => { + let new_group = self.inner.append_group(); + // Log when creating a new group, especially for init segment (timestamp 0) + if frame.timestamp.as_micros() == 0 { + tracing::info!(timestamp = ?frame.timestamp, "creating new group for init segment (timestamp 0)"); + // Mark this as the init segment group so we can keep it open + self.is_init_segment_group = true; + } else { + tracing::info!(timestamp = ?frame.timestamp, "creating new group for keyframe"); + } + new_group + } // The first frame must be a keyframe. None => return Err(Error::MissingKeyframe), }; @@ -76,10 +105,14 @@ let size = header.len() + frame.payload.remaining(); let mut chunked = group.create_frame(size.into()); - chunked.write_chunk(header.freeze()); + if !header.is_empty() { + chunked.write_chunk(header.freeze()); + } + for chunk in frame.payload { chunked.write_chunk(chunk); } + chunked.close(); self.group.replace(group); @@ -98,7 +131,7 @@ impl From<moq_lite::TrackProducer> for TrackProducer { fn from(inner: moq_lite::TrackProducer) -> Self { - Self::new(inner) + Self::new(inner, Container::Native) } }
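On the consuming side, a client rebuilds the MSE init segment by base64-decoding `initSegment` from the catalog before appending fragments. A minimal sketch, assuming the `base64` crate (the catalog field itself is encoded via `serde_with::base64::Base64`):

```rust
use base64::{engine::general_purpose::STANDARD, Engine as _};

/// Decode a catalog initSegment and sanity-check that it begins with an
/// ftyp (or bare moov) box before handing it to a decoder/MSE buffer.
fn decode_init_segment(encoded: &str) -> anyhow::Result<Vec<u8>> {
    let bytes = STANDARD.decode(encoded)?;
    let fourcc = bytes
        .get(4..8)
        .ok_or_else(|| anyhow::anyhow!("init segment too short"))?;
    anyhow::ensure!(
        fourcc == b"ftyp".as_slice() || fourcc == b"moov".as_slice(),
        "init segment does not start with ftyp/moov"
    );
    Ok(bytes)
}
```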