From c867527ef97367382dcb9575820b9c98e1e4ad9c Mon Sep 17 00:00:00 2001 From: Sleroq Date: Wed, 4 Mar 2026 00:43:53 +0500 Subject: [PATCH] Implement automatic reconnection logic --- web/src/components/broadcast/Broadcast.tsx | 292 +++++++++++------- web/src/components/player/Player.tsx | 85 ++++- .../player/functions/peerconnection.tsx | 267 +++++++++------- web/src/hooks/useReconnectController.ts | 202 ++++++++++++ 4 files changed, 604 insertions(+), 242 deletions(-) create mode 100644 web/src/hooks/useReconnectController.ts diff --git a/web/src/components/broadcast/Broadcast.tsx b/web/src/components/broadcast/Broadcast.tsx index a3179e69..c4f6351e 100644 --- a/web/src/components/broadcast/Broadcast.tsx +++ b/web/src/components/broadcast/Broadcast.tsx @@ -9,6 +9,7 @@ import ProfileSettings from './ProfileSettings'; import Player from '../player/Player'; import { LocaleContext } from '../../providers/LocaleProvider'; import toBase64Utf8 from '../../utilities/base64'; +import { useReconnectController } from '../../hooks/useReconnectController'; const mediaOptions = { audio: true, @@ -39,9 +40,21 @@ function BrowserBroadcaster() { const peerConnectionRef = useRef(null); const localMediaStreamRef = useRef(null) const eventSourceRef = useRef(null) + const whipResourceUrlRef = useRef(null) + const setupInProgressRef = useRef(false) const videoRef = useRef(null) const hasSignalRef = useRef(false); const badSignalCountRef = useRef(10); + const shouldAutoReconnectRef = useRef(false) + const { + scheduleReconnect, + reset: resetReconnect, + cancel: cancelReconnect, + } = useReconnectController({ + baseDelayMs: 500, + maxDelayMs: 8_000, + maxAttempts: 8, + }) const endStream = () => navigate('/') const requestMedia = (source: MediaSource) => { @@ -64,89 +77,89 @@ function BrowserBroadcaster() { .forEach((streamTrack: MediaStreamTrack) => streamTrack.stop()) }, []) - const getSenderByKind = useCallback((peerConnection: RTCPeerConnection, kind: "audio" | "video") => { - return peerConnection.getTransceivers().find(transceiver => transceiver.sender.track?.kind === kind)?.sender ?? - peerConnection.getTransceivers().find(transceiver => transceiver.receiver.track.kind === kind)?.sender ?? - null + const closeEventSource = useCallback(() => { + eventSourceRef.current?.close() + eventSourceRef.current = null }, []) + const deleteWhipSession = useCallback(async () => { + const currentWhipResource = whipResourceUrlRef.current + if (!currentWhipResource) { + return + } + + whipResourceUrlRef.current = null + + await fetch(currentWhipResource, { + method: 'DELETE' + }).catch((err) => { + console.error("WHIP.DeleteSession", err) + }) + }, []) + + const closePeerConnectionAndSession = useCallback(async () => { + closeEventSource() + peerConnectionRef.current?.close() + peerConnectionRef.current = null + await deleteWhipSession() + }, [closeEventSource, deleteWhipSession]) + + const isFatalWhipStatus = useCallback((statusCode: number) => { + return statusCode === 400 || statusCode === 401 || statusCode === 403 || statusCode === 404 + }, []) + + const triggerReconnect = useCallback((setupPublisherSession: () => Promise) => { + if (!shouldAutoReconnectRef.current) { + return + } + + scheduleReconnect(() => { + void setupPublisherSession() + }) + }, [scheduleReconnect]) + useEffect(() => { return () => { - eventSourceRef.current?.close() + cancelReconnect() + shouldAutoReconnectRef.current = false + void closePeerConnectionAndSession() stopLocalMediaStream(localMediaStreamRef.current) localMediaStreamRef.current = null - peerConnectionRef.current?.close() - peerConnectionRef.current = null } - }, [stopLocalMediaStream]) + }, [cancelReconnect, closePeerConnectionAndSession, stopLocalMediaStream]) useEffect(() => { if (useDisplayMedia === "None") { + shouldAutoReconnectRef.current = false + cancelReconnect() return; } - let cancelled = false + let cancelled = false + shouldAutoReconnectRef.current = true - const mediaPromise = useDisplayMedia == "Screen" ? - navigator.mediaDevices.getDisplayMedia(mediaOptions) : - navigator.mediaDevices.getUserMedia(mediaOptions) - - mediaPromise.then(async mediaStream => { - const nextLocalMediaStream = mediaStream + const setupPublisherSession = async () => { + if (setupInProgressRef.current || cancelled) { + return + } - if (cancelled) { - stopLocalMediaStream(nextLocalMediaStream) - return; + const mediaStream = localMediaStreamRef.current + if (!mediaStream) { + return } + setupInProgressRef.current = true + setPeerConnectionDisconnected(() => false) + setConnectFailed(() => false) + const videoTrack = mediaStream.getVideoTracks()[0] ?? null const audioTrack = mediaStream.getAudioTracks()[0] ?? null - const existingPeerConnection = peerConnectionRef.current - if (existingPeerConnection) { - const videoSender = getSenderByKind(existingPeerConnection, "video") - const audioSender = getSenderByKind(existingPeerConnection, "audio") - - await Promise.all([ - videoSender?.replaceTrack(videoTrack) ?? Promise.resolve(), - audioSender?.replaceTrack(audioTrack) ?? Promise.resolve(), - ]) - - if ( - cancelled || - peerConnectionRef.current !== existingPeerConnection - ) { - stopLocalMediaStream(nextLocalMediaStream) - return; - } + await closePeerConnectionAndSession() - videoRef.current!.srcObject = mediaStream - const previousLocalMediaStream = localMediaStreamRef.current - localMediaStreamRef.current = nextLocalMediaStream - stopLocalMediaStream(previousLocalMediaStream) - return - } - - const peerConnection = new RTCPeerConnection(); + const peerConnection = new RTCPeerConnection() peerConnectionRef.current = peerConnection - if ( - cancelled || - peerConnectionRef.current !== peerConnection - ) { - if (peerConnectionRef.current === peerConnection) { - peerConnectionRef.current = null - } - peerConnection.close() - stopLocalMediaStream(nextLocalMediaStream) - return - } - - videoRef.current!.srcObject = mediaStream - const previousLocalMediaStream = localMediaStreamRef.current - localMediaStreamRef.current = nextLocalMediaStream - stopLocalMediaStream(previousLocalMediaStream) - peerConnection.addTransceiver(audioTrack ? audioTrack : "audio", { direction: 'sendonly' }) const isFirefox = navigator.userAgent.toLowerCase().includes('firefox') @@ -154,17 +167,9 @@ function BrowserBroadcaster() { peerConnection.addTransceiver(videoTrack ? videoTrack : "video", { direction: 'sendonly', sendEncodings: isFirefox ? undefined : [ - { - rid: encodingPrefix + 'High', - }, - { - rid: encodingPrefix + 'Mid', - scaleResolutionDownBy: 2.0 - }, - { - rid: encodingPrefix + 'Low', - scaleResolutionDownBy: 4.0 - } + { rid: encodingPrefix + 'High' }, + { rid: encodingPrefix + 'Mid', scaleResolutionDownBy: 2.0 }, + { rid: encodingPrefix + 'Low', scaleResolutionDownBy: 4.0 }, ], }) @@ -173,63 +178,114 @@ function BrowserBroadcaster() { setPublishSuccess(() => true) setMediaAccessError(() => null) setPeerConnectionDisconnected(() => false) - } else if (peerConnection.iceConnectionState === 'disconnected' || peerConnection.iceConnectionState === 'failed') { + resetReconnect() + return + } + + if (peerConnection.iceConnectionState === 'disconnected' || peerConnection.iceConnectionState === 'failed') { setPublishSuccess(() => false) setPeerConnectionDisconnected(() => true) + triggerReconnect(setupPublisherSession) } } - peerConnection - .createOffer() - .then(offer => { - peerConnection.setLocalDescription(offer) - .catch((err) => console.error("SetLocalDescription", err)); - - fetch(`/api/whip`, { - method: 'POST', - body: offer.sdp, - headers: { - Authorization: `Bearer ${toBase64Utf8(streamKey)}`, - 'Content-Type': 'application/sdp' - } - }).then(r => { - - if (r.status !== 201) { - setConnectFailed(() => true) - console.error("WHIP Endpoint did not return 201") - } - const parsedLinkHeader = parseLinkHeader(r.headers.get('Link')) - - if (parsedLinkHeader === null || parsedLinkHeader === undefined) { - throw new DOMException("Missing link header"); - } - - eventSourceRef.current?.close() - const evtSource = new EventSource(`${parsedLinkHeader['urn:ietf:params:whep:ext:core:server-sent-events'].url}`) - eventSourceRef.current = evtSource - - evtSource.onerror = () => evtSource.close(); - - // Receive current status of the stream - // evtSource.addEventListener("status", (event: MessageEvent) => setCurrentStreamStatus(JSON.parse(event.data))) - - return r.text() - }).then(answer => { - peerConnection.setRemoteDescription({ - sdp: answer, - type: 'answer' - }).catch((err) => console.error("SetRemoteDescription", err)) - }) + try { + const offer = await peerConnection.createOffer() + await peerConnection.setLocalDescription(offer) + + const response = await fetch(`/api/whip`, { + method: 'POST', + body: offer.sdp, + headers: { + Authorization: `Bearer ${toBase64Utf8(streamKey)}`, + 'Content-Type': 'application/sdp' + } }) - }, (reason: ErrorMessageEnum) => { - setMediaAccessError(() => reason) - setUseDisplayMedia("None"); - }) + + if (response.status !== 201) { + setConnectFailed(() => true) + setPublishSuccess(() => false) + if (isFatalWhipStatus(response.status)) { + shouldAutoReconnectRef.current = false + cancelReconnect() + return + } + + throw new DOMException("WHIP Endpoint did not return 201") + } + + whipResourceUrlRef.current = response.headers.get('Location') + + const parsedLinkHeader = parseLinkHeader(response.headers.get('Link')) + if (parsedLinkHeader === null || parsedLinkHeader === undefined) { + throw new DOMException("Missing link header") + } + + closeEventSource() + const evtSource = new EventSource(`${parsedLinkHeader['urn:ietf:params:whep:ext:core:server-sent-events'].url}`) + eventSourceRef.current = evtSource + + evtSource.onerror = () => { + closeEventSource() + setPublishSuccess(() => false) + setPeerConnectionDisconnected(() => true) + triggerReconnect(setupPublisherSession) + } + + const answer = await response.text() + await peerConnection.setRemoteDescription({ + sdp: answer, + type: 'answer' + }) + } catch (err) { + console.error("Broadcast.SetupPublisherSession", err) + setPublishSuccess(() => false) + triggerReconnect(setupPublisherSession) + } finally { + setupInProgressRef.current = false + } + } + + const requestAndStartSession = async () => { + const mediaPromise = useDisplayMedia == "Screen" + ? navigator.mediaDevices.getDisplayMedia(mediaOptions) + : navigator.mediaDevices.getUserMedia(mediaOptions) + + try { + const mediaStream = await mediaPromise + if (cancelled) { + stopLocalMediaStream(mediaStream) + return + } + + videoRef.current!.srcObject = mediaStream + const previousLocalMediaStream = localMediaStreamRef.current + localMediaStreamRef.current = mediaStream + stopLocalMediaStream(previousLocalMediaStream) + + await setupPublisherSession() + } catch (reason) { + const mediaError = reason as { name?: string } + if (mediaError.name === 'NotAllowedError') { + setMediaAccessError(() => ErrorMessageEnum.NotAllowedError) + } else if (mediaError.name === 'NotFoundError') { + setMediaAccessError(() => ErrorMessageEnum.NotFoundError) + } else { + setMediaAccessError(() => ErrorMessageEnum.NoMediaDevices) + } + + shouldAutoReconnectRef.current = false + cancelReconnect() + setUseDisplayMedia("None") + } + } + + void requestAndStartSession() return () => { cancelled = true } - }, [getSenderByKind, mediaRequestCount, stopLocalMediaStream, streamKey, useDisplayMedia]) + }, [cancelReconnect, closeEventSource, closePeerConnectionAndSession, isFatalWhipStatus, mediaRequestCount, resetReconnect, stopLocalMediaStream, streamKey, triggerReconnect, useDisplayMedia]) useEffect(() => { hasSignalRef.current = hasSignal; diff --git a/web/src/components/player/Player.tsx b/web/src/components/player/Player.tsx index ad7e18b9..54a9d63f 100644 --- a/web/src/components/player/Player.tsx +++ b/web/src/components/player/Player.tsx @@ -5,12 +5,13 @@ import VideoLayerSelectorComponent from "./components/VideoLayerSelectorComponen import AudioLayerSelectorComponent from "./components/AudioLayerSelectorComponent"; import CurrentViewersComponent from "./components/CurrentViewersComponent"; import { StreamStatus } from '../../providers/StatusProvider'; -import { CurrentLayersMessage, PeerConnectionSetup, SetupPeerConnectionProps } from './functions/peerconnection'; +import { CurrentLayersMessage, PeerConnectionSetup, SetupPeerConnectionFailureType, SetupPeerConnectionProps, SetupPeerConnectionStateChange } from './functions/peerconnection'; import { ChatAdapter } from '../../hooks/useChatSession'; import { ArrowsPointingOutIcon, Square2StackIcon, XMarkIcon } from '@heroicons/react/20/solid'; import { ChatBubbleLeftRightIcon } from '@heroicons/react/24/outline'; import VolumeComponent from './components/VolumeComponent'; import { StatusMessageComponent } from './components/StatusMessageComponent'; +import { useReconnectController } from '../../hooks/useReconnectController'; interface PlayerProps { streamKey: string; @@ -56,11 +57,23 @@ const Player = (props: PlayerProps) => { const clickDelay = 250; const videoRef = useRef(null); + const currentPeerConnectionRef = useRef(null) + const setupInProgressRef = useRef(false) + const fatalSetupErrorRef = useRef(false) const layerEndpointRef = useRef(''); const videoOverlayVisibleTimeoutRef = useRef(undefined); const lastClickTimeRef = useRef(0); const clickTimeoutRef = useRef(undefined); const streamVideoPlayerId = streamKey + "_videoPlayer"; + const { + scheduleReconnect, + reset: resetReconnect, + cancel: cancelReconnect, + } = useReconnectController({ + baseDelayMs: 500, + maxDelayMs: 8_000, + maxAttempts: 8, + }) const setVideoRef = useCallback((element: HTMLVideoElement | null) => { videoRef.current = element setVideoElement(element) @@ -93,7 +106,12 @@ const Player = (props: PlayerProps) => { setStreamState("Loading") }, - onError: () => setStreamState("Error"), + onError: (_, failureType) => { + if (failureType === SetupPeerConnectionFailureType.FATAL) { + fatalSetupErrorRef.current = true + setStreamState("Error") + } + }, onChatAdapterChange: (adapter) => onChatAdapterChange?.(streamKey, adapter), }), [onChatAdapterChange, onStreamStatusChange, streamKey]) @@ -170,37 +188,79 @@ const Player = (props: PlayerProps) => { player?.addEventListener('mouseenter', handleMouseEnter) player?.addEventListener('mouseleave', handleMouseLeave) - let currentPeerConnection: RTCPeerConnection | null = null - const beforeUnloadHandler = () => currentPeerConnection?.close() + const closeCurrentPeerConnection = () => { + currentPeerConnectionRef.current?.close() + currentPeerConnectionRef.current = null + } + + const beforeUnloadHandler = () => closeCurrentPeerConnection() window.addEventListener("beforeunload", beforeUnloadHandler) const setupPeerConnection = () => { + if (setupInProgressRef.current) { + return + } + + setupInProgressRef.current = true + fatalSetupErrorRef.current = false + setStreamState(() => "Loading") + const setupProps: SetupPeerConnectionProps = { ...peerConnectionConfig, - onStreamRestart: setupPeerConnection, + onStateChange: (state) => { + if (state === SetupPeerConnectionStateChange.OFFLINE) { + scheduleReconnect(() => { + setupPeerConnection() + }) + } + }, + onStreamRestart: () => { + resetReconnect() + scheduleReconnect(() => { + setupPeerConnection() + }, { immediate: true }) + }, } + closeCurrentPeerConnection() + PeerConnectionSetup(setupProps) .then((peerConnection) => { - currentPeerConnection = peerConnection + currentPeerConnectionRef.current = peerConnection + setupInProgressRef.current = false + resetReconnect() + }) + .catch((err) => { + setupInProgressRef.current = false + console.log("PeerConnectionConfig.Error", err) + + if (fatalSetupErrorRef.current) { + cancelReconnect() + return + } + + scheduleReconnect(() => { + setupPeerConnection() + }) }) - .catch((err) => console.log("PeerConnectionConfig.Error", err)) } setupPeerConnection() return () => { onChatAdapterChange?.(streamKey, undefined) + cancelReconnect() + setupInProgressRef.current = false player?.removeEventListener('mouseup', handleMouseUp) player?.removeEventListener('mouseenter', handleMouseEnter) player?.removeEventListener('mouseleave', handleMouseLeave) player?.removeEventListener('mousemove', handleMouseMove) window.removeEventListener("beforeunload", beforeUnloadHandler) - currentPeerConnection?.close() + closeCurrentPeerConnection() clearTimeout(videoOverlayVisibleTimeoutRef.current) } - }, [onChatAdapterChange, onStreamStatusChange, peerConnectionConfig, resetTimer, streamKey, streamVideoPlayerId]) + }, [cancelReconnect, onChatAdapterChange, onStreamStatusChange, peerConnectionConfig, resetReconnect, resetTimer, scheduleReconnect, streamKey, streamVideoPlayerId]) return (
@@ -316,8 +376,11 @@ const Player = (props: PlayerProps) => { muted playsInline className="rounded-md w-full h-full relative bg-gray-950" - onPlaying={() => setStreamState("Playing")} - onLoadStart={() => setStreamState("Loading")} + onPlaying={() => { + setStreamState(() => "Playing") + resetReconnect() + }} + onLoadStart={() => setStreamState(() => "Loading")} onLoadedData={(event) => { console.log("VideoPlayer.onLoadedMetadata", event) event.currentTarget.play() diff --git a/web/src/components/player/functions/peerconnection.tsx b/web/src/components/player/functions/peerconnection.tsx index 17cea9a3..099568f3 100644 --- a/web/src/components/player/functions/peerconnection.tsx +++ b/web/src/components/player/functions/peerconnection.tsx @@ -5,68 +5,80 @@ import { ChatAdapter } from "../../../hooks/useChatSession"; import { ChatDataChannelAdapter, DATA_CHANNEL_LABEL } from "./chatDataChannel"; export interface CurrentLayersMessage { - id: string, - audioLayerCurrent: string - audioTimestamp: number - audioPacketsWritten: number - audioSequenceNumber: number - - videoLayerCurrent: string - videoTimestamp: number - videoPacketsWritten: number - videoSequenceNumber: number + id: string; + audioLayerCurrent: string; + audioTimestamp: number; + audioPacketsWritten: number; + audioSequenceNumber: number; + + videoLayerCurrent: string; + videoTimestamp: number; + videoPacketsWritten: number; + videoSequenceNumber: number; } interface LayerEncoding { - encodingId: string + encodingId: string; } interface LayersMessageTrack { - layers: LayerEncoding[] + layers: LayerEncoding[]; } interface LayersMessagePayload { - [mediaId: string]: LayersMessageTrack | undefined + [mediaId: string]: LayersMessageTrack | undefined; } enum SetupPeerConnectionError { - INVALID_WHEP_RESPONSE + INVALID_WHEP_RESPONSE, } -enum SetupPeerConnectionStateChange { + +export enum SetupPeerConnectionFailureType { + RETRYABLE, + FATAL, +} + +export enum SetupPeerConnectionStateChange { ONLINE, - OFFLINE + OFFLINE, } export interface SetupPeerConnectionProps { - streamKey: string, - videoRef: RefObject, - layerEndpointRef: RefObject, - - onError: (error: SetupPeerConnectionError) => void, - onStreamStatus: (status: StreamStatus) => void, - onLayerStatus: (layers: CurrentLayersMessage) => void, - onAudioLayerChange: (layers: string[]) => void, - onVideoLayerChange: (layers: string[]) => void, - onLayerEndpointChange?: (endpoint: string) => void, - onStateChange: (state: SetupPeerConnectionStateChange) => void, - onStreamRestart: () => void, - onChatAdapterChange?: (adapter: ChatAdapter | undefined) => void, + streamKey: string; + videoRef: RefObject; + layerEndpointRef: RefObject; + + onError: ( + error: SetupPeerConnectionError, + failureType: SetupPeerConnectionFailureType, + ) => void; + onStreamStatus: (status: StreamStatus) => void; + onLayerStatus: (layers: CurrentLayersMessage) => void; + onAudioLayerChange: (layers: string[]) => void; + onVideoLayerChange: (layers: string[]) => void; + onLayerEndpointChange?: (endpoint: string) => void; + onStateChange: (state: SetupPeerConnectionStateChange) => void; + onStreamRestart: () => void; + onChatAdapterChange?: (adapter: ChatAdapter | undefined) => void; } const stopVideoTrack = (videoElement: HTMLVideoElement | null) => { const currentStream = videoElement?.srcObject; if (currentStream instanceof MediaStream) { - currentStream.getTracks().forEach(track => track.stop()); + currentStream.getTracks().forEach((track) => track.stop()); } -} +}; + const clearVideoElement = (videoElement: HTMLVideoElement | null) => { - if(videoElement){ - videoElement.muted = true - videoElement.srcObject = null + if (videoElement) { + videoElement.muted = true; + videoElement.srcObject = null; } -} +}; -export async function PeerConnectionSetup(props: SetupPeerConnectionProps): Promise { +export async function PeerConnectionSetup( + props: SetupPeerConnectionProps, +): Promise { const { streamKey, videoRef, @@ -79,126 +91,146 @@ export async function PeerConnectionSetup(props: SetupPeerConnectionProps): Prom onLayerEndpointChange, onStateChange, onError, - onChatAdapterChange } = props + onChatAdapterChange, + } = props; - if (videoRef.current === null){ - throw new Error("PeerConnection.VideoRef is null") + if (videoRef.current === null) { + throw new Error("PeerConnection.VideoRef is null"); } - stopVideoTrack(videoRef.current) - clearVideoElement(videoRef.current) + stopVideoTrack(videoRef.current); + clearVideoElement(videoRef.current); - // Create peerconnection - const peerConnection = await createPeerConnection() - const chatDataChannel = peerConnection.createDataChannel(DATA_CHANNEL_LABEL) - const chatAdapter = new ChatDataChannelAdapter() - chatAdapter.attachChannel(chatDataChannel) - onChatAdapterChange?.(chatAdapter) + const peerConnection = await createPeerConnection(); + const chatDataChannel = peerConnection.createDataChannel(DATA_CHANNEL_LABEL); + const chatAdapter = new ChatDataChannelAdapter(); + chatAdapter.attachChannel(chatDataChannel); + onChatAdapterChange?.(chatAdapter); - // Config - peerConnection.addTransceiver('audio', { direction: 'recvonly' }) - peerConnection.addTransceiver('video', { direction: 'recvonly' }) + peerConnection.addTransceiver("audio", { direction: "recvonly" }); + peerConnection.addTransceiver("video", { direction: "recvonly" }); - // Setup events const remoteStream = new MediaStream(); peerConnection.ontrack = (event: RTCTrackEvent) => { remoteStream.addTrack(event.track); if (videoRef.current) { - videoRef.current!.srcObject = remoteStream; + videoRef.current.srcObject = remoteStream; } else { - console.log("PeerConnection.onTrack", "Could not find VideoRef") + console.log("PeerConnection.onTrack", "Could not find VideoRef"); } - event.track.onended = () => remoteStream.removeTrack(event.track) - } + event.track.onended = () => remoteStream.removeTrack(event.track); + }; - // Begin negotiation - const offer = await peerConnection.createOffer({ iceRestart: true }) - offer["sdp"] = offer["sdp"]!.replace("useinbandfec=1", "useinbandfec=1;stereo=1") + const offer = await peerConnection.createOffer({ iceRestart: true }); + offer.sdp = offer.sdp!.replace("useinbandfec=1", "useinbandfec=1;stereo=1"); await peerConnection .setLocalDescription(offer) .catch((err) => console.error("PeerConnection.SetLocalDescription", err)); const whepResponse = await fetch(`/api/whep`, { - method: 'POST', + method: "POST", headers: { Authorization: `Bearer ${streamKey}`, - 'Content-Type': 'application/sdp' + "Content-Type": "application/sdp", }, body: offer.sdp, - }) + }); if (!whepResponse.ok) { - console.log("PeerConnection.WhepResponse.Error", SetupPeerConnectionError.INVALID_WHEP_RESPONSE) - onError(SetupPeerConnectionError.INVALID_WHEP_RESPONSE) + console.log( + "PeerConnection.WhepResponse.Error", + SetupPeerConnectionError.INVALID_WHEP_RESPONSE, + ); + onError( + SetupPeerConnectionError.INVALID_WHEP_RESPONSE, + isFatalWhepStatus(whepResponse.status) + ? SetupPeerConnectionFailureType.FATAL + : SetupPeerConnectionFailureType.RETRYABLE, + ); + peerConnection.close(); + chatAdapter.detachChannel(); + onChatAdapterChange?.(undefined); + throw new DOMException("Invalid WHEP response"); } - const parsedLinkHeader = parseLinkHeader(whepResponse.headers.get('Link')) - - if (parsedLinkHeader === null || parsedLinkHeader === undefined) { + const parsedLinkHeader = parseLinkHeader(whepResponse.headers.get("Link")); + if (!parsedLinkHeader) { throw new DOMException("Missing link header"); } - layerEndpointRef.current = `${parsedLinkHeader['urn:ietf:params:whep:ext:core:layer'].url}` - onLayerEndpointChange?.(layerEndpointRef.current) - const evtSource = new EventSource(`${parsedLinkHeader['urn:ietf:params:whep:ext:core:server-sent-events'].url}`) + layerEndpointRef.current = `${parsedLinkHeader["urn:ietf:params:whep:ext:core:layer"].url}`; + onLayerEndpointChange?.(layerEndpointRef.current); - evtSource.onerror = (ev: Event) => { - console.error("PeerConnection.EventSource", ev) + const evtSource = new EventSource( + `${parsedLinkHeader["urn:ietf:params:whep:ext:core:server-sent-events"].url}`, + ); + + const cleanupResources = () => { evtSource.close(); - chatAdapter.detachChannel() - onChatAdapterChange?.(undefined) - onStateChange(SetupPeerConnectionStateChange.OFFLINE) - } + chatAdapter.detachChannel(); + onChatAdapterChange?.(undefined); + }; - // Receive current status of the whep stream - evtSource.addEventListener("streamStart", () => { - console.log("PeerConnection.EventSource", "Reset Stream", streamKey) + const closeConnection = () => { + cleanupResources(); + peerConnection.close(); + }; - evtSource.close() - chatAdapter.detachChannel() - onChatAdapterChange?.(undefined) - peerConnection.close() + evtSource.onerror = (ev: Event) => { + console.error("PeerConnection.EventSource", ev); + closeConnection(); + onStateChange(SetupPeerConnectionStateChange.OFFLINE); + }; - onStreamRestart() - }) + evtSource.addEventListener("streamStart", () => { + console.log("PeerConnection.EventSource", "Reset Stream", streamKey); + closeConnection(); + onStreamRestart(); + }); - // Receive current status of the whep stream evtSource.addEventListener("status", (event: MessageEvent) => { - onStreamStatus(JSON.parse(event.data) as StreamStatus) - }) + onStreamStatus(JSON.parse(event.data) as StreamStatus); + }); - // Receive current current layers of this whep stream evtSource.addEventListener("currentLayers", (event: MessageEvent) => { - onLayerStatus(JSON.parse(event.data) as CurrentLayersMessage) - }) - - // Receive layers - evtSource.addEventListener("layers", event => { - const parsed = JSON.parse(event.data) as LayersMessagePayload - const videoLayerIds = parsed['1']?.layers.map((layer) => layer.encodingId) ?? [] - const audioLayerIds = parsed['2']?.layers.map((layer) => layer.encodingId) ?? [] - onVideoLayerChange(videoLayerIds) - onAudioLayerChange(audioLayerIds) - }) - - const answer = await whepResponse.text() - await peerConnection.setRemoteDescription({ - sdp: answer, - type: 'answer' - }).catch((err) => console.error("PeerConnection.RemoteDescription", err)) - - peerConnection.addEventListener('connectionstatechange', () => { + onLayerStatus(JSON.parse(event.data) as CurrentLayersMessage); + }); + + evtSource.addEventListener("layers", (event) => { + const parsed = JSON.parse(event.data) as LayersMessagePayload; + const videoLayerIds = + parsed["1"]?.layers.map((layer) => layer.encodingId) ?? []; + const audioLayerIds = + parsed["2"]?.layers.map((layer) => layer.encodingId) ?? []; + onVideoLayerChange(videoLayerIds); + onAudioLayerChange(audioLayerIds); + }); + + const answer = await whepResponse.text(); + await peerConnection + .setRemoteDescription({ + sdp: answer, + type: "answer", + }) + .catch((err) => console.error("PeerConnection.RemoteDescription", err)); + + peerConnection.addEventListener("connectionstatechange", () => { + if (peerConnection.connectionState === "connected") { + onStateChange(SetupPeerConnectionStateChange.ONLINE); + return; + } + if ( - peerConnection.connectionState === 'closed' || - peerConnection.connectionState === 'failed' || - peerConnection.connectionState === 'disconnected' + peerConnection.connectionState === "closed" || + peerConnection.connectionState === "failed" || + peerConnection.connectionState === "disconnected" ) { - chatAdapter.detachChannel() - onChatAdapterChange?.(undefined) + cleanupResources(); + onStateChange(SetupPeerConnectionStateChange.OFFLINE); } - }) + }); return peerConnection; } @@ -206,3 +238,12 @@ export async function PeerConnectionSetup(props: SetupPeerConnectionProps): Prom async function createPeerConnection(): Promise { return new RTCPeerConnection(); } + +function isFatalWhepStatus(statusCode: number): boolean { + return ( + statusCode === 400 || + statusCode === 401 || + statusCode === 403 || + statusCode === 404 + ); +} diff --git a/web/src/hooks/useReconnectController.ts b/web/src/hooks/useReconnectController.ts new file mode 100644 index 00000000..0e7f6064 --- /dev/null +++ b/web/src/hooks/useReconnectController.ts @@ -0,0 +1,202 @@ +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; + +interface ReconnectControllerOptions { + baseDelayMs?: number; + maxDelayMs?: number; + maxAttempts?: number; + jitterRatio?: number; + pauseWhileHidden?: boolean; +} + +interface ReconnectControllerState { + attempt: number; + isReconnecting: boolean; + isExhausted: boolean; + nextDelayMs: number | null; +} + +interface ScheduleReconnectOptions { + immediate?: boolean; +} + +interface ReconnectController { + state: ReconnectControllerState; + scheduleReconnect: ( + onReconnect: () => void, + options?: ScheduleReconnectOptions, + ) => boolean; + reset: () => void; + cancel: () => void; +} + +const PAUSE_RETRY_MS = 1_000; + +const clampDelay = (delayMs: number, maxDelayMs: number) => { + if (delayMs > maxDelayMs) { + return maxDelayMs; + } + + return delayMs; +}; + +const getBackoffDelay = ( + attempt: number, + baseDelayMs: number, + maxDelayMs: number, +) => { + const rawDelay = baseDelayMs * Math.pow(2, Math.max(0, attempt - 1)); + return clampDelay(rawDelay, maxDelayMs); +}; + +const withJitter = (delayMs: number, jitterRatio: number) => { + const jitterRange = Math.floor(delayMs * jitterRatio); + if (jitterRange <= 0) { + return delayMs; + } + + const jitter = + Math.floor(Math.random() * (jitterRange * 2 + 1)) - jitterRange; + return Math.max(0, delayMs + jitter); +}; + +const shouldPauseReconnect = (pauseWhileHidden: boolean) => { + if (typeof window === "undefined") { + return false; + } + + if (typeof navigator !== "undefined" && navigator.onLine === false) { + return true; + } + + if (pauseWhileHidden && typeof document !== "undefined") { + return document.visibilityState !== "visible"; + } + + return false; +}; + +export function useReconnectController( + options?: ReconnectControllerOptions, +): ReconnectController { + const { + baseDelayMs = 500, + maxDelayMs = 8_000, + maxAttempts = 8, + jitterRatio = 0.2, + pauseWhileHidden = true, + } = options ?? {}; + + const timeoutRef = useRef(undefined); + const stoppedRef = useRef(false); + const [attempt, setAttempt] = useState(0); + const [isReconnecting, setIsReconnecting] = useState(false); + const [nextDelayMs, setNextDelayMs] = useState(null); + const [isExhausted, setIsExhausted] = useState(false); + + const clearPendingTimer = useCallback(() => { + clearTimeout(timeoutRef.current); + timeoutRef.current = undefined; + }, []); + + const cancel = useCallback(() => { + stoppedRef.current = true; + clearPendingTimer(); + setIsReconnecting(false); + setNextDelayMs(null); + }, [clearPendingTimer]); + + const reset = useCallback(() => { + stoppedRef.current = false; + clearPendingTimer(); + setAttempt(0); + setIsReconnecting(false); + setIsExhausted(false); + setNextDelayMs(null); + }, [clearPendingTimer]); + + const scheduleReconnect = useCallback( + (onReconnect: () => void, scheduleOptions?: ScheduleReconnectOptions) => { + if (stoppedRef.current) { + return false; + } + + setIsExhausted(false); + + let scheduled = false; + setAttempt((currentAttempt) => { + const nextAttempt = currentAttempt + 1; + + if (!scheduleOptions?.immediate && nextAttempt > maxAttempts) { + setIsExhausted(true); + setIsReconnecting(false); + setNextDelayMs(null); + scheduled = false; + return currentAttempt; + } + + clearPendingTimer(); + const delayMs = scheduleOptions?.immediate + ? 0 + : withJitter( + getBackoffDelay(nextAttempt, baseDelayMs, maxDelayMs), + jitterRatio, + ); + + setIsReconnecting(true); + setNextDelayMs(delayMs); + scheduled = true; + + const triggerReconnect = () => { + if (stoppedRef.current) { + return; + } + + if (shouldPauseReconnect(pauseWhileHidden)) { + timeoutRef.current = setTimeout(triggerReconnect, PAUSE_RETRY_MS); + return; + } + + setIsReconnecting(false); + setNextDelayMs(null); + onReconnect(); + }; + + timeoutRef.current = setTimeout(triggerReconnect, delayMs); + return nextAttempt; + }); + + return scheduled; + }, + [ + baseDelayMs, + clearPendingTimer, + jitterRatio, + maxAttempts, + maxDelayMs, + pauseWhileHidden, + ], + ); + + useEffect(() => { + return () => { + clearPendingTimer(); + }; + }, [clearPendingTimer]); + + const state = useMemo( + () => ({ + attempt, + isReconnecting, + isExhausted, + nextDelayMs, + }), + [attempt, isExhausted, isReconnecting, nextDelayMs], + ); + + return { + state, + scheduleReconnect, + reset, + cancel, + }; +}