gaming-hub/server/src/plugins/streaming/index.ts
Daniel e4895a792c Fix: WebSocket-Konflikt zwischen Streaming und Watch Together
Root Cause: ws-Library killt WS-Verbindungen mit HTTP 400 wenn
mehrere WebSocketServer mit { server, path } registriert werden.
Der erste WSS (streaming) hat abortHandshake() fuer watch-together
Verbindungen aufgerufen.

- Beide WSS auf noServer-Modus umgestellt
- Zentrales upgrade-Routing in index.ts nach Pathname
- Frontend: connectWs prueft jetzt auch CONNECTING-State
- Frontend: onclose clobbert wsRef nur noch wenn gleicher Socket
- Frontend: waitForWs hat jetzt 10s Timeout statt Endlosschleife

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-07 11:02:36 +01:00

317 lines
9.2 KiB
TypeScript

import type express from 'express';
import { WebSocketServer, WebSocket } from 'ws';
import crypto from 'node:crypto';
import type { Plugin, PluginContext } from '../../core/plugin.js';
import { sseBroadcast } from '../../core/sse.js';
// ── Types ──
interface StreamInfo {
id: string;
broadcasterName: string;
title: string;
startedAt: string;
viewerCount: number;
}
interface WsClient {
id: string;
ws: WebSocket;
name: string;
/** Stream ID this client broadcasts (undefined if not broadcasting) */
broadcastStreamId?: string;
/** Stream ID this client views (undefined if not viewing) */
viewingStreamId?: string;
isAlive: boolean;
}
// ── State ──
/** Active streams keyed by stream ID (password stored server-side, never sent to clients) */
const streams = new Map<string, StreamInfo & { broadcasterId: string; password: string }>();
/** All connected WS clients */
const wsClients = new Map<string, WsClient>();
let wss: WebSocketServer | null = null;
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
const HEARTBEAT_MS = 5_000; // ping every 5s (faster detection)
// ── Helpers ──
function broadcastStreamStatus(): void {
sseBroadcast({ type: 'streaming_status', plugin: 'streaming', streams: getStreamList() });
}
function sendTo(client: WsClient, data: Record<string, any>): void {
if (client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(data));
}
}
function getStreamList(): (StreamInfo & { hasPassword: boolean })[] {
return [...streams.values()].map(({ broadcasterId: _, password: pw, ...s }) => ({
...s,
hasPassword: pw.length > 0,
}));
}
function endStream(streamId: string, reason: string): void {
const stream = streams.get(streamId);
if (!stream) return;
// Notify all viewers of this stream
for (const c of wsClients.values()) {
if (c.viewingStreamId === streamId) {
sendTo(c, { type: 'stream_ended', streamId, reason });
c.viewingStreamId = undefined;
}
}
// Reset broadcaster
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
broadcaster.broadcastStreamId = undefined;
}
streams.delete(streamId);
broadcastStreamStatus();
console.log(`[Streaming] Stream "${stream.title}" ended: ${reason}`);
}
function leaveViewer(client: WsClient): void {
if (!client.viewingStreamId) return;
const stream = streams.get(client.viewingStreamId);
if (stream) {
stream.viewerCount = Math.max(0, stream.viewerCount - 1);
broadcastStreamStatus();
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.viewingStreamId });
}
}
client.viewingStreamId = undefined;
}
// ── WebSocket Signaling ──
function handleSignalingMessage(client: WsClient, msg: any): void {
switch (msg.type) {
// ── Broadcaster starts a stream ──
case 'start_broadcast': {
if (client.broadcastStreamId) {
sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' });
return;
}
const password = String(msg.password || '').trim();
if (!password) {
sendTo(client, { type: 'error', code: 'PASSWORD_REQUIRED', message: 'Passwort ist Pflicht.' });
return;
}
const streamId = crypto.randomUUID();
const name = String(msg.name || 'Anon').slice(0, 32);
const title = String(msg.title || 'Screen Share').slice(0, 64);
client.name = name;
client.broadcastStreamId = streamId;
streams.set(streamId, {
id: streamId,
broadcasterId: client.id,
broadcasterName: name,
title,
password,
startedAt: new Date().toISOString(),
viewerCount: 0,
});
sendTo(client, { type: 'broadcast_started', streamId });
broadcastStreamStatus();
// Notify all other clients
for (const c of wsClients.values()) {
if (c.id !== client.id) {
sendTo(c, { type: 'stream_available', streamId, broadcasterName: name, title });
}
}
console.log(`[Streaming] ${name} started "${title}" (${streamId.slice(0, 8)})`);
break;
}
// ── Broadcaster stops ──
case 'stop_broadcast': {
if (!client.broadcastStreamId) return;
endStream(client.broadcastStreamId, 'Broadcaster hat den Stream beendet');
break;
}
// ── Viewer joins a stream ──
case 'join_viewer': {
const streamId = msg.streamId;
const stream = streams.get(streamId);
if (!stream) {
sendTo(client, { type: 'error', code: 'NO_STREAM', message: 'Stream nicht gefunden.' });
return;
}
// Validate password
const joinPw = String(msg.password || '').trim();
if (stream.password && joinPw !== stream.password) {
sendTo(client, { type: 'error', code: 'WRONG_PASSWORD', message: 'Falsches Passwort.' });
return;
}
// Leave current view if already viewing
if (client.viewingStreamId) {
leaveViewer(client);
}
client.name = client.name || String(msg.name || 'Viewer').slice(0, 32);
client.viewingStreamId = streamId;
stream.viewerCount++;
broadcastStreamStatus();
// Tell broadcaster to create an offer for this viewer
const broadcaster = wsClients.get(stream.broadcasterId);
if (broadcaster) {
sendTo(broadcaster, { type: 'viewer_joined', viewerId: client.id, streamId });
}
break;
}
// ── Viewer leaves ──
case 'leave_viewer': {
leaveViewer(client);
break;
}
// ── WebRTC signaling relay ──
case 'offer':
case 'answer':
case 'ice_candidate': {
const target = wsClients.get(msg.targetId);
if (target) {
sendTo(target, { ...msg, fromId: client.id });
}
break;
}
}
}
function handleDisconnect(client: WsClient): void {
// Clean up broadcast
if (client.broadcastStreamId) {
endStream(client.broadcastStreamId, 'Broadcaster hat die Verbindung verloren');
}
// Clean up viewing
if (client.viewingStreamId) {
leaveViewer(client);
}
}
// ── Plugin ──
const streamingPlugin: Plugin = {
name: 'streaming',
version: '1.0.0',
description: 'Screen Streaming',
async init(_ctx) {
console.log('[Streaming] Initialized');
},
registerRoutes(app: express.Application, _ctx: PluginContext) {
app.get('/api/streaming/status', (_req, res) => {
res.json({ streams: getStreamList() });
});
// Beacon cleanup endpoint — called via navigator.sendBeacon() on page unload
app.post('/api/streaming/disconnect', (req, res) => {
let body = '';
req.on('data', (chunk: Buffer) => { body += chunk.toString(); });
req.on('end', () => {
try {
const { clientId } = JSON.parse(body);
const client = wsClients.get(clientId);
if (client) {
console.log(`[Streaming] Beacon disconnect for ${client.name || clientId.slice(0, 8)}`);
handleDisconnect(client);
wsClients.delete(clientId);
client.ws.terminate();
}
} catch { /* ignore malformed */ }
res.status(204).end();
});
});
},
getSnapshot(_ctx) {
return {
streaming: { streams: getStreamList() },
};
},
async destroy() {
if (heartbeatInterval) { clearInterval(heartbeatInterval); heartbeatInterval = null; }
if (wss) {
for (const client of wsClients.values()) {
client.ws.close(1001, 'Server shutting down');
}
wsClients.clear();
streams.clear();
wss.close();
wss = null;
}
console.log('[Streaming] Destroyed');
},
};
/** Attach WebSocket signaling to a pre-created WebSocketServer (noServer mode) */
export function attachWebSocket(existingWss: WebSocketServer): void {
wss = existingWss;
wss.on('connection', (ws) => {
const clientId = crypto.randomUUID();
const client: WsClient = { id: clientId, ws, name: '', isAlive: true };
wsClients.set(clientId, client);
sendTo(client, { type: 'welcome', clientId, streams: getStreamList() });
// Pong response marks client as alive
ws.on('pong', () => { client.isAlive = true; });
ws.on('message', (raw) => {
client.isAlive = true; // any message = alive
let msg: any;
try { msg = JSON.parse(raw.toString()); } catch { return; }
handleSignalingMessage(client, msg);
});
ws.on('close', () => {
handleDisconnect(client);
wsClients.delete(clientId);
});
ws.on('error', () => {
handleDisconnect(client);
wsClients.delete(clientId);
});
});
// ── Heartbeat: detect dead connections ──
heartbeatInterval = setInterval(() => {
for (const [id, client] of wsClients) {
if (!client.isAlive) {
console.log(`[Streaming] Heartbeat timeout for ${client.name || id.slice(0, 8)}`);
handleDisconnect(client);
wsClients.delete(id);
client.ws.terminate();
continue;
}
client.isAlive = false;
try { client.ws.ping(); } catch { /* ignore */ }
}
}, HEARTBEAT_MS);
console.log('[Streaming] WebSocket signaling attached at /ws/streaming');
}
export default streamingPlugin;