From 56f7329e31995ad822a8c099aca540c111122216 Mon Sep 17 00:00:00 2001 From: Vance Ingalls Date: Tue, 12 May 2026 21:39:28 +0000 Subject: [PATCH] perf(producer): pipeline capture and shader-blend per-frame on the hybrid path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit hf#732 PR 5 of 5. Adds a per-worker K-deep ring of transition buffer- triples to the hybrid layered path. Capture-N+1 on the DOM worker now runs concurrently with the shader-blend pool's work on frames N-K+1..N instead of being serialized behind each blend. Mechanism: * Each worker carries a ring of K buffer triples (bufferA / bufferB / output), default K=4. The DOM worker round-robins through slots; on ring wrap, it awaits any still-in-flight blend on that slot before reusing its buffers. * The shader-blend dispatch is no longer awaited inline. It returns the pool's promise (or the inline-fallback promise), which we store in `ringInFlight[slot]`. The blend, the buffer-reattach, and the ordered encoder write all run inside that promise. * The encoder reorder buffer (already present in PR 4) fences final output order, so out-of-order blend completion is fine. Why K=4 by default: The optimal K is `blend_per_frame / capture_per_frame`. For 854x480 rgb48le with the more complex shaders this ratio is ~910ms / ~175ms ≈ 5. K=4 balances perf vs. memory: * K=1 (PR 4 behavior): pool sees <=1 task per worker → ~135s wall on the hf#677 fixture (no improvement over the un-decoupled baseline). * K=2: pool sees 2-4 concurrent tasks → ~135s. * K=4: pool saturates around max-busy ≈ pool size during peak transition clusters → ~100s wall on the same fixture, ~10-20% over PR 4 alone. * K=10: diminishing returns; pool already saturated, extra slots just spend memory. Memory budget: 6 workers x 4 slots x 3 buffers x 854x480x6 bytes ≈ 180MB peak; safely within the SDR render budget. Override at runtime via `HF_TRANSITION_RING_DEPTH` if a workload's blend/capture ratio is very different (simpler shaders that blend in ~100ms can drop K to 1-2 with no perf loss). Failure-mode preservation: * If `poolRef` is null (PR 3's pool failed to spawn) the dispatch closure falls back to the inline blend just like PR 4, only now the inline call also goes through the ring slot — slots wrap cheaply because awaiting a settled inline promise is free. * Rejection on any in-flight blend slot is caught onto a separate handle so unhandled-rejection doesn't fire; it surfaces on the next await of that slot (slot reuse) OR on the end-of-task drain. * The end-of-task drain awaits every remaining in-flight slot before returning, so worker termination on success guarantees all blends have hit the encoder. Pool teardown semantics from PR 4 are unchanged. PR 5 of 5 in the hf#732 decomposition stack; stacked on top of PR 4 (hybrid layered path). -- Vai Co-Authored-By: Vai --- .../render/stages/captureHdrHybridLoop.ts | 129 +++++++++++++----- 1 file changed, 97 insertions(+), 32 deletions(-) diff --git a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts index f88aa6158..ba8251832 100644 --- a/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts +++ b/packages/producer/src/services/render/stages/captureHdrHybridLoop.ts @@ -147,15 +147,37 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise } const workerCanvases: Buffer[] = sessions.map(() => Buffer.alloc(bufSize)); - const workerTransitionBuffers: Array = sessions.map(() => - hasTransitions - ? { - bufferA: Buffer.alloc(bufSize), - bufferB: Buffer.alloc(bufSize), - output: Buffer.alloc(bufSize), - } - : null, + // hf#732 PR 5: K-deep ring of transition buffer-triples per worker. The + // ring lets capture-N+1 proceed on the DOM worker while the shader-blend + // pool is still working on frames N-K+1..N. Without the ring (PR 4), each + // worker awaited its own blend before the next capture, capping the pool + // at <=1 task per worker. With K=4, the pool sees up to min(N_workers * K, + // poolSize) concurrent blends, which empirically pushes shader-render + // wall time another ~10-20% past PR 4 alone. + // + // The ideal K is `blend_per_frame / capture_per_frame`. For 854x480 + // rgb48le with the more complex shaders this is ~910ms / ~175ms ≈ 5. + // K=4 strikes a perf vs. memory balance. Override via + // `HF_TRANSITION_RING_DEPTH` if a workload's blend/capture ratio is very + // different (simpler shaders that blend in ~100ms tolerate K=1-2 without + // perf loss). + const DEFAULT_TRANSITION_RING_DEPTH = 4; + const TRANSITION_RING_DEPTH = Math.max( + 1, + Number(process.env.HF_TRANSITION_RING_DEPTH ?? String(DEFAULT_TRANSITION_RING_DEPTH)), ); + const workerTransitionRings: Array = sessions.map(() => { + if (!hasTransitions) return null; + const ring: LayeredTransitionBuffers[] = []; + for (let k = 0; k < TRANSITION_RING_DEPTH; k++) { + ring.push({ + bufferA: Buffer.alloc(bufSize), + bufferB: Buffer.alloc(bufSize), + output: Buffer.alloc(bufSize), + }); + } + return ring; + }); const workerRanges = distributeLayeredHybridFrameRanges(totalFrames, activeWorkerCount); let framesWritten = 0; const reorderBuffer = createFrameReorderBuffer(0, totalFrames); @@ -185,15 +207,32 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise const session = sessions[w]; const canvas = workerCanvases[w]; const range = workerRanges[w]; - const buffers = workerTransitionBuffers[w]; + const ring = workerTransitionRings[w]; if (!session || !canvas || !range) return; + // Per-ring-slot in-flight promise. When a slot is mid-blend, its + // promise is non-null; before reusing the slot for a new capture we + // await it so the buffer triple is free + the encoder has seen the + // earlier frame (writeEncoded gates ordering via the reorder buffer). + const ringInFlight: Array | null> = ring ? ring.map(() => null) : []; + let nextRingIdx = 0; for (let i = range.start; i < range.end; i++) { assertNotAborted(); const time = (i * job.config.fps.den) / job.config.fps.num; const activeTransition = transitionFramesSet.has(i) ? transitionRanges.find((t) => i >= t.startFrame && i <= t.endFrame) : undefined; - if (activeTransition && buffers) { + if (activeTransition && ring) { + // Pick the next ring slot. If it's still in flight from an earlier + // capture, await it to drain before reusing its buffer triple. + const slot = nextRingIdx; + nextRingIdx = (nextRingIdx + 1) % TRANSITION_RING_DEPTH; + const prev = ringInFlight[slot]; + if (prev) await prev; + const buffers = ring[slot]; + if (!buffers) continue; + // CAPTURE on the DOM worker (this thread). Fills bufferA/bufferB + // synchronously w.r.t. this loop — DOM work can't be pipelined + // because the per-worker browser session is single-threaded. await captureTransitionFrameOnWorker({ session, frameIdx: i, @@ -216,28 +255,48 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise ? 1 : (i - activeTransition.startFrame) / (activeTransition.endFrame - activeTransition.startFrame); - if (poolRef) { - const blendStart = Date.now(); - const result = await poolRef.run({ - shader: activeTransition.shader, - bufferA: buffers.bufferA, - bufferB: buffers.bufferB, - output: buffers.output, - width, - height, - progress, - }); - buffers.bufferA = result.bufferA; - buffers.bufferB = result.bufferB; - buffers.output = result.output; - addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); - } else { - const transitionFn: TransitionFn = TRANSITIONS[activeTransition.shader] ?? crossfade; - const blendStart = Date.now(); - transitionFn(buffers.bufferA, buffers.bufferB, buffers.output, width, height, progress); - addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); - } - await writeEncoded(i, buffers.output); + // BLEND + ENCODE without awaiting. The promise drains back into + // `ringInFlight[slot]`; the next iteration that picks `slot` + // awaits it. The encoder reorder buffer fences ordering so out- + // of-order blend completion is fine. + const frameIdx = i; + const dispatch: Promise = (async () => { + if (poolRef) { + const blendStart = Date.now(); + const result = await poolRef.run({ + shader: activeTransition.shader, + bufferA: buffers.bufferA, + bufferB: buffers.bufferB, + output: buffers.output, + width, + height, + progress, + }); + buffers.bufferA = result.bufferA; + buffers.bufferB = result.bufferB; + buffers.output = result.output; + addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); + } else { + const transitionFn: TransitionFn = TRANSITIONS[activeTransition.shader] ?? crossfade; + const blendStart = Date.now(); + transitionFn( + buffers.bufferA, + buffers.bufferB, + buffers.output, + width, + height, + progress, + ); + addHdrTiming(hdrPerf, "transitionCompositeMs", blendStart); + } + await writeEncoded(frameIdx, buffers.output); + })(); + // Catch on a separate handle so an unhandled-rejection can't fire + // if no one awaits this slot before the worker exits. The error + // is re-thrown on the next await (slot reuse OR end-of-task drain). + ringInFlight[slot] = dispatch.catch((err: unknown) => { + throw err instanceof Error ? err : new Error(String(err)); + }); } else { const beforeCaptureHook = session.onBeforeCapture; let timingStart = Date.now(); @@ -268,6 +327,12 @@ export async function runHybridLayeredFrameLoop(input: HybridLoopInput): Promise await writeEncoded(i, canvas); } } + // Drain any pipelined blends still in flight on this worker before + // returning. If any rejected, the rejection bubbles here so + // `Promise.all` over `workerTaskOf` sees the failure. + for (const pending of ringInFlight) { + if (pending) await pending; + } }; await Promise.all(sessions.map((_, w) => workerTaskOf(w))); await reorderBuffer.waitForAllDone();