Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
356 changes: 356 additions & 0 deletions packages/producer/src/services/distributed/assemble.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,356 @@
/**
* Unit tests for `services/distributed/assemble.ts`.
*
* Contracts:
* - mp4/mov: pre-rendered chunks → assembled output passes ffprobe
* (correct frame count, audio present and exactly `frames / fps`
* long, faststart applied).
* - png-sequence: chunk frame directories merge into one continuous
* numbered sequence (chunk N's `frame_NNNNNN.png` files renumber
* into `outputPath/frame_NNNNNN.png` with a global index).
*
* The mp4 fixture pre-renders chunk inputs via raw ffmpeg (test color
* bars + AAC silence) so we don't need a working Chrome to exercise
* assemble. The capture pipeline is covered by `renderChunk.test.ts`.
*/

import { spawnSync } from "node:child_process";
import { afterAll, beforeAll, describe, expect, it } from "bun:test";
import { existsSync, mkdirSync, mkdtempSync, readdirSync, rmSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";
import type { ChunkSliceJson } from "../render/stages/freezePlan.js";
import { assemble } from "./assemble.js";

let runRoot: string;
let hasFfmpeg = false;

beforeAll(() => {
runRoot = mkdtempSync(join(tmpdir(), "hf-assemble-test-"));
hasFfmpeg = spawnSync("ffmpeg", ["-version"]).status === 0;
});

afterAll(() => {
rmSync(runRoot, { recursive: true, force: true });
});

/**
* Build a synthetic planDir whose `meta/chunks.json` declares N chunks of
* `framesPerChunk` frames each. Does NOT materialize compiled/, video-frames/,
* audio.aac — assemble only reads `plan.json` + `meta/chunks.json`, and we
* pass chunk paths explicitly. Keeping the dir lean speeds up the test
* loop.
*/
function buildPlanDir(
format: "mp4" | "png-sequence",
chunks: ChunkSliceJson[],
totalFrames: number,
hasAudio: boolean,
): string {
const planDir = mkdtempSync(join(runRoot, `plan-${format}-`));
mkdirSync(join(planDir, "meta"), { recursive: true });
writeFileSync(
join(planDir, "plan.json"),
JSON.stringify({
planHash: "fake",
totalFrames,
hasAudio,
dimensions: { fpsNum: 30, fpsDen: 1, width: 160, height: 120, format },
}),
"utf-8",
);
writeFileSync(join(planDir, "meta", "chunks.json"), JSON.stringify(chunks), "utf-8");
return planDir;
}

/**
* Encode a tiny mp4 chunk via raw ffmpeg with closed-GOP libx264 args
* matching what `renderChunk` produces. Uses ffmpeg's `testsrc` filter
* so the test doesn't depend on any image assets. Each chunk is
* independently concatenable because GOP === frame count and the first
* frame is forced as a keyframe.
*/
function makeMp4Chunk(outputPath: string, frameCount: number): void {
const args = [
"-v",
"error",
"-f",
"lavfi",
"-i",
`testsrc=size=160x120:rate=30:duration=${frameCount / 30}`,
"-c:v",
"libx264",
"-preset",
"ultrafast",
"-g",
String(frameCount),
"-keyint_min",
String(frameCount),
"-sc_threshold",
"0",
"-force_key_frames",
`expr:eq(mod(n,${frameCount}),0)`,
"-bf",
"0",
"-pix_fmt",
"yuv420p",
"-vframes",
String(frameCount),
"-y",
outputPath,
];
const result = spawnSync("ffmpeg", args, { stdio: "pipe" });
if (result.status !== 0) {
throw new Error(`ffmpeg testsrc chunk failed: ${result.stderr.toString().slice(-400)}`);
}
}

/** Generate an AAC audio file of `durationSeconds` of silence. */
function makeAacAudio(outputPath: string, durationSeconds: number): void {
const result = spawnSync("ffmpeg", [
"-v",
"error",
"-f",
"lavfi",
"-i",
`anullsrc=channel_layout=stereo:sample_rate=48000`,
"-t",
String(durationSeconds),
"-c:a",
"aac",
"-b:a",
"128k",
"-y",
outputPath,
]);
if (result.status !== 0) {
throw new Error(`ffmpeg anullsrc failed: ${result.stderr.toString().slice(-400)}`);
}
}

/** Read ffprobe JSON for one stream of `outputPath`. */
function probeStream(
outputPath: string,
streamSelector: "v:0" | "a:0",
): Record<string, unknown> | null {
const result = spawnSync(
"ffprobe",
[
"-v",
"error",
"-select_streams",
streamSelector,
"-show_entries",
"stream=duration,nb_frames,nb_read_packets,codec_name,r_frame_rate",
"-count_packets",
"-of",
"json",
outputPath,
],
{ stdio: "pipe" },
);
if (result.status !== 0) return null;
const parsed = JSON.parse(result.stdout.toString()) as {
streams?: Array<Record<string, unknown>>;
};
return parsed.streams?.[0] ?? null;
}

describe("assemble()", () => {
const TIMEOUT_MS = 30_000;

it(
"concat-copies two mp4 chunks and applies faststart",
async () => {
if (!hasFfmpeg) {
console.warn(
"[assemble.test] skipping mp4 concat test — ffmpeg not available on this host",
);
return;
}

const chunks: ChunkSliceJson[] = [
{ index: 0, startFrame: 0, endFrame: 5 },
{ index: 1, startFrame: 5, endFrame: 10 },
];
const planDir = buildPlanDir("mp4", chunks, 10, false);

const chunkAPath = join(planDir, "chunk-0.mp4");
const chunkBPath = join(planDir, "chunk-1.mp4");
makeMp4Chunk(chunkAPath, 5);
makeMp4Chunk(chunkBPath, 5);

const outputPath = join(planDir, "output.mp4");
const result = await assemble(planDir, [chunkAPath, chunkBPath], null, outputPath);

expect(result.outputPath).toBe(outputPath);
expect(existsSync(outputPath)).toBe(true);
expect(result.fileSize).toBeGreaterThan(0);
expect(result.framesEncoded).toBe(10);

// ── ffprobe: correct frame count + codec ───────────────────────────
const videoStream = probeStream(outputPath, "v:0");
expect(videoStream).toBeDefined();
expect(videoStream?.codec_name).toBe("h264");
const probedFrames = Number(videoStream?.nb_read_packets ?? videoStream?.nb_frames);
expect(probedFrames).toBe(10);

// ── faststart applied ──────────────────────────────────────────────
// Bun.file is async; resolve before asserting.
const buf = await Bun.file(outputPath).arrayBuffer();
const bytes = new Uint8Array(buf);
let cursor = 0;
let moovBeforeMdat = false;
while (cursor + 8 <= bytes.length) {
const size =
(bytes[cursor]! << 24) |
(bytes[cursor + 1]! << 16) |
(bytes[cursor + 2]! << 8) |
bytes[cursor + 3]!;
const fourcc = String.fromCharCode(
bytes[cursor + 4]!,
bytes[cursor + 5]!,
bytes[cursor + 6]!,
bytes[cursor + 7]!,
);
if (fourcc === "moov") {
moovBeforeMdat = true;
break;
}
if (fourcc === "mdat") break;
if (size <= 0) break;
cursor += size;
}
expect(moovBeforeMdat).toBe(true);
},
TIMEOUT_MS,
);

it(
"muxes audio with frame-count-derived duration when audio.aac is present",
async () => {
if (!hasFfmpeg) return;

const chunks: ChunkSliceJson[] = [
{ index: 0, startFrame: 0, endFrame: 6 },
{ index: 1, startFrame: 6, endFrame: 12 },
];
const totalFrames = 12;
const fps = 30;
const planDir = buildPlanDir("mp4", chunks, totalFrames, true);

const chunkAPath = join(planDir, "chunk-0.mp4");
const chunkBPath = join(planDir, "chunk-1.mp4");
const audioPath = join(planDir, "audio.aac");
makeMp4Chunk(chunkAPath, 6);
makeMp4Chunk(chunkBPath, 6);
// Audio is half a second longer than the video — `padOrTrimAudioToVideoFrameCount`
// should trim it down to `totalFrames / fps`.
makeAacAudio(audioPath, totalFrames / fps + 0.5);

const outputPath = join(planDir, "output-audio.mp4");
const result = await assemble(planDir, [chunkAPath, chunkBPath], audioPath, outputPath);

expect(existsSync(outputPath)).toBe(true);
expect(result.framesEncoded).toBe(totalFrames);

const audioStream = probeStream(outputPath, "a:0");
expect(audioStream).toBeDefined();
expect(audioStream?.codec_name).toBe("aac");
// Audio duration should be within ~25ms of `totalFrames / fps` after
// pad/trim. The 25ms tolerance absorbs AAC frame quantization (1024
// samples @ 48kHz = ~21ms).
const audioDuration = Number(audioStream?.duration ?? 0);
const expected = totalFrames / fps;
expect(Math.abs(audioDuration - expected)).toBeLessThan(0.05);
},
TIMEOUT_MS,
);

it(
"merges png-sequence chunk directories with continuous global numbering",
() => {
const chunks: ChunkSliceJson[] = [
{ index: 0, startFrame: 0, endFrame: 3 },
{ index: 1, startFrame: 3, endFrame: 7 },
];
const planDir = buildPlanDir("png-sequence", chunks, 7, false);

// Fabricate two chunk directories with 3 + 4 frames respectively.
// Each chunk uses a 0-indexed naming scheme — `renderChunk` writes
// them this way today.
const chunkADir = join(planDir, "chunk-a");
const chunkBDir = join(planDir, "chunk-b");
mkdirSync(chunkADir, { recursive: true });
mkdirSync(chunkBDir, { recursive: true });
const minimalPngHeader = Buffer.from([
// 8-byte PNG signature followed by an IHDR chunk for a 1×1 RGB image.
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44,
0x52, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x02, 0x00, 0x00, 0x00, 0x90,
0x77, 0x53, 0xde,
]);
for (let i = 0; i < 3; i++) {
// Each frame's bytes differ (suffix index) so the merged sequence's
// ordering assertion has something to bite on.
writeFileSync(
join(chunkADir, `frame_${String(i).padStart(6, "0")}.png`),
Buffer.concat([minimalPngHeader, Buffer.from([0xaa, i])]),
);
}
for (let i = 0; i < 4; i++) {
writeFileSync(
join(chunkBDir, `frame_${String(i).padStart(6, "0")}.png`),
Buffer.concat([minimalPngHeader, Buffer.from([0xbb, i])]),
);
}

const outputPath = join(planDir, "merged");
// No await — png-sequence assemble is synchronous internally.
const promise = assemble(planDir, [chunkADir, chunkBDir], null, outputPath);
return promise.then((result) => {
expect(result.outputPath).toBe(outputPath);
expect(result.framesEncoded).toBe(7);
const merged = readdirSync(outputPath).sort();
expect(merged).toEqual([
"frame_000001.png",
"frame_000002.png",
"frame_000003.png",
"frame_000004.png",
"frame_000005.png",
"frame_000006.png",
"frame_000007.png",
]);
});
},
TIMEOUT_MS,
);

it("rejects when chunkPaths.length does not match chunks.json length", async () => {
const chunks: ChunkSliceJson[] = [
{ index: 0, startFrame: 0, endFrame: 5 },
{ index: 1, startFrame: 5, endFrame: 10 },
];
const planDir = buildPlanDir("mp4", chunks, 10, false);
let caught: unknown;
try {
await assemble(planDir, ["/tmp/nonexistent.mp4"], null, join(planDir, "out.mp4"));
} catch (err) {
caught = err;
}
expect(caught).toBeDefined();
expect((caught as Error).message).toContain("does not match");
});

it("rejects a planDir missing plan.json", async () => {
const emptyDir = join(runRoot, "empty");
mkdirSync(emptyDir, { recursive: true });
let caught: unknown;
try {
await assemble(emptyDir, [], null, join(emptyDir, "out.mp4"));
} catch (err) {
caught = err;
}
expect(caught).toBeDefined();
expect((caught as Error).message).toContain("plan.json");
});
});
Loading
Loading