fix: NCWF-2 bugs and desing fixes (#7)
Co-authored-by: Lala, Shahd <Shahd.Lala@sybit.de> Reviewed-on: #7
This commit was merged in pull request #7.
This commit is contained in:
@@ -2,26 +2,14 @@ import { Injectable } from '@angular/core';
|
||||
import { Observable } from 'rxjs';
|
||||
import { GameStreamEvent, ErrorEvent } from '../models/game.models';
|
||||
|
||||
const WS_CONNECT_TIMEOUT_MS = 3000;
|
||||
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class StreamHandlerService {
|
||||
createGameStream(wsUrl: string, fallbackUrl: string, gameId: string): Observable<GameStreamEvent> {
|
||||
createGameStream(wsUrl: string, gameId: string): Observable<GameStreamEvent> {
|
||||
return new Observable<GameStreamEvent>((observer) => {
|
||||
const ws = new WebSocket(wsUrl);
|
||||
const abortController = new AbortController();
|
||||
let connected = false;
|
||||
let fallbackActive = false;
|
||||
|
||||
const parseEvent = (raw: string): GameStreamEvent | null => {
|
||||
if (!raw.trim()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
try {
|
||||
return JSON.parse(raw) as GameStreamEvent;
|
||||
} catch {
|
||||
return null;
|
||||
}
|
||||
};
|
||||
|
||||
const emitErrorEvent = (message: string): void => {
|
||||
const errorEvent: ErrorEvent = {
|
||||
@@ -31,67 +19,18 @@ export class StreamHandlerService {
|
||||
observer.next(errorEvent);
|
||||
};
|
||||
|
||||
const startNdjsonFallback = async (): Promise<void> => {
|
||||
if (fallbackActive) {
|
||||
return;
|
||||
}
|
||||
|
||||
fallbackActive = true;
|
||||
console.log(`[StreamHandler] NDJSON fallback started for ${gameId}, URL:`, fallbackUrl);
|
||||
|
||||
try {
|
||||
const response = await fetch(fallbackUrl, {
|
||||
headers: { Accept: 'application/x-ndjson' },
|
||||
signal: abortController.signal
|
||||
});
|
||||
|
||||
if (!response.ok || !response.body) {
|
||||
console.error(`[StreamHandler] NDJSON fetch failed: HTTP ${response.status}`);
|
||||
emitErrorEvent(`Unable to open stream: HTTP ${response.status}`);
|
||||
observer.complete();
|
||||
return;
|
||||
}
|
||||
|
||||
console.log(`[StreamHandler] NDJSON stream connected for ${gameId}`);
|
||||
const reader = response.body.getReader();
|
||||
const decoder = new TextDecoder();
|
||||
let buffer = '';
|
||||
|
||||
while (true) {
|
||||
const { value, done } = await reader.read();
|
||||
if (done) {
|
||||
break;
|
||||
}
|
||||
|
||||
buffer += decoder.decode(value, { stream: true });
|
||||
const lines = buffer.split('\n');
|
||||
buffer = lines.pop() ?? '';
|
||||
|
||||
for (const line of lines) {
|
||||
const event = parseEvent(line);
|
||||
if (event) {
|
||||
observer.next(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
observer.complete();
|
||||
} catch (error) {
|
||||
if ((error as Error).name !== 'AbortError') {
|
||||
emitErrorEvent((error as Error).message);
|
||||
observer.error(error);
|
||||
}
|
||||
}
|
||||
const failAndComplete = (reason: string): void => {
|
||||
console.warn(`[StreamHandler] WebSocket failed for ${gameId}: ${reason}`);
|
||||
emitErrorEvent(reason);
|
||||
observer.complete();
|
||||
};
|
||||
|
||||
// Set timeout to fallback if WebSocket doesn't connect quickly
|
||||
const connectionTimeoutId = setTimeout(() => {
|
||||
if (!connected && !fallbackActive) {
|
||||
console.warn(`[StreamHandler] WebSocket timeout for ${gameId}, attempting NDJSON fallback`);
|
||||
if (!connected) {
|
||||
ws.close();
|
||||
void startNdjsonFallback();
|
||||
failAndComplete('WebSocket connection timed out — falling back to polling');
|
||||
}
|
||||
}, 3000);
|
||||
}, WS_CONNECT_TIMEOUT_MS);
|
||||
|
||||
ws.onopen = () => {
|
||||
connected = true;
|
||||
@@ -101,35 +40,30 @@ export class StreamHandlerService {
|
||||
|
||||
ws.onmessage = (message) => {
|
||||
const payload = typeof message.data === 'string' ? message.data : '';
|
||||
const event = parseEvent(payload);
|
||||
if (event) {
|
||||
if (!payload.trim()) return;
|
||||
try {
|
||||
const event = JSON.parse(payload) as GameStreamEvent;
|
||||
observer.next(event);
|
||||
} catch {
|
||||
// ignore malformed frames
|
||||
}
|
||||
};
|
||||
|
||||
ws.onerror = (error) => {
|
||||
console.warn(`[StreamHandler] WebSocket error for ${gameId}:`, error);
|
||||
ws.onerror = () => {
|
||||
clearTimeout(connectionTimeoutId);
|
||||
if (!connected && !fallbackActive) {
|
||||
void startNdjsonFallback();
|
||||
if (!connected) {
|
||||
failAndComplete('WebSocket connection error — falling back to polling');
|
||||
}
|
||||
};
|
||||
|
||||
ws.onclose = () => {
|
||||
clearTimeout(connectionTimeoutId);
|
||||
console.warn(`[StreamHandler] WebSocket closed for ${gameId}, connected=${connected}`);
|
||||
if (connected) {
|
||||
// Connection was established but closed, stream is complete
|
||||
observer.complete();
|
||||
} else if (!fallbackActive) {
|
||||
// Connection never established, try fallback
|
||||
console.log(`[StreamHandler] Starting NDJSON fallback for ${gameId}`);
|
||||
void startNdjsonFallback();
|
||||
}
|
||||
};
|
||||
|
||||
return () => {
|
||||
abortController.abort();
|
||||
ws.close();
|
||||
};
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user