// File: NowChess-Frontend/src/app/services/stream-handler.service.ts
// Timestamp: 2026-04-22 08:28:16 +02:00
// 123 lines, 3.5 KiB, TypeScript

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs';
import { GameStreamEvent, ErrorEvent } from '../models/game.models';
/**
 * Produces observable streams of game events.
 *
 * A stream is served over a WebSocket when one can be opened; if the socket
 * fails or closes before it ever opened, the same stream is read from an
 * NDJSON (newline-delimited JSON) HTTP endpoint instead.
 */
@Injectable({ providedIn: 'root' })
export class StreamHandlerService {
  /**
   * Opens an event stream for a game.
   *
   * Each subscription creates its own WebSocket. Unsubscribing closes the
   * socket and aborts any in-flight fallback request.
   *
   * @param wsUrl URL passed to the `WebSocket` constructor.
   * @param fallbackUrl URL fetched with `Accept: application/x-ndjson` when the
   *   WebSocket never reaches the open state.
   * @param gameId Game identifier; used only in log messages.
   * @returns An observable that emits every parsed event, completes when an
   *   opened WebSocket closes or the NDJSON stream ends, and errors when the
   *   NDJSON request fails for a reason other than being aborted.
   */
  createGameStream(wsUrl: string, fallbackUrl: string, gameId: string): Observable<GameStreamEvent> {
    return new Observable<GameStreamEvent>((observer) => {
      const ws = new WebSocket(wsUrl);
      const abortController = new AbortController();
      let connected = false;
      let fallbackActive = false;
      // Set by the teardown so that late socket callbacks become no-ops.
      let disposed = false;

      /** Parses one JSON payload; returns null for blank, malformed or non-object input. */
      const parseEvent = (raw: string): GameStreamEvent | null => {
        if (!raw.trim()) {
          return null;
        }
        try {
          const parsed: unknown = JSON.parse(raw);
          // Primitive JSON values (numbers, strings, booleans, null) cannot be events.
          return typeof parsed === 'object' && parsed !== null ? (parsed as GameStreamEvent) : null;
        } catch {
          return null;
        }
      };

      /** Parses a raw payload and forwards it to the subscriber when it is a usable event. */
      const emitParsed = (raw: string): void => {
        const event = parseEvent(raw);
        if (event) {
          observer.next(event);
        }
      };

      /** Emits a synthetic `error` event carrying the given message. */
      const emitErrorEvent = (message: string): void => {
        const errorEvent: ErrorEvent = {
          type: 'error',
          error: { code: 'STREAM_ERROR', message }
        };
        observer.next(errorEvent);
      };

      /** True when the thrown value is named `AbortError`, as produced by an aborted fetch. */
      const isAbortError = (error: unknown): boolean =>
        typeof error === 'object' && error !== null && (error as { name?: unknown }).name === 'AbortError';

      /** Reads the NDJSON endpoint line by line; runs at most once per subscription. */
      const startNdjsonFallback = async (): Promise<void> => {
        if (fallbackActive || disposed) {
          return;
        }
        fallbackActive = true;
        console.log(`[StreamHandler] NDJSON fallback started for ${gameId}, URL:`, fallbackUrl);
        try {
          const response = await fetch(fallbackUrl, {
            headers: { Accept: 'application/x-ndjson' },
            signal: abortController.signal
          });
          if (!response.ok || !response.body) {
            console.error(`[StreamHandler] NDJSON fetch failed: HTTP ${response.status}`);
            emitErrorEvent(`Unable to open stream: HTTP ${response.status}`);
            observer.complete();
            return;
          }
          console.log(`[StreamHandler] NDJSON stream connected for ${gameId}`);
          const reader = response.body.getReader();
          const decoder = new TextDecoder();
          let buffer = '';
          while (true) {
            const { value, done } = await reader.read();
            if (done) {
              break;
            }
            buffer += decoder.decode(value, { stream: true });
            const lines = buffer.split('\n');
            // The last element is an incomplete line; keep it for the next chunk.
            buffer = lines.pop() ?? '';
            for (const line of lines) {
              emitParsed(line);
            }
          }
          // Flush bytes still held by the decoder and emit a final record that
          // was not terminated by a newline.
          buffer += decoder.decode();
          emitParsed(buffer);
          observer.complete();
        } catch (error: unknown) {
          if (isAbortError(error)) {
            // Aborted by the teardown; nobody is listening any more.
            return;
          }
          emitErrorEvent(error instanceof Error ? error.message : String(error));
          observer.error(error);
        }
      };

      ws.onopen = () => {
        connected = true;
      };

      ws.onmessage = (message) => {
        // Non-string frames are treated as empty and therefore ignored.
        emitParsed(typeof message.data === 'string' ? message.data : '');
      };

      ws.onerror = (error) => {
        if (disposed) {
          return;
        }
        console.warn(`[StreamHandler] WebSocket error for ${gameId}, attempting NDJSON fallback:`, error);
        if (!connected) {
          void startNdjsonFallback();
        }
      };

      ws.onclose = () => {
        if (disposed) {
          // Close triggered by our own teardown: do not start the fallback.
          return;
        }
        console.warn(`[StreamHandler] WebSocket closed for ${gameId}, connected=${connected}`);
        if (!connected) {
          console.log(`[StreamHandler] Starting NDJSON fallback for ${gameId}`);
          void startNdjsonFallback();
        } else {
          observer.complete();
        }
      };

      return () => {
        disposed = true;
        abortController.abort();
        ws.close();
      };
    });
  }
}