Commit 22116b9

feat: add WebSocket SSE tunnel to evict corporate proxy caches and enable smooth real-time streaming
- Added optional module tools/server/webui/src/lib/utils/websocket-tunnel.ts implementing a minimal WebSocket tunnel protocol
- Supports both 'json' (one-shot) and 'sse' (streaming) modes with async queueing, abort handling, and error propagation
- Integrated into ChatService to optionally route streaming completions through the tunnel when configured
- Added new setting 'sseWebsocketProxyUrl' (default: empty; no change to app behavior unless set)
- Example value: 'wss://www.example.com/tunnel?transport=websocket'
- Purpose: evicts proxy caches and avoids buffering delays from corporate middleboxes, allowing smooth, continuous token streaming in the UI
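The websocket-tunnel.ts module itself is not reproduced in this view (only the settings-config.ts and chat.ts diffs appear below). For orientation, here is a minimal TypeScript sketch of the surface the ChatService integration relies on, inferred solely from the call sites in the chat.ts diff; the interface names and everything beyond the stream() call shape are assumptions, not the module's actual code:

// Sketch only; inferred from chat.ts call sites, not from the real websocket-tunnel.ts.
export interface TunnelSSEEvent {
    // Raw payload of one relayed SSE frame; chat.ts reads event.data
    data?: string;
}

// Assumed request shape, matching the object literal passed to stream() in chat.ts.
export interface TunnelRequest {
    targetUrl: string;
    method: string;
    headers: Record<string, string>;
    body: string;
    abortSignal?: AbortSignal;
}

export declare class WebSocketTunnelClient {
    // tunnelUrl comes from the new 'sseWebsocketProxyUrl' setting, e.g. a wss:// endpoint
    constructor(tunnelUrl: string);
    // 'sse' (streaming) mode per the commit message: yields one event per relayed SSE frame;
    // the 'json' (one-shot) mode mentioned above is not visible in the diffs shown here
    stream(request: TunnelRequest): AsyncGenerator<TunnelSSEEvent>;
}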
1 parent 579ba40 commit 22116b9

File tree

3 files changed: 714 additions, 30 deletions

- tools/server/webui/src/lib/constants/settings-config.ts
- tools/server/webui/src/lib/services/chat.ts
- tools/server/webui/src/lib/utils/websocket-tunnel.ts (new)


tools/server/webui/src/lib/constants/settings-config.ts

Lines changed: 2 additions & 1 deletion

@@ -35,7 +35,8 @@ export const SETTING_CONFIG_DEFAULT: Record<string, string | number | boolean> =
     max_tokens: -1,
     custom: '', // custom json-stringified object
     // experimental features
-    pyInterpreterEnabled: false
+    pyInterpreterEnabled: false,
+    sseWebsocketProxyUrl: ''
 };
 
 export const SETTING_CONFIG_INFO: Record<string, string> = {
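As a usage note: the new key defaults to the empty string, which keeps the existing direct-fetch path, and only a non-empty WebSocket URL on a streaming request opts into the tunnel. A self-contained sketch of that guard, mirroring the check added in chat.ts below (shouldUseTunnel is a hypothetical helper for illustration, not commit code):

// Illustrative only; mirrors the `stream && tunnelUrl` guard in the chat.ts diff below.
const SETTING_CONFIG_DEFAULT: Record<string, string | number | boolean> = {
    pyInterpreterEnabled: false,
    sseWebsocketProxyUrl: '' // empty string: tunnel disabled, app behavior unchanged
};

function shouldUseTunnel(settings: Record<string, string | number | boolean>, stream: boolean): boolean {
    // same check as chat.ts: only a non-empty URL on a streaming request opts in
    const tunnelUrl = settings.sseWebsocketProxyUrl?.toString().trim();
    return Boolean(stream && tunnelUrl);
}

console.log(shouldUseTunnel(SETTING_CONFIG_DEFAULT, true)); // false: default stays on direct fetch
console.log(
    shouldUseTunnel(
        { ...SETTING_CONFIG_DEFAULT, sseWebsocketProxyUrl: 'wss://www.example.com/tunnel?transport=websocket' },
        true
    )
); // true: streaming is routed through the tunnel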

tools/server/webui/src/lib/services/chat.ts

Lines changed: 141 additions & 29 deletions

@@ -1,6 +1,8 @@
 import { config } from '$lib/stores/settings.svelte';
 import { selectedModelName } from '$lib/stores/models.svelte';
+import { WebSocketTunnelClient, type TunnelSSEEvent } from '$lib/utils/websocket-tunnel';
 import { slotsService } from './slots';
+
 /**
  * ChatService - Low-level API communication layer for llama.cpp server interactions
  *
@@ -172,14 +174,38 @@ export class ChatService {
 
         try {
             const apiKey = currentConfig.apiKey?.toString().trim();
+            const headers = {
+                'Content-Type': 'application/json',
+                ...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {})
+            };
+            const requestPayload = JSON.stringify(requestBody);
+            const tunnelUrl = currentConfig.sseWebsocketProxyUrl?.toString().trim();
+
+            if (stream && tunnelUrl) {
+                const tunnelClient = new WebSocketTunnelClient(tunnelUrl);
+                const targetUrl = new URL('./v1/chat/completions', window.location.href).toString();
+
+                await this.handleStreamResponse(
+                    tunnelClient.stream({
+                        targetUrl,
+                        method: 'POST',
+                        headers,
+                        body: requestPayload,
+                        abortSignal: this.abortController.signal
+                    }),
+                    onChunk,
+                    onComplete,
+                    onError,
+                    onReasoningChunk,
+                    onModel
+                );
+                return;
+            }
 
             const response = await fetch(`./v1/chat/completions`, {
                 method: 'POST',
-                headers: {
-                    'Content-Type': 'application/json',
-                    ...(apiKey ? { Authorization: `Bearer ${apiKey}` } : {})
-                },
-                body: JSON.stringify(requestBody),
+                headers,
+                body: requestPayload,
                 signal: this.abortController.signal
             });
 
@@ -201,9 +227,9 @@
                     onReasoningChunk,
                     onModel
                 );
-            } else {
-                return this.handleNonStreamResponse(response, onComplete, onError, onModel);
             }
+
+            return this.handleNonStreamResponse(response, onComplete, onError, onModel);
         } catch (error) {
             if (error instanceof Error && error.name === 'AbortError') {
                 console.log('Chat completion request was aborted');
@@ -241,9 +267,9 @@
 
     /**
      * Handles streaming response from the chat completion API.
-     * Processes server-sent events and extracts content chunks from the stream.
+     * Processes server-sent events received through the WebSocket tunnel or direct fetch responses.
      *
-     * @param response - The fetch Response object containing the streaming data
+     * @param source - Async generator yielding SSE events from the API or a fetch Response
      * @param onChunk - Optional callback invoked for each content chunk received
      * @param onComplete - Optional callback invoked when the stream is complete with full response
      * @param onError - Optional callback invoked if an error occurs during streaming
@@ -252,7 +278,7 @@
      * @throws {Error} if the stream cannot be read or parsed
      */
     private async handleStreamResponse(
-        response: Response,
+        source: AsyncGenerator<TunnelSSEEvent> | Response,
         onChunk?: (chunk: string) => void,
         onComplete?: (
             response: string,
@@ -263,32 +289,38 @@
         onReasoningChunk?: (chunk: string) => void,
         onModel?: (model: string) => void
     ): Promise<void> {
-        const reader = response.body?.getReader();
-
-        if (!reader) {
-            throw new Error('No response body');
-        }
-
-        const decoder = new TextDecoder();
         let aggregatedContent = '';
         let fullReasoningContent = '';
        let hasReceivedData = false;
         let lastTimings: ChatMessageTimings | undefined;
         let streamFinished = false;
         let modelEmitted = false;
 
-        try {
-            let chunk = '';
-            while (true) {
-                const { done, value } = await reader.read();
-                if (done) break;
+        if (source instanceof Response) {
+            const response = source;
+            const reader = response.body?.getReader();
 
-                chunk += decoder.decode(value, { stream: true });
-                const lines = chunk.split('\n');
-                chunk = lines.pop() || '';
+            if (!reader) {
+                throw new Error('No response body');
+            }
+
+            const decoder = new TextDecoder();
+
+            try {
+                let chunk = '';
+                while (true) {
+                    const { done, value } = await reader.read();
+                    if (done) break;
+
+                    chunk += decoder.decode(value, { stream: true });
+                    const lines = chunk.split('\n');
+                    chunk = lines.pop() || '';
+
+                    for (const line of lines) {
+                        if (!line.startsWith('data: ')) {
+                            continue;
+                        }
 
-                for (const line of lines) {
-                    if (line.startsWith('data: ')) {
                         const data = line.slice(6);
                         if (data === '[DONE]') {
                             streamFinished = true;
@@ -333,6 +365,81 @@
                         }
                     }
                 }
+
+                if (streamFinished) {
+                    if (!hasReceivedData && aggregatedContent.length === 0) {
+                        const noResponseError = new Error(
+                            'No response received from server. Please try again.'
+                        );
+                        throw noResponseError;
+                    }
+
+                    onComplete?.(aggregatedContent, fullReasoningContent || undefined, lastTimings);
+                }
+            } catch (error) {
+                const err = error instanceof Error ? error : new Error('Stream error');
+
+                onError?.(err);
+
+                throw err;
+            } finally {
+                reader.releaseLock();
+            }
+
+            return;
+        }
+
+        const stream = source;
+
+        try {
+            for await (const event of stream) {
+                const payload = event?.data?.trim();
+
+                if (!payload) {
+                    continue;
+                }
+
+                if (payload === '[DONE]') {
+                    streamFinished = true;
+                    continue;
+                }
+
+                try {
+                    const parsed: ApiChatCompletionStreamChunk = JSON.parse(payload);
+
+                    const chunkModel = this.extractModelName(parsed);
+                    if (chunkModel && !modelEmitted) {
+                        modelEmitted = true;
+                        onModel?.(chunkModel);
+                    }
+
+                    const content = parsed.choices[0]?.delta?.content;
+                    const reasoningContent = parsed.choices[0]?.delta?.reasoning_content;
+                    const timings = parsed.timings;
+                    const promptProgress = parsed.prompt_progress;
+
+                    if (timings || promptProgress) {
+                        this.updateProcessingState(timings, promptProgress);
+
+                        if (timings) {
+                            lastTimings = timings;
+                        }
+                    }
+
+                    if (content) {
+                        hasReceivedData = true;
+                        aggregatedContent += content;
+                        onChunk?.(content);
+                    }
+
+                    if (reasoningContent) {
+                        hasReceivedData = true;
+                        fullReasoningContent += reasoningContent;
+                        onReasoningChunk?.(reasoningContent);
+                    }
+                } catch (e) {
+                    console.error('Error parsing JSON chunk:', e);
+                }
             }
 
             if (streamFinished) {
@@ -350,10 +457,15 @@
 
             throw err;
         } finally {
-            reader.releaseLock();
+            if (typeof stream.return === 'function') {
+                try {
+                    await stream.return();
+                } catch {
+                    /* ignore */
+                }
+            }
         }
     }
-
     /**
      * Handles non-streaming response from the chat completion API.
      * Parses the JSON response and extracts the generated content.
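Both branches of handleStreamResponse consume the same SSE contract: 'data: '-prefixed lines carrying JSON chunks, terminated by a '[DONE]' sentinel, whether those lines arrive as fetch body bytes or as tunnel-relayed TunnelSSEEvent objects. A self-contained sketch of that shared parsing step (the sample frames are illustrative, not captured server output):

// Illustrative SSE frames in the shape ./v1/chat/completions emits with stream: true.
const frames = [
    'data: {"choices":[{"delta":{"content":"Hel"}}]}',
    'data: {"choices":[{"delta":{"content":"lo"}}]}',
    'data: [DONE]'
];

let aggregated = '';
let finished = false;

for (const line of frames) {
    if (!line.startsWith('data: ')) continue; // same guard as both branches in the diff
    const payload = line.slice(6).trim();
    if (payload === '[DONE]') { // sentinel marking the end of the stream
        finished = true;
        continue;
    }
    const parsed = JSON.parse(payload);
    aggregated += parsed.choices[0]?.delta?.content ?? '';
}

console.log(finished, aggregated); // true 'Hello'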

0 commit comments
