From b606106ea5829433216db6a8e88fa6a9af50edc8 Mon Sep 17 00:00:00 2001 From: James Date: Wed, 13 May 2026 22:23:12 +0000 Subject: [PATCH] feat(producer): add services/distributed/assemble.ts --- .../src/services/distributed/assemble.test.ts | 356 ++++++++++++++++++ .../src/services/distributed/assemble.ts | 296 +++++++++++++++ 2 files changed, 652 insertions(+) create mode 100644 packages/producer/src/services/distributed/assemble.test.ts create mode 100644 packages/producer/src/services/distributed/assemble.ts diff --git a/packages/producer/src/services/distributed/assemble.test.ts b/packages/producer/src/services/distributed/assemble.test.ts new file mode 100644 index 000000000..6a55c7727 --- /dev/null +++ b/packages/producer/src/services/distributed/assemble.test.ts @@ -0,0 +1,356 @@ +/** + * Unit tests for `services/distributed/assemble.ts`. + * + * Contracts: + * - mp4/mov: pre-rendered chunks → assembled output passes ffprobe + * (correct frame count, audio present and exactly `frames / fps` + * long, faststart applied). + * - png-sequence: chunk frame directories merge into one continuous + * numbered sequence (chunk N's `frame_NNNNNN.png` files renumber + * into `outputPath/frame_NNNNNN.png` with a global index). + * + * The mp4 fixture pre-renders chunk inputs via raw ffmpeg (test color + * bars + AAC silence) so we don't need a working Chrome to exercise + * assemble. The capture pipeline is covered by `renderChunk.test.ts`. 
+ */ + +import { spawnSync } from "node:child_process"; +import { afterAll, beforeAll, describe, expect, it } from "bun:test"; +import { existsSync, mkdirSync, mkdtempSync, readdirSync, rmSync, writeFileSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import type { ChunkSliceJson } from "../render/stages/freezePlan.js"; +import { assemble } from "./assemble.js"; + +let runRoot: string; +let hasFfmpeg = false; + +beforeAll(() => { + runRoot = mkdtempSync(join(tmpdir(), "hf-assemble-test-")); + hasFfmpeg = spawnSync("ffmpeg", ["-version"]).status === 0; +}); + +afterAll(() => { + rmSync(runRoot, { recursive: true, force: true }); +}); + +/** + * Build a synthetic planDir whose `meta/chunks.json` declares N chunks of + * `framesPerChunk` frames each. Does NOT materialize compiled/, video-frames/, + * audio.aac — assemble only reads `plan.json` + `meta/chunks.json`, and we + * pass chunk paths explicitly. Keeping the dir lean speeds up the test + * loop. + */ +function buildPlanDir( + format: "mp4" | "png-sequence", + chunks: ChunkSliceJson[], + totalFrames: number, + hasAudio: boolean, +): string { + const planDir = mkdtempSync(join(runRoot, `plan-${format}-`)); + mkdirSync(join(planDir, "meta"), { recursive: true }); + writeFileSync( + join(planDir, "plan.json"), + JSON.stringify({ + planHash: "fake", + totalFrames, + hasAudio, + dimensions: { fpsNum: 30, fpsDen: 1, width: 160, height: 120, format }, + }), + "utf-8", + ); + writeFileSync(join(planDir, "meta", "chunks.json"), JSON.stringify(chunks), "utf-8"); + return planDir; +} + +/** + * Encode a tiny mp4 chunk via raw ffmpeg with closed-GOP libx264 args + * matching what `renderChunk` produces. Uses ffmpeg's `testsrc` filter + * so the test doesn't depend on any image assets. Each chunk is + * independently concatenable because GOP === frame count and the first + * frame is forced as a keyframe. 
+ */ +function makeMp4Chunk(outputPath: string, frameCount: number): void { + const args = [ + "-v", + "error", + "-f", + "lavfi", + "-i", + `testsrc=size=160x120:rate=30:duration=${frameCount / 30}`, + "-c:v", + "libx264", + "-preset", + "ultrafast", + "-g", + String(frameCount), + "-keyint_min", + String(frameCount), + "-sc_threshold", + "0", + "-force_key_frames", + `expr:eq(mod(n,${frameCount}),0)`, + "-bf", + "0", + "-pix_fmt", + "yuv420p", + "-vframes", + String(frameCount), + "-y", + outputPath, + ]; + const result = spawnSync("ffmpeg", args, { stdio: "pipe" }); + if (result.status !== 0) { + throw new Error(`ffmpeg testsrc chunk failed: ${result.stderr.toString().slice(-400)}`); + } +} + +/** Generate an AAC audio file of `durationSeconds` of silence. */ +function makeAacAudio(outputPath: string, durationSeconds: number): void { + const result = spawnSync("ffmpeg", [ + "-v", + "error", + "-f", + "lavfi", + "-i", + `anullsrc=channel_layout=stereo:sample_rate=48000`, + "-t", + String(durationSeconds), + "-c:a", + "aac", + "-b:a", + "128k", + "-y", + outputPath, + ]); + if (result.status !== 0) { + throw new Error(`ffmpeg anullsrc failed: ${result.stderr.toString().slice(-400)}`); + } +} + +/** Read ffprobe JSON for one stream of `outputPath`. */ +function probeStream( + outputPath: string, + streamSelector: "v:0" | "a:0", +): Record | null { + const result = spawnSync( + "ffprobe", + [ + "-v", + "error", + "-select_streams", + streamSelector, + "-show_entries", + "stream=duration,nb_frames,nb_read_packets,codec_name,r_frame_rate", + "-count_packets", + "-of", + "json", + outputPath, + ], + { stdio: "pipe" }, + ); + if (result.status !== 0) return null; + const parsed = JSON.parse(result.stdout.toString()) as { + streams?: Array>; + }; + return parsed.streams?.[0] ?? 
null; +} + +describe("assemble()", () => { + const TIMEOUT_MS = 30_000; + + it( + "concat-copies two mp4 chunks and applies faststart", + async () => { + if (!hasFfmpeg) { + console.warn( + "[assemble.test] skipping mp4 concat test — ffmpeg not available on this host", + ); + return; + } + + const chunks: ChunkSliceJson[] = [ + { index: 0, startFrame: 0, endFrame: 5 }, + { index: 1, startFrame: 5, endFrame: 10 }, + ]; + const planDir = buildPlanDir("mp4", chunks, 10, false); + + const chunkAPath = join(planDir, "chunk-0.mp4"); + const chunkBPath = join(planDir, "chunk-1.mp4"); + makeMp4Chunk(chunkAPath, 5); + makeMp4Chunk(chunkBPath, 5); + + const outputPath = join(planDir, "output.mp4"); + const result = await assemble(planDir, [chunkAPath, chunkBPath], null, outputPath); + + expect(result.outputPath).toBe(outputPath); + expect(existsSync(outputPath)).toBe(true); + expect(result.fileSize).toBeGreaterThan(0); + expect(result.framesEncoded).toBe(10); + + // ── ffprobe: correct frame count + codec ─────────────────────────── + const videoStream = probeStream(outputPath, "v:0"); + expect(videoStream).toBeDefined(); + expect(videoStream?.codec_name).toBe("h264"); + const probedFrames = Number(videoStream?.nb_read_packets ?? videoStream?.nb_frames); + expect(probedFrames).toBe(10); + + // ── faststart applied ────────────────────────────────────────────── + // Bun.file is async; resolve before asserting. + const buf = await Bun.file(outputPath).arrayBuffer(); + const bytes = new Uint8Array(buf); + let cursor = 0; + let moovBeforeMdat = false; + while (cursor + 8 <= bytes.length) { + const size = + (bytes[cursor]! << 24) | + (bytes[cursor + 1]! << 16) | + (bytes[cursor + 2]! 
<< 8) |
+          bytes[cursor + 3]!;
+        const fourcc = String.fromCharCode(
+          bytes[cursor + 4]!,
+          bytes[cursor + 5]!,
+          bytes[cursor + 6]!,
+          bytes[cursor + 7]!,
+        );
+        if (fourcc === "moov") {
+          moovBeforeMdat = true;
+          break;
+        }
+        if (fourcc === "mdat") break;
+        if (size <= 0) break;
+        cursor += size;
+      }
+      expect(moovBeforeMdat).toBe(true);
+    },
+    TIMEOUT_MS,
+  );
+
+  it(
+    "muxes audio with frame-count-derived duration when audio.aac is present",
+    async () => {
+      if (!hasFfmpeg) return;
+
+      const chunks: ChunkSliceJson[] = [
+        { index: 0, startFrame: 0, endFrame: 6 },
+        { index: 1, startFrame: 6, endFrame: 12 },
+      ];
+      const totalFrames = 12;
+      const fps = 30;
+      const planDir = buildPlanDir("mp4", chunks, totalFrames, true);
+
+      const chunkAPath = join(planDir, "chunk-0.mp4");
+      const chunkBPath = join(planDir, "chunk-1.mp4");
+      const audioPath = join(planDir, "audio.aac");
+      makeMp4Chunk(chunkAPath, 6);
+      makeMp4Chunk(chunkBPath, 6);
+      // Audio is half a second longer than the video — `padOrTrimAudioToVideoFrameCount`
+      // should trim it down to `totalFrames / fps`.
+      makeAacAudio(audioPath, totalFrames / fps + 0.5);
+
+      const outputPath = join(planDir, "output-audio.mp4");
+      const result = await assemble(planDir, [chunkAPath, chunkBPath], audioPath, outputPath);
+
+      expect(existsSync(outputPath)).toBe(true);
+      expect(result.framesEncoded).toBe(totalFrames);
+
+      const audioStream = probeStream(outputPath, "a:0");
+      expect(audioStream).toBeDefined();
+      expect(audioStream?.codec_name).toBe("aac");
+      // Audio duration should be within ~50ms of `totalFrames / fps` after
+      // pad/trim. The 50ms tolerance absorbs AAC frame quantization (1024
+      // samples @ 48kHz = ~21ms) plus encoder delay/padding.
+      const audioDuration = Number(audioStream?.duration ??
0); + const expected = totalFrames / fps; + expect(Math.abs(audioDuration - expected)).toBeLessThan(0.05); + }, + TIMEOUT_MS, + ); + + it( + "merges png-sequence chunk directories with continuous global numbering", + () => { + const chunks: ChunkSliceJson[] = [ + { index: 0, startFrame: 0, endFrame: 3 }, + { index: 1, startFrame: 3, endFrame: 7 }, + ]; + const planDir = buildPlanDir("png-sequence", chunks, 7, false); + + // Fabricate two chunk directories with 3 + 4 frames respectively. + // Each chunk uses a 0-indexed naming scheme — `renderChunk` writes + // them this way today. + const chunkADir = join(planDir, "chunk-a"); + const chunkBDir = join(planDir, "chunk-b"); + mkdirSync(chunkADir, { recursive: true }); + mkdirSync(chunkBDir, { recursive: true }); + const minimalPngHeader = Buffer.from([ + // 8-byte PNG signature followed by an IHDR chunk for a 1×1 RGB image. + 0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, + 0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x02, 0x00, 0x00, 0x00, 0x90, + 0x77, 0x53, 0xde, + ]); + for (let i = 0; i < 3; i++) { + // Each frame's bytes differ (suffix index) so the merged sequence's + // ordering assertion has something to bite on. + writeFileSync( + join(chunkADir, `frame_${String(i).padStart(6, "0")}.png`), + Buffer.concat([minimalPngHeader, Buffer.from([0xaa, i])]), + ); + } + for (let i = 0; i < 4; i++) { + writeFileSync( + join(chunkBDir, `frame_${String(i).padStart(6, "0")}.png`), + Buffer.concat([minimalPngHeader, Buffer.from([0xbb, i])]), + ); + } + + const outputPath = join(planDir, "merged"); + // No await — png-sequence assemble is synchronous internally. 
+ const promise = assemble(planDir, [chunkADir, chunkBDir], null, outputPath); + return promise.then((result) => { + expect(result.outputPath).toBe(outputPath); + expect(result.framesEncoded).toBe(7); + const merged = readdirSync(outputPath).sort(); + expect(merged).toEqual([ + "frame_000001.png", + "frame_000002.png", + "frame_000003.png", + "frame_000004.png", + "frame_000005.png", + "frame_000006.png", + "frame_000007.png", + ]); + }); + }, + TIMEOUT_MS, + ); + + it("rejects when chunkPaths.length does not match chunks.json length", async () => { + const chunks: ChunkSliceJson[] = [ + { index: 0, startFrame: 0, endFrame: 5 }, + { index: 1, startFrame: 5, endFrame: 10 }, + ]; + const planDir = buildPlanDir("mp4", chunks, 10, false); + let caught: unknown; + try { + await assemble(planDir, ["/tmp/nonexistent.mp4"], null, join(planDir, "out.mp4")); + } catch (err) { + caught = err; + } + expect(caught).toBeDefined(); + expect((caught as Error).message).toContain("does not match"); + }); + + it("rejects a planDir missing plan.json", async () => { + const emptyDir = join(runRoot, "empty"); + mkdirSync(emptyDir, { recursive: true }); + let caught: unknown; + try { + await assemble(emptyDir, [], null, join(emptyDir, "out.mp4")); + } catch (err) { + caught = err; + } + expect(caught).toBeDefined(); + expect((caught as Error).message).toContain("plan.json"); + }); +}); diff --git a/packages/producer/src/services/distributed/assemble.ts b/packages/producer/src/services/distributed/assemble.ts new file mode 100644 index 000000000..728f122cf --- /dev/null +++ b/packages/producer/src/services/distributed/assemble.ts @@ -0,0 +1,296 @@ +/** + * Activity C of the distributed render pipeline. + * + * `assemble(planDir, chunkPaths, audioPath, outputPath)` stitches per-chunk + * outputs into the final deliverable. 
For mp4/mov this is `ffmpeg -f concat + * -c copy` (free of re-encode loss because every chunk's first frame is an + * IDR keyframe — the chunk encoder sets `lockGopForChunkConcat` to + * enforce this). For png-sequence chunks (each chunk is a directory of + * frames) this is a straight directory merge with global re-numbering. + * + * Mux + faststart for mp4/mov go through the engine's `muxVideoWithAudio` + * + `applyFaststart` helpers — same path the in-process renderer uses; we + * just feed concat output rather than streaming-encoder output. Audio + * length is pad-or-trimmed to `frameCount / fps` via + * `padOrTrimAudioToVideoFrameCount` so the mux step doesn't introduce + * sub-millisecond drift at the end of long renders. + * + * Pure function over local paths. No networking. The caller is responsible + * for moving `outputPath` to its orchestration-level storage. + */ + +import { + cpSync, + existsSync, + mkdirSync, + readFileSync, + readdirSync, + rmSync, + statSync, + writeFileSync, +} from "node:fs"; +import { dirname, join } from "node:path"; +import { applyFaststart, muxVideoWithAudio, runFfmpeg } from "@hyperframes/engine"; +import { defaultLogger, type ProducerLogger } from "../../logger.js"; +import { padOrTrimAudioToVideoFrameCount } from "../render/audioPadTrim.js"; +import type { ChunkSliceJson } from "../render/stages/freezePlan.js"; + +/** + * Result of {@link assemble}. `fileSize` reflects the final file on disk + * (mp4/mov) or the cumulative byte total of the frame directory + * (png-sequence). + */ +export interface AssembleResult { + outputPath: string; + durationMs: number; + framesEncoded: number; + fileSize: number; +} + +/** Shape of the planDir's top-level `plan.json` — only the fields `assemble` needs. 
 */
+interface PlanJsonForAssemble {
+  planHash: string;
+  totalFrames: number;
+  hasAudio: boolean;
+  dimensions: {
+    fpsNum: number;
+    fpsDen: number;
+    width: number;
+    height: number;
+    format: "mp4" | "mov" | "png-sequence" | "webm";
+  };
+}
+
+/**
+ * Assemble the chunk outputs into a single deliverable.
+ *
+ * @param planDir — absolute path to the planDir produced by `plan()`.
+ * @param chunkPaths — ordered chunk outputs, length === `chunks.json` length.
+ *   For mp4/mov each entry is a path to an encoded chunk file; for
+ *   png-sequence each entry is a path to a directory of frames.
+ * @param audioPath — `<planDir>/audio.aac` for mux'd formats. Pass `null`
+ *   when the composition has no audio (or `assemble` is being called for a
+ *   format whose audio is muxed elsewhere). `assemble` always normalizes
+ *   audio length against the assembled video's frame count when
+ *   `audioPath` is non-null.
+ * @param outputPath — final on-disk output (file for mp4/mov; directory
+ *   for png-sequence — created if missing).
+ */
+export async function assemble(
+  planDir: string,
+  chunkPaths: readonly string[],
+  audioPath: string | null,
+  outputPath: string,
+  options?: { logger?: ProducerLogger; abortSignal?: AbortSignal },
+): Promise<AssembleResult> {
+  const start = Date.now();
+  const log = options?.logger ?? defaultLogger;
+  const abortSignal = options?.abortSignal;
+
+  // ── 1.
Validate planDir manifest matches chunkPaths shape ────────────── + const planJsonPath = join(planDir, "plan.json"); + const chunksJsonPath = join(planDir, "meta", "chunks.json"); + if (!existsSync(planJsonPath)) { + throw new Error(`[assemble] planDir missing plan.json: ${planJsonPath}`); + } + if (!existsSync(chunksJsonPath)) { + throw new Error(`[assemble] planDir missing meta/chunks.json: ${chunksJsonPath}`); + } + const plan = JSON.parse(readFileSync(planJsonPath, "utf-8")) as PlanJsonForAssemble; + const chunks = JSON.parse(readFileSync(chunksJsonPath, "utf-8")) as ChunkSliceJson[]; + if (chunkPaths.length !== chunks.length) { + throw new Error( + `[assemble] chunkPaths length (${chunkPaths.length}) does not match ` + + `chunks.json length (${chunks.length}). Adapters must pass one path ` + + `per chunk, ordered by index.`, + ); + } + for (const path of chunkPaths) { + if (!existsSync(path)) { + throw new Error(`[assemble] chunk path does not exist: ${path}`); + } + } + + if (plan.dimensions.format === "png-sequence") { + // ── 2a. png-sequence: merge frame directories with global re-numbering + return mergePngFrameDirs(chunkPaths, outputPath, plan.totalFrames, audioPath, start); + } + + // ── 2b. mp4 / mov: concat-copy then mux + faststart ──────────────────── + if (!existsSync(dirname(outputPath))) { + mkdirSync(dirname(outputPath), { recursive: true }); + } + const workDir = `${outputPath}.assemble-work`; + if (existsSync(workDir)) rmSync(workDir, { recursive: true, force: true }); + mkdirSync(workDir, { recursive: true }); + + try { + // Concat list file — one `file ''` per chunk, in order. ffmpeg's + // concat demuxer escapes single quotes via `'\''`; we replicate that + // here so chunk paths containing quotes don't break the parser. 
+ const concatListPath = join(workDir, "concat-list.txt"); + const concatBody = chunkPaths.map((path) => `file '${path.replace(/'/g, "'\\''")}'`).join("\n"); + writeFileSync(concatListPath, `${concatBody}\n`, "utf-8"); + + const concatOutputPath = join(workDir, `concat.${plan.dimensions.format}`); + const concatArgs = [ + "-f", + "concat", + "-safe", + "0", + "-i", + concatListPath, + "-c", + "copy", + "-y", + concatOutputPath, + ]; + const concatResult = await runFfmpeg(concatArgs, { signal: abortSignal }); + if (!concatResult.success) { + throw new Error( + `[assemble] ffmpeg concat-copy failed (exit ${concatResult.exitCode}): ` + + `${concatResult.stderr.slice(-400)}`, + ); + } + + // ── 3. Audio: pad-or-trim then mux ──────────────────────────────────── + let audioForMux: string | null = null; + if (audioPath !== null && existsSync(audioPath)) { + const paddedAudioPath = join(workDir, "audio-padded.aac"); + const padTrimResult = await padOrTrimAudioToVideoFrameCount({ + videoPath: concatOutputPath, + audioPath, + outputPath: paddedAudioPath, + }); + if (!padTrimResult.success) { + throw new Error(`[assemble] audio pad/trim failed: ${padTrimResult.error}`); + } + audioForMux = paddedAudioPath; + log.info("[assemble] audio normalized for mux", { + operation: padTrimResult.operation, + targetDurationSeconds: padTrimResult.targetDurationSeconds, + sourceDurationSeconds: padTrimResult.sourceDurationSeconds, + }); + } + + // mux + faststart paths mirror `runAssembleStage` for in-process renders + // (`render/stages/assembleStage.ts`). We can't call that stage directly + // because it operates on a `RenderJob` and emits `updateJobStatus` + // payloads — the distributed activity has no job to thread through. + const muxOutputPath = + audioForMux !== null ? 
join(workDir, `mux.${plan.dimensions.format}`) : concatOutputPath; + if (audioForMux !== null) { + const muxResult = await muxVideoWithAudio( + concatOutputPath, + audioForMux, + muxOutputPath, + abortSignal, + ); + if (!muxResult.success) { + throw new Error(`[assemble] audio mux failed: ${muxResult.error}`); + } + } + + // applyFaststart is a no-op for `.mov` (it copies the input to output); + // we still call it so the success path produces `outputPath` regardless. + const faststartResult = await applyFaststart(muxOutputPath, outputPath, abortSignal); + if (!faststartResult.success) { + throw new Error(`[assemble] faststart failed: ${faststartResult.error}`); + } + } finally { + try { + rmSync(workDir, { recursive: true, force: true }); + } catch (err) { + log.warn("[assemble] failed to remove work dir", { + workDir, + error: err instanceof Error ? err.message : String(err), + }); + } + } + + const fileSize = existsSync(outputPath) ? statSync(outputPath).size : 0; + return { + outputPath, + durationMs: Date.now() - start, + framesEncoded: plan.totalFrames, + fileSize, + }; +} + +/** + * Merge per-chunk PNG frame directories into a single directory with + * globally-incrementing frame numbers. Each chunk's `frame_NNNNNN.png` files + * (which `renderChunk` writes normalized to zero per chunk) are re-numbered + * into the merged output so consumers see one continuous numbered sequence. + * + * Audio is intentionally NOT muxed here — png-sequence has no container. + * If `audioPath` is non-null we copy it alongside as `audio.aac` so callers + * who need to re-mux later (After Effects, Nuke, ffmpeg image2 + audio) can + * find it. 
+ */ +function mergePngFrameDirs( + chunkPaths: readonly string[], + outputPath: string, + totalFrames: number, + audioPath: string | null, + startTimeMs: number, +): AssembleResult { + if (existsSync(outputPath)) rmSync(outputPath, { recursive: true, force: true }); + mkdirSync(outputPath, { recursive: true }); + + let globalIdx = 0; + for (const chunkDir of chunkPaths) { + if (!statSync(chunkDir).isDirectory()) { + throw new Error( + `[assemble] png-sequence chunk must be a directory: ${chunkDir} (got a file)`, + ); + } + const frames = readdirSync(chunkDir) + .filter((name) => name.endsWith(".png")) + .sort(); + if (frames.length === 0) { + throw new Error(`[assemble] png-sequence chunk has no frames: ${chunkDir}`); + } + for (const frame of frames) { + const dst = join(outputPath, `frame_${String(globalIdx + 1).padStart(6, "0")}.png`); + cpSync(join(chunkDir, frame), dst); + globalIdx += 1; + } + } + + if (globalIdx !== totalFrames) { + // Don't throw — surface as a warning. Some compositions report total + // frame count via the duration math (`ceil(duration * fps)`) but the + // actual captured frame count can differ by ±1 across hosts in edge + // cases. The merged sequence is still complete; consumers should rely + // on the on-disk count. + console.warn( + `[assemble] png-sequence frame count mismatch: merged ${globalIdx} frames vs ` + + `plan.totalFrames=${totalFrames}. Using on-disk count.`, + ); + } + + // Pad-or-trim is encoder-side (audio length normalization for muxed + // containers); png-sequence has no encoder, so we copy the audio + // verbatim. The sidecar matches the in-process png-sequence convention. 
+ if (audioPath !== null && existsSync(audioPath)) { + const sidecar = join(outputPath, "audio.aac"); + cpSync(audioPath, sidecar); + } + + let fileSize = 0; + for (const name of readdirSync(outputPath)) { + try { + fileSize += statSync(join(outputPath, name)).size; + } catch { + // ignore + } + } + + return { + outputPath, + durationMs: Date.now() - startTimeMs, + framesEncoded: globalIdx, + fileSize, + }; +}