Streaming: Stale-Stream Fix, Broadcast+View gleichzeitig, 3-Punkt-Menü

Server:
- Dual-Role: Client kann gleichzeitig broadcasten UND zuschauen
  (broadcastStreamId + viewingStreamId statt single role)
- POST /api/streaming/disconnect Beacon-Endpoint fuer
  zuverlaessigen Cleanup bei Page-Unload
- Heartbeat auf 5s reduziert (schnellere Erkennung)

Frontend:
- pagehide + sendBeacon: Streams werden sofort aufgeraeumt wenn
  Browser geschlossen/neugeladen wird
- ICE Routing: Broadcaster-Map wird zuerst geprueft, dann Viewer-PC
  → Broadcast + View im selben Tab moeglich
- 3-Punkt-Menü mit Stream-Details, "In neuem Fenster oeffnen" und
  "Link teilen" (Clipboard)
- Auto-Join via ?viewStream=... Query-Parameter (fuer geteilte Links)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Daniel 2026-03-07 01:56:14 +01:00
parent 813e017036
commit 470bef62e4
7 changed files with 5091 additions and 5013 deletions

View file

@ -18,10 +18,12 @@ interface StreamInfo {
interface WsClient {
id: string;
ws: WebSocket;
role: 'idle' | 'broadcaster' | 'viewer';
name: string;
streamId?: string; // ID of stream this client broadcasts or views
isAlive: boolean; // heartbeat tracking
/** Stream ID this client broadcasts (undefined if not broadcasting) */
broadcastStreamId?: string;
/** Stream ID this client views (undefined if not viewing) */
viewingStreamId?: string;
isAlive: boolean;
}
// ── State ──
@ -33,8 +35,7 @@ const wsClients = new Map<string, WsClient>();
let wss: WebSocketServer | null = null;
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
const HEARTBEAT_MS = 10_000; // ping every 10s
const HEARTBEAT_TIMEOUT = 25_000; // dead after missing ~2 pings
const HEARTBEAT_MS = 5_000; // ping every 5s (faster detection)
// ── Helpers ──
@ -61,18 +62,16 @@ function endStream(streamId: string, reason: string): void {
// Notify all viewers of this stream
for (const c of wsClients.values()) {
if (c.role === 'viewer' && c.streamId === streamId) {
if (c.viewingStreamId === streamId) {
sendTo(c, { type: 'stream_ended', streamId, reason });
c.role = 'idle';
c.streamId = undefined;
c.viewingStreamId = undefined;
}
}
// Reset broadcaster role
// Reset broadcaster
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
broadcaster.role = 'idle';
broadcaster.streamId = undefined;
broadcaster.broadcastStreamId = undefined;
}
streams.delete(streamId);
@ -80,14 +79,27 @@ function endStream(streamId: string, reason: string): void {
console.log(`[Streaming] Stream "${stream.title}" ended: ${reason}`);
}
function leaveViewer(client: WsClient): void {
if (!client.viewingStreamId) return;
const stream = streams.get(client.viewingStreamId);
if (stream) {
stream.viewerCount = Math.max(0, stream.viewerCount - 1);
broadcastStreamStatus();
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.viewingStreamId });
}
}
client.viewingStreamId = undefined;
}
// ── WebSocket Signaling ──
function handleSignalingMessage(client: WsClient, msg: any): void {
switch (msg.type) {
// ── Broadcaster starts a stream ──
case 'start_broadcast': {
// One broadcast per client
if (client.role === 'broadcaster') {
if (client.broadcastStreamId) {
sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' });
return;
}
@ -100,9 +112,8 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
const name = String(msg.name || 'Anon').slice(0, 32);
const title = String(msg.title || 'Screen Share').slice(0, 64);
client.role = 'broadcaster';
client.name = name;
client.streamId = streamId;
client.broadcastStreamId = streamId;
streams.set(streamId, {
id: streamId,
@ -117,7 +128,7 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
sendTo(client, { type: 'broadcast_started', streamId });
broadcastStreamStatus();
// Notify all idle clients
// Notify all other clients
for (const c of wsClients.values()) {
if (c.id !== client.id) {
sendTo(c, { type: 'stream_available', streamId, broadcasterName: name, title });
@ -129,8 +140,8 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
// ── Broadcaster stops ──
case 'stop_broadcast': {
if (client.role !== 'broadcaster' || !client.streamId) return;
endStream(client.streamId, 'Broadcaster hat den Stream beendet');
if (!client.broadcastStreamId) return;
endStream(client.broadcastStreamId, 'Broadcaster hat den Stream beendet');
break;
}
@ -149,9 +160,13 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
return;
}
client.role = 'viewer';
client.name = String(msg.name || 'Viewer').slice(0, 32);
client.streamId = streamId;
// Leave current view if already viewing
if (client.viewingStreamId) {
leaveViewer(client);
}
client.name = client.name || String(msg.name || 'Viewer').slice(0, 32);
client.viewingStreamId = streamId;
stream.viewerCount++;
broadcastStreamStatus();
@ -165,18 +180,7 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
// ── Viewer leaves ──
case 'leave_viewer': {
if (client.role !== 'viewer' || !client.streamId) return;
const stream = streams.get(client.streamId);
if (stream) {
stream.viewerCount = Math.max(0, stream.viewerCount - 1);
broadcastStreamStatus();
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.streamId });
}
}
client.role = 'idle';
client.streamId = undefined;
leaveViewer(client);
break;
}
@ -194,18 +198,13 @@ function handleSignalingMessage(client: WsClient, msg: any): void {
}
function handleDisconnect(client: WsClient): void {
if (client.role === 'broadcaster' && client.streamId) {
endStream(client.streamId, 'Broadcaster hat die Verbindung verloren');
} else if (client.role === 'viewer' && client.streamId) {
const stream = streams.get(client.streamId);
if (stream) {
stream.viewerCount = Math.max(0, stream.viewerCount - 1);
broadcastStreamStatus();
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.streamId });
}
}
// Clean up broadcast
if (client.broadcastStreamId) {
endStream(client.broadcastStreamId, 'Broadcaster hat die Verbindung verloren');
}
// Clean up viewing
if (client.viewingStreamId) {
leaveViewer(client);
}
}
@ -224,6 +223,25 @@ const streamingPlugin: Plugin = {
app.get('/api/streaming/status', (_req, res) => {
res.json({ streams: getStreamList() });
});
// Beacon cleanup endpoint — called via navigator.sendBeacon() on page unload
app.post('/api/streaming/disconnect', (req, res) => {
let body = '';
req.on('data', (chunk: Buffer) => { body += chunk.toString(); });
req.on('end', () => {
try {
const { clientId } = JSON.parse(body);
const client = wsClients.get(clientId);
if (client) {
console.log(`[Streaming] Beacon disconnect for ${client.name || clientId.slice(0, 8)}`);
handleDisconnect(client);
wsClients.delete(clientId);
client.ws.terminate();
}
} catch { /* ignore malformed */ }
res.status(204).end();
});
});
},
getSnapshot(_ctx) {
@ -253,7 +271,7 @@ export function attachWebSocket(server: http.Server): void {
wss.on('connection', (ws) => {
const clientId = crypto.randomUUID();
const client: WsClient = { id: clientId, ws, role: 'idle', name: '', isAlive: true };
const client: WsClient = { id: clientId, ws, name: '', isAlive: true };
wsClients.set(clientId, client);
sendTo(client, { type: 'welcome', clientId, streams: getStreamList() });
@ -283,8 +301,7 @@ export function attachWebSocket(server: http.Server): void {
heartbeatInterval = setInterval(() => {
for (const [id, client] of wsClients) {
if (!client.isAlive) {
// No pong received since last check → dead
console.log(`[Streaming] Heartbeat timeout for ${client.name || id.slice(0, 8)} (role=${client.role})`);
console.log(`[Streaming] Heartbeat timeout for ${client.name || id.slice(0, 8)}`);
handleDisconnect(client);
wsClients.delete(id);
client.ws.terminate();