gaming-hub/server/src/plugins/streaming/index.ts

302 lines
9 KiB
TypeScript
Raw Normal View History

import type express from 'express';
import http from 'node:http';
import { WebSocketServer, WebSocket } from 'ws';
import crypto from 'node:crypto';
import type { Plugin, PluginContext } from '../../core/plugin.js';
import { sseBroadcast } from '../../core/sse.js';
// ── Types ──
/** Stream metadata that is exposed to clients (see getStreamList). */
interface StreamInfo {
/** Stream identifier; generated with crypto.randomUUID() when the broadcast starts. */
id: string;
/** Broadcaster's display name; cut to at most 32 characters when the stream is created. */
broadcasterName: string;
/** Stream title; cut to at most 64 characters when the stream is created. */
title: string;
/** Start time as produced by Date#toISOString() when the broadcast started. */
startedAt: string;
/** Number of joined viewers; incremented on join, decremented (never below 0) on leave or disconnect. */
viewerCount: number;
}
/** Per-connection state kept for one WebSocket signaling client. */
interface WsClient {
/** Server-assigned connection ID (crypto.randomUUID()); sent to the client in the 'welcome' message. */
id: string;
/** The underlying socket from the `ws` package. */
ws: WebSocket;
/** Current role; every new connection starts as 'idle'. */
role: 'idle' | 'broadcaster' | 'viewer';
/** Display name; empty string until the client starts a broadcast or joins a stream. */
name: string;
streamId?: string; // ID of stream this client broadcasts or views
isAlive: boolean; // heartbeat tracking: cleared before each ping, set again on pong or any incoming message
}
// ── State ──
/** Active streams keyed by stream ID (password stored server-side, never sent to clients) */
const streams = new Map<string, StreamInfo & { broadcasterId: string; password: string }>();
/** All connected WS clients, keyed by client ID */
const wsClients = new Map<string, WsClient>();
/** WebSocket server instance; null until attachWebSocket() runs and reset to null by the plugin's destroy(). */
let wss: WebSocketServer | null = null;
/** Handle of the heartbeat timer; null while no heartbeat is scheduled. */
let heartbeatInterval: ReturnType<typeof setInterval> | null = null;
const HEARTBEAT_MS = 10_000; // ping every 10s
// NOTE(review): HEARTBEAT_TIMEOUT is not referenced by any code shown in this file; the heartbeat
// loop drops a client that has not responded by the next HEARTBEAT_MS tick. Confirm whether it is still needed.
const HEARTBEAT_TIMEOUT = 25_000; // dead after missing ~2 pings
// ── Helpers ──
/** Publishes the current public stream list through sseBroadcast as a 'streaming_status' event. */
function broadcastStreamStatus(): void {
  const streamList = getStreamList();
  sseBroadcast({ type: 'streaming_status', plugin: 'streaming', streams: streamList });
}
/**
 * Sends `data` to `client` as a JSON text frame.
 * The message is silently dropped when the client's socket is not in the OPEN state.
 */
function sendTo(client: WsClient, data: Record<string, any>): void {
  const { ws: socket } = client;
  if (socket.readyState !== WebSocket.OPEN) return;
  const payload = JSON.stringify(data);
  socket.send(payload);
}
/**
 * Builds the client-facing list of active streams.
 * Each entry drops the server-only `broadcasterId` and `password` fields and
 * instead carries `hasPassword`, which is true when the stored password is non-empty.
 */
function getStreamList(): (StreamInfo & { hasPassword: boolean })[] {
  const publicList: (StreamInfo & { hasPassword: boolean })[] = [];
  for (const entry of streams.values()) {
    const { broadcasterId: _omitId, password: secret, ...publicInfo } = entry;
    publicList.push({ ...publicInfo, hasPassword: secret.length > 0 });
  }
  return publicList;
}
/**
 * Terminates a stream and returns everyone attached to it to the idle state.
 *
 * Does nothing when `streamId` is unknown. Otherwise every viewer of the stream
 * receives a 'stream_ended' message, the broadcaster (if still connected) is
 * reset, the stream is removed and the updated status is broadcast.
 *
 * @param streamId ID of the stream to end.
 * @param reason   Human-readable reason forwarded to viewers and written to the log.
 */
function endStream(streamId: string, reason: string): void {
  const ending = streams.get(streamId);
  if (ending === undefined) return;

  // Tell each viewer of this stream that it is over and release them.
  for (const peer of wsClients.values()) {
    const watchesThisStream = peer.role === 'viewer' && peer.streamId === streamId;
    if (!watchesThisStream) continue;
    sendTo(peer, { type: 'stream_ended', streamId, reason });
    peer.role = 'idle';
    peer.streamId = undefined;
  }

  // The broadcaster may already be gone from the client map; reset it only if present.
  const owner = wsClients.get(ending.broadcasterId);
  if (owner !== undefined) {
    owner.role = 'idle';
    owner.streamId = undefined;
  }

  streams.delete(streamId);
  broadcastStreamStatus();
  console.log(`[Streaming] Stream "${ending.title}" ended: ${reason}`);
}
// ── WebSocket Signaling ──
/**
 * Removes `client` from the stream it is currently watching, if any:
 * decrements that stream's viewer count (never below 0), publishes the new
 * status and tells the broadcaster that the viewer left. No-op for non-viewers.
 */
function leaveCurrentStreamAsViewer(client: WsClient): void {
  if (client.role !== 'viewer' || !client.streamId) return;
  const streamId = client.streamId;
  client.role = 'idle';
  client.streamId = undefined;

  const stream = streams.get(streamId);
  if (!stream) return;
  stream.viewerCount = Math.max(0, stream.viewerCount - 1);
  broadcastStreamStatus();
  const broadcaster = wsClients.get(stream.broadcasterId);
  if (broadcaster) {
    sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId });
  }
}

/**
 * Dispatches one parsed signaling message from `client`.
 *
 * Supported `msg.type` values:
 * - 'start_broadcast': registers a new password-protected stream owned by `client`.
 * - 'stop_broadcast':  ends the stream `client` is broadcasting.
 * - 'join_viewer':     attaches `client` to a stream after checking the password.
 * - 'leave_viewer':    detaches `client` from the stream it is watching.
 * - 'offer' | 'answer' | 'ice_candidate': relayed to `msg.targetId` with `fromId` set
 *   to the sender's ID.
 *
 * Unknown types and payloads that are not objects are ignored.
 *
 * @param client Sender of the message.
 * @param msg    Parsed JSON payload; untrusted client input.
 */
function handleSignalingMessage(client: WsClient, msg: any): void {
  // JSON.parse can yield null or a primitive; reading `.type` on null would throw.
  if (msg === null || typeof msg !== 'object') return;

  switch (msg.type) {
    // ── Broadcaster starts a stream ──
    case 'start_broadcast': {
      // One broadcast per client
      if (client.role === 'broadcaster') {
        sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' });
        return;
      }
      const password = String(msg.password || '').trim();
      if (!password) {
        sendTo(client, { type: 'error', code: 'PASSWORD_REQUIRED', message: 'Passwort ist Pflicht.' });
        return;
      }
      // A viewer that starts broadcasting must first be released from the stream
      // it watches, otherwise that stream's viewer count is never decremented.
      leaveCurrentStreamAsViewer(client);

      const streamId = crypto.randomUUID();
      const name = String(msg.name || 'Anon').slice(0, 32);
      const title = String(msg.title || 'Screen Share').slice(0, 64);
      client.role = 'broadcaster';
      client.name = name;
      client.streamId = streamId;
      streams.set(streamId, {
        id: streamId,
        broadcasterId: client.id,
        broadcasterName: name,
        title,
        password,
        startedAt: new Date().toISOString(),
        viewerCount: 0,
      });
      sendTo(client, { type: 'broadcast_started', streamId });
      broadcastStreamStatus();
      // Notify every other connected client (regardless of role)
      for (const c of wsClients.values()) {
        if (c.id !== client.id) {
          sendTo(c, { type: 'stream_available', streamId, broadcasterName: name, title });
        }
      }
      console.log(`[Streaming] ${name} started "${title}" (${streamId.slice(0, 8)})`);
      break;
    }

    // ── Broadcaster stops ──
    case 'stop_broadcast': {
      if (client.role !== 'broadcaster' || !client.streamId) return;
      endStream(client.streamId, 'Broadcaster hat den Stream beendet');
      break;
    }

    // ── Viewer joins a stream ──
    case 'join_viewer': {
      // A broadcaster switching to viewer would leave its own stream registered
      // with no owner able to end it, so the request is rejected.
      if (client.role === 'broadcaster') {
        sendTo(client, { type: 'error', code: 'ALREADY_BROADCASTING', message: 'Du streamst bereits.' });
        return;
      }
      const streamId = msg.streamId;
      const stream = streams.get(streamId);
      if (!stream) {
        sendTo(client, { type: 'error', code: 'NO_STREAM', message: 'Stream nicht gefunden.' });
        return;
      }
      // Validate password
      const joinPw = String(msg.password || '').trim();
      if (stream.password && joinPw !== stream.password) {
        sendTo(client, { type: 'error', code: 'WRONG_PASSWORD', message: 'Falsches Passwort.' });
        return;
      }
      // Leave any stream already being watched (including this one) so that
      // repeated joins do not inflate viewer counts.
      leaveCurrentStreamAsViewer(client);

      client.role = 'viewer';
      client.name = String(msg.name || 'Viewer').slice(0, 32);
      client.streamId = streamId;
      stream.viewerCount++;
      broadcastStreamStatus();
      // Tell broadcaster to create an offer for this viewer
      const broadcaster = wsClients.get(stream.broadcasterId);
      if (broadcaster) {
        sendTo(broadcaster, { type: 'viewer_joined', viewerId: client.id, streamId });
      }
      break;
    }

    // ── Viewer leaves ──
    case 'leave_viewer': {
      leaveCurrentStreamAsViewer(client);
      break;
    }

    // ── WebRTC signaling relay ──
    case 'offer':
    case 'answer':
    case 'ice_candidate': {
      const target = wsClients.get(msg.targetId);
      // Relay only between participants of the same stream, so a client that
      // never passed the password check cannot push signaling data to others.
      if (!target || !client.streamId || target.streamId !== client.streamId) return;
      // `fromId` is written last so a client-supplied value cannot spoof the sender.
      sendTo(target, { ...msg, fromId: client.id });
      break;
    }
  }
}
/**
 * Releases the stream state held by a client whose connection went away.
 *
 * - Broadcaster: its stream is ended and all viewers are notified.
 * - Viewer: the stream's viewer count is decremented (never below 0), the new
 *   status is published and the broadcaster receives 'viewer_left'.
 *
 * Idempotent: the client's role and stream ID are cleared up front, so a
 * second call for the same client does nothing. This matters because cleanup
 * can be triggered more than once per connection ('error' followed by 'close',
 * or heartbeat cleanup followed by the 'close' that terminate() triggers);
 * without it a viewer would be counted out twice.
 *
 * @param client The client that disconnected.
 */
function handleDisconnect(client: WsClient): void {
  const { role, streamId } = client;
  client.role = 'idle';
  client.streamId = undefined;
  if (!streamId) return;

  if (role === 'broadcaster') {
    endStream(streamId, 'Broadcaster hat die Verbindung verloren');
    return;
  }
  if (role !== 'viewer') return;

  const stream = streams.get(streamId);
  if (!stream) return;
  stream.viewerCount = Math.max(0, stream.viewerCount - 1);
  broadcastStreamStatus();
  const broadcaster = wsClients.get(stream.broadcasterId);
  if (broadcaster) {
    sendTo(broadcaster, { type: 'viewer_left', viewerId: client.id, streamId });
  }
}
// ── Plugin ──
/**
 * Plugin descriptor for screen streaming: exposes a status route and a
 * snapshot of active streams, and tears down the WebSocket server on destroy.
 */
const streamingPlugin: Plugin = {
  name: 'streaming',
  version: '1.0.0',
  description: 'Screen Streaming',

  /** Logs that the plugin is ready; no other setup happens here. */
  async init(_ctx) {
    console.log('[Streaming] Initialized');
  },

  /** Registers GET /api/streaming/status, which returns the public stream list. */
  registerRoutes(app: express.Application, _ctx: PluginContext) {
    app.get('/api/streaming/status', (_req, res) => {
      const streamList = getStreamList();
      res.json({ streams: streamList });
    });
  },

  /** Returns the public stream list under the `streaming` key. */
  getSnapshot(_ctx) {
    const streamList = getStreamList();
    return { streaming: { streams: streamList } };
  },

  /** Stops the heartbeat, closes every client socket with code 1001 and shuts the WebSocket server down. */
  async destroy() {
    if (heartbeatInterval) {
      clearInterval(heartbeatInterval);
      heartbeatInterval = null;
    }

    const server = wss;
    if (server) {
      wsClients.forEach((client) => {
        client.ws.close(1001, 'Server shutting down');
      });
      wsClients.clear();
      streams.clear();
      server.close();
      wss = null;
    }

    console.log('[Streaming] Destroyed');
  },
};
/**
 * Call after httpServer is created to attach WebSocket signaling.
 *
 * Creates a WebSocketServer on path '/ws/streaming', registers every new
 * connection as an idle client, forwards parsed JSON messages to
 * handleSignalingMessage and starts a heartbeat that pings all clients every
 * HEARTBEAT_MS and drops those that showed no activity since the previous tick.
 *
 * @param server HTTP server the WebSocket endpoint is attached to.
 */
export function attachWebSocket(server: http.Server): void {
  /**
   * Runs disconnect cleanup for a client at most once. Cleanup can be requested
   * several times for one connection ('error' then 'close', or heartbeat
   * timeout then the 'close' caused by terminate()); membership in wsClients
   * serves as the "not yet cleaned up" marker.
   */
  const dropClient = (client: WsClient): void => {
    if (wsClients.get(client.id) !== client) return;
    handleDisconnect(client);
    wsClients.delete(client.id);
  };

  wss = new WebSocketServer({ server, path: '/ws/streaming' });
  wss.on('connection', (ws) => {
    const clientId = crypto.randomUUID();
    const client: WsClient = { id: clientId, ws, role: 'idle', name: '', isAlive: true };
    wsClients.set(clientId, client);
    sendTo(client, { type: 'welcome', clientId, streams: getStreamList() });

    // Pong response marks client as alive
    ws.on('pong', () => { client.isAlive = true; });

    ws.on('message', (raw) => {
      client.isAlive = true; // any message = alive
      let msg: unknown;
      try { msg = JSON.parse(raw.toString()); } catch { return; }
      // Valid JSON may still be null or a primitive; only objects are signaling messages.
      if (msg === null || typeof msg !== 'object') return;
      handleSignalingMessage(client, msg);
    });

    ws.on('close', () => dropClient(client));
    ws.on('error', () => dropClient(client));
  });

  // ── Heartbeat: detect dead connections ──
  heartbeatInterval = setInterval(() => {
    for (const [id, client] of wsClients) {
      if (!client.isAlive) {
        // No pong received since last check → dead
        console.log(`[Streaming] Heartbeat timeout for ${client.name || id.slice(0, 8)} (role=${client.role})`);
        dropClient(client);
        client.ws.terminate();
        continue;
      }
      client.isAlive = false;
      // Best effort: a failing ping is caught by the next tick's liveness check.
      try { client.ws.ping(); } catch { /* ignore */ }
    }
  }, HEARTBEAT_MS);

  console.log('[Streaming] WebSocket signaling attached at /ws/streaming');
}
// Default export: the streaming plugin descriptor (streamingPlugin).
export default streamingPlugin;