Skip to content

Commit 7a3189d

Browse files
committed
feat: Enhance ChatService with streaming message handling and state management
1 parent 3434219 commit 7a3189d

File tree

2 files changed

+267
-3
lines changed

2 files changed

+267
-3
lines changed

packages/realtime/src/chat-service.ts

Lines changed: 240 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { SignalRService } from './signalr-service';
2-
import { SignalRConfig, SignalRMessage, ChatMessage } from './types';
2+
import { SignalRConfig, SignalRMessage, ChatMessage, StreamingMessage, StreamingChatState, StreamingMessageHandler, StreamingStateHandler } from './types';
33

44
export interface ChatServiceConfig extends Omit<SignalRConfig, 'hubUrl'> {
55
baseUrl?: string;
@@ -10,6 +10,13 @@ export class ChatService {
1010
private signalRService: SignalRService;
1111
private messageQueue: string[] = [];
1212
private isProcessing = false;
13+
private streamingState: StreamingChatState = {
14+
isStreaming: false,
15+
currentMessage: ''
16+
};
17+
private streamingHandlers: StreamingMessageHandler[] = [];
18+
private streamingStateHandlers: StreamingStateHandler[] = [];
19+
private regularMessageHandlers: ((message: any) => void)[] = [];
1320

1421
constructor(config: ChatServiceConfig) {
1522
const baseUrl = config.baseUrl || (typeof window !== 'undefined' ? window.location.origin : 'https://mixcore.net');
@@ -32,8 +39,209 @@ export class ChatService {
3239
}
3340

3441
private handleIncomingMessage(message: SignalRMessage): void {
35-
// Override this method in subclasses or provide callbacks for custom handling
36-
console.log('Received SignalR message:', message);
42+
console.log('Raw incoming message:', message);
43+
44+
// Check if the message itself is streaming format
45+
if (this.isStreamingMessage(message)) {
46+
console.log('Detected streaming message');
47+
this.handleStreamingMessage(message);
48+
return;
49+
}
50+
51+
// Handle as regular message only if not currently streaming
52+
if (!this.streamingState.isStreaming && message?.data?.response) {
53+
console.log('Processing as regular message');
54+
55+
let content = '';
56+
if (typeof message.data.response === 'string') {
57+
content = message.data.response;
58+
} else if (typeof message.data.response === 'object' && message.data.response !== null) {
59+
// Handle complex SignalR response structures
60+
const responseObj = message.data.response as any;
61+
if (responseObj.content) {
62+
content = responseObj.content;
63+
} else if (responseObj.data) {
64+
content = responseObj.data;
65+
} else if (responseObj.message) {
66+
content = responseObj.message;
67+
} else {
68+
// Fallback - try to extract meaningful text from the object
69+
content = JSON.stringify(message.data.response, null, 2);
70+
}
71+
} else {
72+
content = String(message.data.response);
73+
}
74+
75+
// Only add if we have actual content and it's not streaming format
76+
if (content.trim() && !content.includes('"type":1') && !content.includes('"type":3')) {
77+
// Emit as regular message for the UI to handle
78+
this.emitRegularMessage({
79+
id: Date.now().toString(),
80+
content: content,
81+
role: "assistant" as const,
82+
timestamp: new Date().toISOString(),
83+
});
84+
}
85+
} else {
86+
console.log('Message ignored - either streaming active or no response content');
87+
}
88+
}
89+
90+
private isStreamingMessage(message: any): boolean {
91+
// Check direct streaming format
92+
if (typeof message === 'object' &&
93+
typeof message.type === 'number' &&
94+
typeof message.target === 'string' &&
95+
Array.isArray(message.arguments)) {
96+
return true;
97+
}
98+
99+
// Check if it's a SignalR message containing streaming data
100+
if (message?.data?.response && typeof message.data.response === 'string') {
101+
// Check if response contains streaming JSON pattern
102+
const response = message.data.response;
103+
return response.includes('"type":1') && response.includes('"target":"receive_message"') ||
104+
response.includes('"type":3');
105+
}
106+
107+
return false;
108+
}
109+
110+
private handleStreamingMessage(rawMessage: any): void {
111+
try {
112+
let streamingMessage: StreamingMessage;
113+
114+
// If it's wrapped in SignalR format, extract the streaming data
115+
if (rawMessage?.data?.response && typeof rawMessage.data.response === 'string') {
116+
const response = rawMessage.data.response;
117+
console.log('Parsing streaming response:', response);
118+
119+
// Split by }{ pattern to handle concatenated JSON objects
120+
const jsonChunks = response.split('}{').map((chunk, index, array) => {
121+
if (index === 0 && array.length > 1) {
122+
return chunk + '}';
123+
} else if (index === array.length - 1 && array.length > 1) {
124+
return '{' + chunk;
125+
} else if (array.length > 1) {
126+
return '{' + chunk + '}';
127+
}
128+
return chunk;
129+
});
130+
131+
for (const jsonChunk of jsonChunks) {
132+
try {
133+
const parsed = JSON.parse(jsonChunk);
134+
if (parsed.type === 1 || parsed.type === 3) {
135+
console.log('Processing streaming chunk:', parsed);
136+
this.processStreamingMessage(parsed);
137+
}
138+
} catch (e) {
139+
console.warn('Failed to parse streaming chunk:', jsonChunk, e);
140+
}
141+
}
142+
return;
143+
}
144+
145+
// Direct streaming message format
146+
streamingMessage = rawMessage as StreamingMessage;
147+
this.processStreamingMessage(streamingMessage);
148+
149+
} catch (error) {
150+
console.error('Error handling streaming message:', error);
151+
}
152+
}
153+
154+
private processStreamingMessage(streamingMessage: StreamingMessage): void {
155+
console.log('Processing streaming message:', streamingMessage);
156+
157+
// Handle streaming data messages (type 1)
158+
if (streamingMessage.type === 1 && streamingMessage.target === 'receive_message') {
159+
for (const arg of streamingMessage.arguments) {
160+
if (arg.action === 'NewStreamingMessage' && arg.data.isSuccess) {
161+
console.log('Appending chunk:', arg.data.response);
162+
this.appendStreamingChunk(arg.data.response);
163+
}
164+
}
165+
}
166+
167+
// Handle completion messages (type 3)
168+
if (streamingMessage.type === 3) {
169+
console.log('Completing streaming');
170+
this.completeStreaming();
171+
}
172+
}
173+
174+
private appendStreamingChunk(chunk: string): void {
175+
if (!this.streamingState.isStreaming) {
176+
this.streamingState.isStreaming = true;
177+
this.streamingState.currentMessage = '';
178+
this.notifyStreamingStateChange();
179+
}
180+
181+
this.streamingState.currentMessage += chunk;
182+
183+
// Notify streaming handlers
184+
this.streamingHandlers.forEach(handler => {
185+
try {
186+
handler(chunk, false);
187+
} catch (error) {
188+
console.error('Error in streaming handler:', error);
189+
}
190+
});
191+
}
192+
193+
private completeStreaming(): void {
194+
if (this.streamingState.isStreaming) {
195+
// Notify completion
196+
this.streamingHandlers.forEach(handler => {
197+
try {
198+
handler('', true);
199+
} catch (error) {
200+
console.error('Error in streaming completion handler:', error);
201+
}
202+
});
203+
204+
// Reset streaming state
205+
this.streamingState.isStreaming = false;
206+
const finalMessage = this.streamingState.currentMessage;
207+
this.streamingState.currentMessage = '';
208+
209+
this.notifyStreamingStateChange();
210+
211+
console.log('Streaming completed. Final message:', finalMessage);
212+
}
213+
}
214+
215+
private notifyStreamingStateChange(): void {
216+
this.streamingStateHandlers.forEach(handler => {
217+
try {
218+
handler({ ...this.streamingState });
219+
} catch (error) {
220+
console.error('Error in streaming state handler:', error);
221+
}
222+
});
223+
}
224+
225+
private emitRegularMessage(message: any): void {
226+
console.log('Emitting regular message:', message);
227+
this.regularMessageHandlers.forEach(handler => {
228+
try {
229+
handler(message);
230+
} catch (error) {
231+
console.error('Error in regular message handler:', error);
232+
}
233+
});
234+
}
235+
236+
public onRegularMessage(handler: (message: any) => void): void {
237+
this.regularMessageHandlers.push(handler);
238+
}
239+
240+
public offRegularMessage(handler: (message: any) => void): void {
241+
const index = this.regularMessageHandlers.indexOf(handler);
242+
if (index > -1) {
243+
this.regularMessageHandlers.splice(index, 1);
244+
}
37245
}
38246

39247
public async start(): Promise<void> {
@@ -69,6 +277,32 @@ export class ChatService {
69277
this.signalRService.offMessage('receive_message', handler);
70278
}
71279

280+
public onStreaming(handler: StreamingMessageHandler): void {
281+
this.streamingHandlers.push(handler);
282+
}
283+
284+
public offStreaming(handler: StreamingMessageHandler): void {
285+
const index = this.streamingHandlers.indexOf(handler);
286+
if (index > -1) {
287+
this.streamingHandlers.splice(index, 1);
288+
}
289+
}
290+
291+
public onStreamingStateChange(handler: StreamingStateHandler): void {
292+
this.streamingStateHandlers.push(handler);
293+
}
294+
295+
public offStreamingStateChange(handler: StreamingStateHandler): void {
296+
const index = this.streamingStateHandlers.indexOf(handler);
297+
if (index > -1) {
298+
this.streamingStateHandlers.splice(index, 1);
299+
}
300+
}
301+
302+
public getStreamingState(): StreamingChatState {
303+
return { ...this.streamingState };
304+
}
305+
72306
public onConnectionStateChange(handler: (state: string) => void): void {
73307
this.signalRService.onConnectionStateChange(handler);
74308
}
@@ -94,6 +328,9 @@ export class ChatService {
94328
}
95329

96330
public dispose(): void {
331+
this.streamingHandlers.length = 0;
332+
this.streamingStateHandlers.length = 0;
333+
this.regularMessageHandlers.length = 0;
97334
this.signalRService.dispose();
98335
}
99336
}

packages/realtime/src/types.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,33 @@ export interface SignalRMessage {
1313
createdDateTime: string;
1414
}
1515

16+
export interface StreamingMessage {
17+
type: number;
18+
target: string;
19+
arguments: StreamingMessageArgument[];
20+
invocationId?: string;
21+
result?: any;
22+
}
23+
24+
export interface StreamingMessageArgument {
25+
action: string;
26+
data: {
27+
isSuccess: boolean;
28+
response: string;
29+
result: string;
30+
};
31+
type: string;
32+
}
33+
34+
export interface StreamingChatState {
35+
isStreaming: boolean;
36+
currentMessage: string;
37+
messageId?: string;
38+
}
39+
40+
export type StreamingMessageHandler = (chunk: string, isComplete: boolean) => void;
41+
export type StreamingStateHandler = (state: StreamingChatState) => void;
42+
1643
export interface SignalRConfig {
1744
hubUrl: string;
1845
accessTokenFactory: () => string | null;

0 commit comments

Comments
 (0)