import type express from 'express';
import http from 'node:http';
import { WebSocketServer, WebSocket } from 'ws';
import crypto from 'node:crypto';
import type { Plugin, PluginContext } from '../../core/plugin.js';
import { sseBroadcast } from '../../core/sse.js';

// ── Types ──

/** Public description of a stream as it is sent to clients. */
interface StreamInfo {
  id: string;
  broadcasterName: string;
  title: string;
  startedAt: string;
  viewerCount: number;
}

/** Server-side stream record; `broadcasterId` and `password` are stripped before sending. */
interface ActiveStream extends StreamInfo {
  /** WS client ID of the broadcaster that owns this stream */
  broadcasterId: string;
  /** Join password (kept server-side only) */
  password: string;
}

interface WsClient {
  id: string;
  ws: WebSocket;
  name: string;
  /** Stream ID this client broadcasts (undefined if not broadcasting) */
  broadcastStreamId?: string;
  /** Stream ID this client views (undefined if not viewing) */
  viewingStreamId?: string;
  /** Set by pong/any message, cleared by each heartbeat tick */
  isAlive: boolean;
}

// ── State ──

/** Active streams keyed by stream ID (password stored server-side, never sent to clients) */
const streams = new Map<string, ActiveStream>();

/** All connected WS clients keyed by client ID */
const wsClients = new Map<string, WsClient>();

let wss: WebSocketServer | null = null;
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;

const HEARTBEAT_MS = 5_000; // ping every 5s (faster detection)

/** Upper bound for the beacon request body; the payload is only `{ clientId }`. */
const MAX_BEACON_BODY_BYTES = 4_096;

// ── Helpers ──

/** Narrows an arbitrary parsed JSON value to a plain (non-array, non-null) object. */
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null && !Array.isArray(value);
}

/** Pushes the current public stream list to all SSE subscribers. */
function broadcastStreamStatus(): void {
  sseBroadcast({ type: 'streaming_status', plugin: 'streaming', streams: getStreamList() });
}

/** Sends a JSON message to one client; silently skipped unless its socket is OPEN. */
function sendTo(client: WsClient, data: Record<string, unknown>): void {
  if (client.ws.readyState === WebSocket.OPEN) {
    client.ws.send(JSON.stringify(data));
  }
}

/**
 * Returns the public view of all active streams.
 * `broadcasterId` and `password` are removed; `hasPassword` tells clients whether one is set.
 */
function getStreamList(): (StreamInfo & { hasPassword: boolean })[] {
  return [...streams.values()].map(({ broadcasterId: _, password: pw, ...s }) => ({
    ...s,
    hasPassword: pw.length > 0,
  }));
}

/**
 * Ends a stream: notifies and detaches its viewers, resets the broadcaster,
 * removes the stream and publishes the new stream list.
 * No-op when the stream ID is unknown.
 */
function endStream(streamId: string, reason: string): void {
  const stream = streams.get(streamId);
  if (!stream) return;

  // Notify all viewers of this stream
  for (const c of wsClients.values()) {
    if (c.viewingStreamId === streamId) {
      sendTo(c, { type: 'stream_ended', streamId, reason });
      c.viewingStreamId = undefined;
    }
  }

  // Reset broadcaster
  const broadcaster = wsClients.get(stream.broadcasterId);
  if (broadcaster) {
    broadcaster.broadcastStreamId = undefined;
  }

  streams.delete(streamId);
  broadcastStreamStatus();
  console.log(`[Streaming] Stream "${stream.title}" ended: ${reason}`);
}

/**
 * Detaches a client from the stream it is viewing, decrements the viewer count
 * (never below zero) and tells the broadcaster. No-op when the client is not viewing.
 */
function leaveViewer(client: WsClient): void {
  if (!client.viewingStreamId) return;

  const stream = streams.get(client.viewingStreamId);
  if (stream) {
    stream.viewerCount = Math.max(0, stream.viewerCount - 1);
    broadcastStreamStatus();
    const broadcaster = wsClients.get(stream.broadcasterId);
    if (broadcaster) {
      sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.viewingStreamId });
    }
  }
  client.viewingStreamId = undefined;
}

// ── WebSocket Signaling ──

/**
 * Dispatches one parsed signaling message from `client`.
 *
 * `msg` is untrusted input straight from `JSON.parse`; anything that is not a
 * plain object (e.g. `null`, a number, an array) is ignored instead of being
 * dereferenced, and unknown `type` values are ignored as well.
 */
function handleSignalingMessage(client: WsClient, msg: unknown): void {
  if (!isRecord(msg)) return;

  switch (msg.type) {
    // ── Broadcaster starts a stream ──
    case 'start_broadcast': {
      if (client.broadcastStreamId) {
        sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' });
        return;
      }
      const password = String(msg.password || '').trim();
      if (!password) {
        sendTo(client, { type: 'error', code: 'PASSWORD_REQUIRED', message: 'Passwort ist Pflicht.' });
        return;
      }

      const streamId = crypto.randomUUID();
      const name = String(msg.name || 'Anon').slice(0, 32);
      const title = String(msg.title || 'Screen Share').slice(0, 64);
      client.name = name;
      client.broadcastStreamId = streamId;

      streams.set(streamId, {
        id: streamId,
        broadcasterId: client.id,
        broadcasterName: name,
        title,
        password,
        startedAt: new Date().toISOString(),
        viewerCount: 0,
      });

      sendTo(client, { type: 'broadcast_started', streamId });
      broadcastStreamStatus();

      // Notify all other clients
      for (const c of wsClients.values()) {
        if (c.id !== client.id) {
          sendTo(c, { type: 'stream_available', streamId, broadcasterName: name, title });
        }
      }
      console.log(`[Streaming] ${name} started "${title}" (${streamId.slice(0, 8)})`);
      break;
    }

    // ── Broadcaster stops ──
    case 'stop_broadcast': {
      if (!client.broadcastStreamId) return;
      endStream(client.broadcastStreamId, 'Broadcaster hat den Stream beendet');
      break;
    }

    // ── Viewer joins a stream ──
    case 'join_viewer': {
      const streamId = msg.streamId;
      // A non-string ID can never be a key of `streams`, so treat it as "not found".
      const stream = typeof streamId === 'string' ? streams.get(streamId) : undefined;
      if (typeof streamId !== 'string' || !stream) {
        sendTo(client, { type: 'error', code: 'NO_STREAM', message: 'Stream nicht gefunden.' });
        return;
      }

      // Validate password
      const joinPw = String(msg.password || '').trim();
      if (stream.password && joinPw !== stream.password) {
        sendTo(client, { type: 'error', code: 'WRONG_PASSWORD', message: 'Falsches Passwort.' });
        return;
      }

      // Leave current view if already viewing
      if (client.viewingStreamId) {
        leaveViewer(client);
      }

      // Keep a name the client already has; otherwise take the supplied one.
      client.name = client.name || String(msg.name || 'Viewer').slice(0, 32);
      client.viewingStreamId = streamId;
      stream.viewerCount++;
      broadcastStreamStatus();

      // Tell broadcaster to create an offer for this viewer
      const broadcaster = wsClients.get(stream.broadcasterId);
      if (broadcaster) {
        sendTo(broadcaster, { type: 'viewer_joined', viewerId: client.id, streamId });
      }
      break;
    }

    // ── Viewer leaves ──
    case 'leave_viewer': {
      leaveViewer(client);
      break;
    }

    // ── WebRTC signaling relay ──
    case 'offer':
    case 'answer':
    case 'ice_candidate': {
      const target = typeof msg.targetId === 'string' ? wsClients.get(msg.targetId) : undefined;
      if (target) {
        // `fromId` is written last so a client cannot spoof the sender.
        sendTo(target, { ...msg, fromId: client.id });
      }
      break;
    }
  }
}

/** Releases everything a client holds: ends its broadcast and leaves any stream it views. */
function handleDisconnect(client: WsClient): void {
  // Clean up broadcast
  if (client.broadcastStreamId) {
    endStream(client.broadcastStreamId, 'Broadcaster hat die Verbindung verloren');
  }
  // Clean up viewing
  if (client.viewingStreamId) {
    leaveViewer(client);
  }
}

/**
 * Cleans up a client's streams and removes it from the registry.
 * Safe to call more than once for the same client (e.g. `error` followed by `close`).
 */
function removeClient(client: WsClient): void {
  handleDisconnect(client);
  wsClients.delete(client.id);
}

// ── Plugin ──

const streamingPlugin: Plugin = {
  name: 'streaming',
  version: '1.0.0',
  description: 'Screen Streaming',

  async init(_ctx) {
    console.log('[Streaming] Initialized');
  },

  registerRoutes(app: express.Application, _ctx: PluginContext) {
    app.get('/api/streaming/status', (_req, res) => {
      res.json({ streams: getStreamList() });
    });

    // Beacon cleanup endpoint — called via navigator.sendBeacon() on page unload.
    // NOTE(review): this endpoint is unauthenticated and client IDs are visible to
    // peers (`fromId`, `viewerId`), so any peer can disconnect another — confirm
    // whether that is acceptable.
    app.post('/api/streaming/disconnect', (req, res) => {
      const chunks: Buffer[] = [];
      let received = 0;
      let tooLarge = false;

      req.on('data', (chunk: Buffer | string) => {
        if (tooLarge) return;
        const buf = typeof chunk === 'string' ? Buffer.from(chunk) : chunk;
        received += buf.length;
        if (received > MAX_BEACON_BODY_BYTES) {
          // Stop buffering; the remainder of the body is drained and discarded.
          tooLarge = true;
          chunks.length = 0;
          return;
        }
        chunks.push(buf);
      });

      req.on('end', () => {
        if (tooLarge) {
          res.status(413).end();
          return;
        }
        try {
          // Decode once over the whole body so multi-byte characters split
          // across chunks are not corrupted.
          const parsed: unknown = JSON.parse(Buffer.concat(chunks).toString('utf8'));
          if (isRecord(parsed) && typeof parsed.clientId === 'string') {
            const clientId = parsed.clientId;
            const client = wsClients.get(clientId);
            if (client) {
              console.log(`[Streaming] Beacon disconnect for ${client.name || clientId.slice(0, 8)}`);
              removeClient(client);
              client.ws.terminate();
            }
          }
        } catch {
          /* ignore malformed */
        }
        res.status(204).end();
      });
    });
  },

  getSnapshot(_ctx) {
    return {
      streaming: { streams: getStreamList() },
    };
  },

  async destroy() {
    if (heartbeatInterval) {
      clearInterval(heartbeatInterval);
      heartbeatInterval = null;
    }
    if (wss) {
      for (const client of wsClients.values()) {
        client.ws.close(1001, 'Server shutting down');
      }
      wsClients.clear();
      streams.clear();
      wss.close();
      wss = null;
    }
    console.log('[Streaming] Destroyed');
  },
};

/** Call after httpServer is created to attach WebSocket signaling */
export function attachWebSocket(server: http.Server): void {
  wss = new WebSocketServer({ server, path: '/ws/streaming' });

  wss.on('connection', (ws) => {
    const clientId = crypto.randomUUID();
    const client: WsClient = { id: clientId, ws, name: '', isAlive: true };
    wsClients.set(clientId, client);

    sendTo(client, { type: 'welcome', clientId, streams: getStreamList() });

    // Pong response marks client as alive
    ws.on('pong', () => {
      client.isAlive = true;
    });

    ws.on('message', (raw) => {
      client.isAlive = true; // any message = alive
      let msg: unknown;
      try {
        msg = JSON.parse(raw.toString());
      } catch {
        return;
      }
      handleSignalingMessage(client, msg);
    });

    ws.on('close', () => {
      removeClient(client);
    });

    ws.on('error', () => {
      removeClient(client);
    });
  });

  // ── Heartbeat: detect dead connections ──
  // Clear a timer left over from an earlier attach so it does not leak.
  if (heartbeatInterval) {
    clearInterval(heartbeatInterval);
  }
  heartbeatInterval = setInterval(() => {
    for (const [id, client] of wsClients) {
      if (!client.isAlive) {
        // No pong or message since the previous tick: treat the connection as dead.
        console.log(`[Streaming] Heartbeat timeout for ${client.name || id.slice(0, 8)}`);
        removeClient(client);
        client.ws.terminate();
        continue;
      }
      client.isAlive = false;
      try {
        client.ws.ping();
      } catch {
        /* ignore */
      }
    }
  }, HEARTBEAT_MS);

  console.log('[Streaming] WebSocket signaling attached at /ws/streaming');
}

export default streamingPlugin;