1+ import 'dart:async' ;
12import 'dart:convert' ;
23import 'package:agixt/models/agixt/auth/auth.dart' ;
34import 'package:agixt/models/agixt/calendar.dart' ;
@@ -12,6 +13,7 @@ import 'package:agixt/services/secure_storage_service.dart';
1213import 'package:agixt/services/location_service.dart' ; // Import LocationService
1314import 'package:agixt/services/client_commands_service.dart' ; // Import ClientSideTools
1415import 'package:device_calendar/device_calendar.dart' ;
16+ import 'package:flutter/foundation.dart' ;
1517import 'package:flutter/material.dart' ;
1618import 'package:geolocator/geolocator.dart' ; // Import Geolocator
1719import 'package:hive/hive.dart' ;
@@ -232,6 +234,156 @@ class AGiXTChatWidget implements AGiXTWidget {
232234 }
233235 }
234236
237+ /// Send chat message with streaming response
238+ /// Returns a stream of response chunks as they arrive
239+ Stream <String > sendChatMessageStreaming (String message) async * {
240+ try {
241+ final jwt = await AuthService .getJwt ();
242+ if (jwt == null ) {
243+ yield "Please login to use AGiXT chat." ;
244+ return ;
245+ }
246+
247+ final conversationId = await _getCurrentConversationId ();
248+ debugPrint ('Using conversation ID for streaming chat: $conversationId ' );
249+
250+ // Build context data with timeout
251+ String contextData = '' ;
252+ try {
253+ contextData = await _buildContextData ().timeout (
254+ const Duration (seconds: 3 ),
255+ onTimeout: () {
256+ debugPrint ('Context building timed out' );
257+ return '' ;
258+ },
259+ );
260+ } catch (e) {
261+ debugPrint ('Error building context data: $e ' );
262+ }
263+
264+ // Get available tools
265+ final availableTools = await ClientSideTools .getToolDefinitions ();
266+
267+ final Map <String , dynamic > requestBody = {
268+ "model" : await _getAgentName (),
269+ "messages" : [
270+ {
271+ "role" : "user" ,
272+ "content" : message,
273+ if (contextData.isNotEmpty) "context" : contextData,
274+ },
275+ ],
276+ "user" : conversationId,
277+ "stream" : true , // Enable streaming!
278+ if (availableTools.isNotEmpty) "tools" : availableTools,
279+ if (availableTools.isNotEmpty) "tool_choice" : "auto" ,
280+ };
281+
282+ // Create streaming request
283+ final client = http.Client ();
284+ final request = http.Request (
285+ 'POST' ,
286+ Uri .parse ('${AuthService .serverUrl }/v1/chat/completions' ),
287+ );
288+ request.headers.addAll ({
289+ 'Content-Type' : 'application/json' ,
290+ 'Authorization' : 'Bearer $jwt ' ,
291+ 'Accept' : 'text/event-stream' ,
292+ });
293+ request.body = jsonEncode (requestBody);
294+
295+ debugPrint ('Sending streaming chat request...' );
296+ final streamedResponse = await client.send (request);
297+
298+ if (streamedResponse.statusCode == 200 ) {
299+ String buffer = '' ;
300+ String fullResponse = '' ;
301+ String ? newConversationId;
302+
303+ await for (final chunk in streamedResponse.stream.transform (utf8.decoder)) {
304+ buffer += chunk;
305+
306+ // Process complete SSE events (lines starting with "data: ")
307+ while (buffer.contains ('\n ' )) {
308+ final newlineIndex = buffer.indexOf ('\n ' );
309+ final line = buffer.substring (0 , newlineIndex).trim ();
310+ buffer = buffer.substring (newlineIndex + 1 );
311+
312+ if (line.isEmpty) continue ;
313+ if (! line.startsWith ('data: ' )) continue ;
314+
315+ final data = line.substring (6 ); // Remove "data: " prefix
316+
317+ // Check for stream end
318+ if (data == '[DONE]' ) {
319+ debugPrint ('Stream complete' );
320+ continue ;
321+ }
322+
323+ try {
324+ final jsonData = jsonDecode (data);
325+
326+ // Extract conversation ID from first chunk
327+ if (newConversationId == null && jsonData['id' ] != null ) {
328+ newConversationId = jsonData['id' ].toString ();
329+ debugPrint ('Got conversation ID from stream: $newConversationId ' );
330+ }
331+
332+ // Extract content delta
333+ if (jsonData['choices' ] != null &&
334+ jsonData['choices' ].isNotEmpty) {
335+ final delta = jsonData['choices' ][0 ]['delta' ];
336+ if (delta != null && delta['content' ] != null ) {
337+ final content = delta['content' ].toString ();
338+ fullResponse += content;
339+ yield content; // Yield each chunk as it arrives
340+ }
341+ }
342+ } catch (e) {
343+ // Skip malformed JSON chunks
344+ debugPrint ('Error parsing SSE chunk: $e ' );
345+ }
346+ }
347+ }
348+
349+ // Save conversation ID after stream completes
350+ if (newConversationId != null && newConversationId != '-' ) {
351+ final cookieManager = CookieManager ();
352+ await cookieManager.saveAgixtConversationId (newConversationId);
353+ debugPrint ('Saved conversation ID: $newConversationId ' );
354+ _navigateToConversation (newConversationId, jwt);
355+ }
356+
357+ // Save the full interaction
358+ if (fullResponse.isNotEmpty) {
359+ await _saveInteraction (message, fullResponse);
360+ }
361+
362+ client.close ();
363+ } else if (streamedResponse.statusCode == 401 ) {
364+ await SessionManager .clearSession ();
365+ yield "Authentication expired. Please login again." ;
366+ client.close ();
367+ } else {
368+ debugPrint ('Streaming API Error: ${streamedResponse .statusCode }' );
369+ yield "Error: Unable to get response (${streamedResponse .statusCode })" ;
370+ client.close ();
371+ }
372+ } catch (e) {
373+ debugPrint ('Streaming chat error: $e ' );
374+ yield "An error occurred while connecting to AGiXT." ;
375+ }
376+ }
377+
378+ /// Convenience method to collect full streaming response as a single string
379+ Future <String ?> sendChatMessageStreamingFull (String message) async {
380+ final buffer = StringBuffer ();
381+ await for (final chunk in sendChatMessageStreaming (message)) {
382+ buffer.write (chunk);
383+ }
384+ return buffer.isEmpty ? null : buffer.toString ();
385+ }
386+
235387 // Navigate to the conversation in the WebView
236388 Future <void > _navigateToConversation (
237389 String conversationId,
0 commit comments