import type express from 'express'; import http from 'node:http'; import { WebSocketServer, WebSocket } from 'ws'; import crypto from 'node:crypto'; import type { Plugin, PluginContext } from '../../core/plugin.js'; import { sseBroadcast } from '../../core/sse.js'; // ── Types ── interface StreamInfo { id: string; broadcasterName: string; title: string; startedAt: string; viewerCount: number; } interface WsClient { id: string; ws: WebSocket; role: 'idle' | 'broadcaster' | 'viewer'; name: string; streamId?: string; // ID of stream this client broadcasts or views } // ── State ── /** Active streams keyed by stream ID */ const streams = new Map(); /** All connected WS clients */ const wsClients = new Map(); let wss: WebSocketServer | null = null; // ── Helpers ── function broadcastStreamStatus(): void { const list = [...streams.values()].map(({ broadcasterId: _, ...s }) => s); sseBroadcast({ type: 'streaming_status', plugin: 'streaming', streams: list }); } function sendTo(client: WsClient, data: Record): void { if (client.ws.readyState === WebSocket.OPEN) { client.ws.send(JSON.stringify(data)); } } function getStreamList(): StreamInfo[] { return [...streams.values()].map(({ broadcasterId: _, ...s }) => s); } function endStream(streamId: string, reason: string): void { const stream = streams.get(streamId); if (!stream) return; // Notify all viewers of this stream for (const c of wsClients.values()) { if (c.role === 'viewer' && c.streamId === streamId) { sendTo(c, { type: 'stream_ended', streamId, reason }); c.role = 'idle'; c.streamId = undefined; } } // Reset broadcaster role const broadcaster = wsClients.get(stream.broadcasterId); if (broadcaster) { broadcaster.role = 'idle'; broadcaster.streamId = undefined; } streams.delete(streamId); broadcastStreamStatus(); console.log(`[Streaming] Stream "${stream.title}" ended: ${reason}`); } // ── WebSocket Signaling ── function handleSignalingMessage(client: WsClient, msg: any): void { switch (msg.type) { // ── Broadcaster starts a stream ── case 'start_broadcast': { // One broadcast per client if (client.role === 'broadcaster') { sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' }); return; } const streamId = crypto.randomUUID(); const name = String(msg.name || 'Anon').slice(0, 32); const title = String(msg.title || 'Screen Share').slice(0, 64); client.role = 'broadcaster'; client.name = name; client.streamId = streamId; streams.set(streamId, { id: streamId, broadcasterId: client.id, broadcasterName: name, title, startedAt: new Date().toISOString(), viewerCount: 0, }); sendTo(client, { type: 'broadcast_started', streamId }); broadcastStreamStatus(); // Notify all idle clients for (const c of wsClients.values()) { if (c.id !== client.id) { sendTo(c, { type: 'stream_available', streamId, broadcasterName: name, title }); } } console.log(`[Streaming] ${name} started "${title}" (${streamId.slice(0, 8)})`); break; } // ── Broadcaster stops ── case 'stop_broadcast': { if (client.role !== 'broadcaster' || !client.streamId) return; endStream(client.streamId, 'Broadcaster hat den Stream beendet'); break; } // ── Viewer joins a stream ── case 'join_viewer': { const streamId = msg.streamId; const stream = streams.get(streamId); if (!stream) { sendTo(client, { type: 'error', code: 'NO_STREAM', message: 'Stream nicht gefunden.' }); return; } client.role = 'viewer'; client.name = String(msg.name || 'Viewer').slice(0, 32); client.streamId = streamId; stream.viewerCount++; broadcastStreamStatus(); // Tell broadcaster to create an offer for this viewer const broadcaster = wsClients.get(stream.broadcasterId); if (broadcaster) { sendTo(broadcaster, { type: 'viewer_joined', viewerId: client.id, streamId }); } break; } // ── Viewer leaves ── case 'leave_viewer': { if (client.role !== 'viewer' || !client.streamId) return; const stream = streams.get(client.streamId); if (stream) { stream.viewerCount = Math.max(0, stream.viewerCount - 1); broadcastStreamStatus(); const broadcaster = wsClients.get(stream.broadcasterId); if (broadcaster) { sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.streamId }); } } client.role = 'idle'; client.streamId = undefined; break; } // ── WebRTC signaling relay ── case 'offer': case 'answer': case 'ice_candidate': { const target = wsClients.get(msg.targetId); if (target) { sendTo(target, { ...msg, fromId: client.id }); } break; } } } function handleDisconnect(client: WsClient): void { if (client.role === 'broadcaster' && client.streamId) { endStream(client.streamId, 'Broadcaster hat die Verbindung verloren'); } else if (client.role === 'viewer' && client.streamId) { const stream = streams.get(client.streamId); if (stream) { stream.viewerCount = Math.max(0, stream.viewerCount - 1); broadcastStreamStatus(); const broadcaster = wsClients.get(stream.broadcasterId); if (broadcaster) { sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId: client.streamId }); } } } } // ── Plugin ── const streamingPlugin: Plugin = { name: 'streaming', version: '1.0.0', description: 'Screen Streaming', async init(_ctx) { console.log('[Streaming] Initialized'); }, registerRoutes(app: express.Application, _ctx: PluginContext) { app.get('/api/streaming/status', (_req, res) => { res.json({ streams: getStreamList() }); }); }, getSnapshot(_ctx) { return { streaming: { streams: getStreamList() }, }; }, async destroy() { if (wss) { for (const client of wsClients.values()) { client.ws.close(1001, 'Server shutting down'); } wsClients.clear(); streams.clear(); wss.close(); wss = null; } console.log('[Streaming] Destroyed'); }, }; /** Call after httpServer is created to attach WebSocket signaling */ export function attachWebSocket(server: http.Server): void { wss = new WebSocketServer({ server, path: '/ws/streaming' }); wss.on('connection', (ws) => { const clientId = crypto.randomUUID(); const client: WsClient = { id: clientId, ws, role: 'idle', name: '' }; wsClients.set(clientId, client); sendTo(client, { type: 'welcome', clientId, streams: getStreamList() }); ws.on('message', (raw) => { let msg: any; try { msg = JSON.parse(raw.toString()); } catch { return; } handleSignalingMessage(client, msg); }); ws.on('close', () => { handleDisconnect(client); wsClients.delete(clientId); }); ws.on('error', () => { handleDisconnect(client); wsClients.delete(clientId); }); }); console.log('[Streaming] WebSocket signaling attached at /ws/streaming'); } export default streamingPlugin;