11package blockchain
22
33import (
4+ "bufio"
45 "bytes"
56 "context"
67 "encoding/json"
78 "fmt"
89 "io"
910 "net/http"
11+ "sync"
1012
1113 "github.com/functionland/go-fula/wap/pkg/wifi"
1214 "github.com/libp2p/go-libp2p/core/network"
@@ -20,6 +22,49 @@ const (
2022 GB = 1024 * MB
2123)
2224
25+ type StreamBuffer struct {
26+ mu sync.Mutex
27+ chunks []string
28+ closed bool
29+ err error
30+ }
31+
32+ func NewStreamBuffer () * StreamBuffer {
33+ return & StreamBuffer {
34+ chunks : make ([]string , 0 ),
35+ }
36+ }
37+
38+ func (b * StreamBuffer ) AddChunk (chunk string ) {
39+ b .mu .Lock ()
40+ defer b .mu .Unlock ()
41+ if b .closed {
42+ return
43+ }
44+ b .chunks = append (b .chunks , chunk )
45+ }
46+
47+ func (b * StreamBuffer ) Close (err error ) {
48+ b .mu .Lock ()
49+ defer b .mu .Unlock ()
50+ b .closed = true
51+ b .err = err
52+ }
53+
54+ func (b * StreamBuffer ) GetChunk () (string , error ) {
55+ b .mu .Lock ()
56+ defer b .mu .Unlock ()
57+ if len (b .chunks ) > 0 {
58+ chunk := b .chunks [0 ]
59+ b .chunks = b .chunks [1 :]
60+ return chunk , nil
61+ }
62+ if b .closed {
63+ return "" , b .err
64+ }
65+ return "" , nil // No chunk available yet
66+ }
67+
2368func (bl * FxBlockchain ) BloxFreeSpace (ctx context.Context , to peer.ID ) ([]byte , error ) {
2469 if bl .allowTransientConnection {
2570 ctx = network .WithUseTransient (ctx , "fx.blockchain" )
@@ -245,6 +290,55 @@ func (bl *FxBlockchain) FetchContainerLogs(ctx context.Context, to peer.ID, r wi
245290 }
246291}
247292
293+ func (bl * FxBlockchain ) ChatWithAI (ctx context.Context , to peer.ID , r wifi.ChatWithAIRequest ) (* StreamBuffer , error ) {
294+ if bl .allowTransientConnection {
295+ ctx = network .WithUseTransient (ctx , "fx.blockchain" )
296+ }
297+
298+ // Encode the request into JSON
299+ var buf bytes.Buffer
300+ if err := json .NewEncoder (& buf ).Encode (r ); err != nil {
301+ return nil , fmt .Errorf ("failed to encode request: %v" , err )
302+ }
303+
304+ // Create the HTTP request
305+ req , err := http .NewRequestWithContext (ctx , http .MethodPost , "http://" + to .String ()+ ".invalid/" + actionChatWithAI , & buf )
306+ if err != nil {
307+ return nil , fmt .Errorf ("failed to create request: %v" , err )
308+ }
309+
310+ resp , err := bl .c .Do (req )
311+ if err != nil {
312+ return nil , fmt .Errorf ("failed to send request: %v" , err )
313+ }
314+ if resp .StatusCode != http .StatusOK {
315+ defer resp .Body .Close ()
316+ bodyBytes , _ := io .ReadAll (resp .Body )
317+ return nil , fmt .Errorf ("unexpected response: %d; body: %s" , resp .StatusCode , string (bodyBytes ))
318+ }
319+
320+ buffer := NewStreamBuffer () // Create a new StreamBuffer
321+
322+ go func () {
323+ defer resp .Body .Close ()
324+ reader := bufio .NewReader (resp .Body )
325+ for {
326+ line , err := reader .ReadString ('\n' ) // Read each chunk line by line
327+ if err != nil {
328+ if err == io .EOF {
329+ buffer .Close (nil ) // Close buffer with no error
330+ } else {
331+ buffer .Close (fmt .Errorf ("error reading response stream: %v" , err ))
332+ }
333+ break
334+ }
335+ buffer .AddChunk (line ) // Add each chunk to the buffer
336+ }
337+ }()
338+
339+ return buffer , nil // Return the StreamBuffer
340+ }
341+
248342func (bl * FxBlockchain ) FindBestAndTargetInLogs (ctx context.Context , to peer.ID , r wifi.FindBestAndTargetInLogsRequest ) ([]byte , error ) {
249343
250344 if bl .allowTransientConnection {
@@ -468,6 +562,64 @@ func (bl *FxBlockchain) handleFetchContainerLogs(ctx context.Context, from peer.
468562
469563}
470564
565+ func (bl * FxBlockchain ) handleChatWithAI (ctx context.Context , from peer.ID , w http.ResponseWriter , r * http.Request ) {
566+ log := log .With ("action" , actionChatWithAI , "from" , from )
567+
568+ var req wifi.ChatWithAIRequest
569+ if err := json .NewDecoder (r .Body ).Decode (& req ); err != nil {
570+ log .Error ("failed to decode request: %v" , err )
571+ http .Error (w , "failed to decode request" , http .StatusBadRequest )
572+ return
573+ }
574+
575+ w .Header ().Set ("Content-Type" , "application/json" )
576+ w .WriteHeader (http .StatusAccepted ) // Use StatusAccepted for consistency
577+
578+ flusher , ok := w .(http.Flusher )
579+ if ! ok {
580+ http .Error (w , "Streaming not supported" , http .StatusInternalServerError )
581+ return
582+ }
583+
584+ chunks , err := wifi .FetchAIResponse (ctx , req .AIModel , req .UserMessage )
585+ if err != nil {
586+ log .Error ("error in fetchAIResponse: %v" , err )
587+ http .Error (w , fmt .Sprintf ("Error fetching AI response: %v" , err ), http .StatusInternalServerError )
588+ return
589+ }
590+
591+ log .Debugw ("Streaming AI response started" , "ai_model" , req .AIModel )
592+ defer log .Debugw ("Streaming AI response ended" , "ai_model" , req .AIModel )
593+
594+ for {
595+ select {
596+ case <- ctx .Done (): // Handle client disconnect or cancellation
597+ log .Warn ("client disconnected" )
598+ return
599+ case chunk , ok := <- chunks :
600+ if ! ok {
601+ return // Channel closed
602+ }
603+ response := wifi.ChatWithAIResponse {
604+ Status : true ,
605+ Msg : chunk ,
606+ }
607+
608+ if err := json .NewEncoder (w ).Encode (response ); err != nil {
609+ log .Error ("failed to write response: %v" , err )
610+ errorResponse := wifi.ChatWithAIResponse {
611+ Status : false ,
612+ Msg : fmt .Sprintf ("Error writing response: %v" , err ),
613+ }
614+ json .NewEncoder (w ).Encode (errorResponse ) // Send error as part of stream
615+ flusher .Flush ()
616+ return
617+ }
618+ flusher .Flush ()
619+ }
620+ }
621+ }
622+
471623func (bl * FxBlockchain ) handleFindBestAndTargetInLogs (ctx context.Context , from peer.ID , w http.ResponseWriter , r * http.Request ) {
472624 log := log .With ("action" , actionFindBestAndTargetInLogs , "from" , from )
473625
0 commit comments