From dacfde4328d2482df5c2ee9c5d85ccdc14bec12c Mon Sep 17 00:00:00 2001 From: Daniel Date: Sat, 7 Mar 2026 00:51:09 +0100 Subject: [PATCH] fix(streaming): resolve stale closure preventing remote WebRTC connections The WebSocket onmessage handler captured isBroadcasting=false at creation time. When ICE candidates arrived from remote viewers, the handler looked up viewerPcRef instead of peerConnectionsRef, dropping all candidates. Fix: use refs (isBroadcastingRef, viewingRef, handleWsMessageRef) so the WS handler always reads current state. connectWs() now has [] deps and delegates to handleWsMessageRef.current for every message. Co-Authored-By: Claude Opus 4.6 --- web/src/plugins/streaming/StreamingTab.tsx | 139 ++++++++++++--------- 1 file changed, 78 insertions(+), 61 deletions(-) diff --git a/web/src/plugins/streaming/StreamingTab.tsx b/web/src/plugins/streaming/StreamingTab.tsx index 8cee2bd..40121dc 100644 --- a/web/src/plugins/streaming/StreamingTab.tsx +++ b/web/src/plugins/streaming/StreamingTab.tsx @@ -61,6 +61,12 @@ export default function StreamingTab({ data }: { data: any }) { const reconnectTimerRef = useRef | null>(null); const reconnectDelayRef = useRef(1000); + // ── Refs that mirror state (to avoid stale closures in WS handler) ── + const isBroadcastingRef = useRef(false); + const viewingRef = useRef(null); + useEffect(() => { isBroadcastingRef.current = isBroadcasting; }, [isBroadcasting]); + useEffect(() => { viewingRef.current = viewing; }, [viewing]); + // ── Elapsed time ticker ── useEffect(() => { const hasActive = streams.length > 0 || isBroadcasting; @@ -81,42 +87,25 @@ export default function StreamingTab({ data }: { data: any }) { if (userName) localStorage.setItem('streaming_name', userName); }, [userName]); - // ── WebSocket connect ── - const connectWs = useCallback(() => { - if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) return; + // ── Send via WS ── + const wsSend = useCallback((d: Record) => { + if (wsRef.current?.readyState === WebSocket.OPEN) { + wsRef.current.send(JSON.stringify(d)); + } + }, []); - const proto = location.protocol === 'https:' ? 'wss' : 'ws'; - const ws = new WebSocket(`${proto}://${location.host}/ws/streaming`); - wsRef.current = ws; + // ── Viewer cleanup ── + const cleanupViewer = useCallback(() => { + if (viewerPcRef.current) { + viewerPcRef.current.close(); + viewerPcRef.current = null; + } + if (remoteVideoRef.current) remoteVideoRef.current.srcObject = null; + }, []); - ws.onopen = () => { - reconnectDelayRef.current = 1000; - }; - - ws.onmessage = (ev) => { - let msg: any; - try { msg = JSON.parse(ev.data); } catch { return; } - handleWsMessage(msg); - }; - - ws.onclose = () => { - wsRef.current = null; - // Auto-reconnect if broadcasting or viewing - if (isBroadcasting || viewing) { - reconnectTimerRef.current = setTimeout(() => { - reconnectDelayRef.current = Math.min(reconnectDelayRef.current * 2, 10000); - connectWs(); - }, reconnectDelayRef.current); - } - }; - - ws.onerror = () => { - ws.close(); - }; - }, [isBroadcasting, viewing]); - - // ── WS message handler ── - const handleWsMessage = useCallback((msg: any) => { + // ── WS message handler (uses refs, never stale) ── + const handleWsMessageRef = useRef<(msg: any) => void>(() => {}); + handleWsMessageRef.current = (msg: any) => { switch (msg.type) { case 'welcome': clientIdRef.current = msg.clientId; @@ -126,15 +115,16 @@ export default function StreamingTab({ data }: { data: any }) { case 'broadcast_started': setMyStreamId(msg.streamId); setIsBroadcasting(true); + isBroadcastingRef.current = true; // immediate update for handler setStarting(false); break; case 'stream_available': - // SSE will update streams list; this is just a hint + // SSE will update streams list break; case 'stream_ended': - if (viewing?.streamId === msg.streamId) { + if (viewingRef.current?.streamId === msg.streamId) { cleanupViewer(); setViewing(null); } @@ -160,6 +150,16 @@ export default function StreamingTab({ data }: { data: any }) { } }; + pc.onnegotiationneeded = () => { + pc.createOffer() + .then(offer => pc.setLocalDescription(offer)) + .then(() => { + wsSend({ type: 'offer', targetId: viewerId, sdp: pc.localDescription }); + }) + .catch(console.error); + }; + + // If negotiationneeded doesn't fire (tracks already added), create offer now pc.createOffer() .then(offer => pc.setLocalDescription(offer)) .then(() => { @@ -222,9 +222,9 @@ export default function StreamingTab({ data }: { data: any }) { break; } - // ── ICE candidate relay ── + // ── ICE candidate relay (uses ref, not stale state!) ── case 'ice_candidate': { - const pc = isBroadcasting + const pc = isBroadcastingRef.current ? peerConnectionsRef.current.get(msg.fromId) : viewerPcRef.current; if (pc && msg.candidate) { @@ -238,20 +238,47 @@ export default function StreamingTab({ data }: { data: any }) { setStarting(false); break; } - }, [isBroadcasting, viewing]); - - // ── Send via WS ── - const wsSend = (data: Record) => { - if (wsRef.current?.readyState === WebSocket.OPEN) { - wsRef.current.send(JSON.stringify(data)); - } }; + // ── WebSocket connect (stable, no state deps) ── + const connectWs = useCallback(() => { + if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) return; + + const proto = location.protocol === 'https:' ? 'wss' : 'ws'; + const ws = new WebSocket(`${proto}://${location.host}/ws/streaming`); + wsRef.current = ws; + + ws.onopen = () => { + reconnectDelayRef.current = 1000; + }; + + // Delegate to ref so handler is always current + ws.onmessage = (ev) => { + let msg: any; + try { msg = JSON.parse(ev.data); } catch { return; } + handleWsMessageRef.current(msg); + }; + + ws.onclose = () => { + wsRef.current = null; + // Auto-reconnect if broadcasting or viewing (read from refs) + if (isBroadcastingRef.current || viewingRef.current) { + reconnectTimerRef.current = setTimeout(() => { + reconnectDelayRef.current = Math.min(reconnectDelayRef.current * 2, 10000); + connectWs(); + }, reconnectDelayRef.current); + } + }; + + ws.onerror = () => { + ws.close(); + }; + }, []); + // ── Start broadcasting ── const startBroadcast = useCallback(async () => { if (!userName.trim()) { setError('Bitte gib einen Namen ein.'); return; } - // Check browser support if (!navigator.mediaDevices?.getDisplayMedia) { setError('Dein Browser unterstützt keine Bildschirmfreigabe.'); return; @@ -280,7 +307,6 @@ export default function StreamingTab({ data }: { data: any }) { // Connect WS and start broadcast connectWs(); - // Wait for WS to open, then send start_broadcast const waitForWs = () => { if (wsRef.current?.readyState === WebSocket.OPEN) { wsSend({ type: 'start_broadcast', name: userName.trim(), title: streamTitle.trim() || 'Screen Share' }); @@ -297,24 +323,23 @@ export default function StreamingTab({ data }: { data: any }) { setError(`Fehler: ${e.message}`); } } - }, [userName, streamTitle, connectWs]); + }, [userName, streamTitle, connectWs, wsSend]); // ── Stop broadcasting ── const stopBroadcast = useCallback(() => { wsSend({ type: 'stop_broadcast' }); - // Stop all tracks localStreamRef.current?.getTracks().forEach(t => t.stop()); localStreamRef.current = null; if (localVideoRef.current) localVideoRef.current.srcObject = null; - // Close all peer connections for (const pc of peerConnectionsRef.current.values()) pc.close(); peerConnectionsRef.current.clear(); setIsBroadcasting(false); + isBroadcastingRef.current = false; setMyStreamId(null); - }, []); + }, [wsSend]); // ── Join as viewer ── const joinStream = useCallback((streamId: string) => { @@ -330,22 +355,14 @@ export default function StreamingTab({ data }: { data: any }) { } }; waitForWs(); - }, [userName, connectWs]); + }, [userName, connectWs, wsSend]); // ── Leave viewer ── - const cleanupViewer = useCallback(() => { - if (viewerPcRef.current) { - viewerPcRef.current.close(); - viewerPcRef.current = null; - } - if (remoteVideoRef.current) remoteVideoRef.current.srcObject = null; - }, []); - const leaveViewing = useCallback(() => { wsSend({ type: 'leave_viewer' }); cleanupViewer(); setViewing(null); - }, [cleanupViewer]); + }, [cleanupViewer, wsSend]); // ── Cleanup on unmount ── useEffect(() => {