Skip to content

Commit 306d4b9

Browse files
authored
Ai chat stream (#225)
* initial version * Update properties.go * Update blox.go * Update blox.go * Update blox.go * Update blox.go * corrected chat * Update blox.go * mod tidy * Update blox.go * Update blox.go * Update blockchain.go * u
1 parent 400c7c6 commit 306d4b9

File tree

9 files changed

+467
-39
lines changed

9 files changed

+467
-39
lines changed

blockchain/blockchain.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ func (bl *FxBlockchain) serve(w http.ResponseWriter, r *http.Request) {
533533
actionFetchContainerLogs: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
534534
bl.handleFetchContainerLogs(r.Context(), from, w, r)
535535
},
536+
actionChatWithAI: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
537+
bl.handleChatWithAI(r.Context(), from, w, r)
538+
},
536539
actionFindBestAndTargetInLogs: func(from peer.ID, w http.ResponseWriter, r *http.Request) {
537540
bl.handleFindBestAndTargetInLogs(r.Context(), from, w, r)
538541
},
@@ -981,7 +984,7 @@ func (bl *FxBlockchain) authorized(pid peer.ID, action string) bool {
981984
switch action {
982985
case actionReplicateInPool:
983986
return (bl.authorizer == bl.h.ID() || bl.authorizer == "")
984-
case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, actionFindBestAndTargetInLogs, actionFetchContainerLogs, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored, actionTransferToMumbai, actionListPlugins, actionListActivePlugins, actionInstallPlugin, actionUninstallPlugin, actionGetInstallStatus, actionGetInstallOutput, actionUpdatePlugin:
987+
case actionBloxFreeSpace, actionAccountFund, actionManifestBatchUpload, actionAssetsBalance, actionGetDatastoreSize, actionGetFolderSize, actionFindBestAndTargetInLogs, actionFetchContainerLogs, actionChatWithAI, actionEraseBlData, actionWifiRemoveall, actionReboot, actionPartition, actionDeleteWifi, actionDisconnectWifi, actionDeleteFulaConfig, actionGetAccount, actionSeeded, actionAccountExists, actionPoolCreate, actionPoolJoin, actionPoolCancelJoin, actionPoolRequests, actionPoolList, actionPoolVote, actionPoolLeave, actionManifestUpload, actionManifestStore, actionManifestAvailable, actionManifestRemove, actionManifestRemoveStorer, actionManifestRemoveStored, actionTransferToMumbai, actionListPlugins, actionListActivePlugins, actionInstallPlugin, actionUninstallPlugin, actionGetInstallStatus, actionGetInstallOutput, actionUpdatePlugin:
985988
bl.authorizedPeersLock.RLock()
986989
_, ok := bl.authorizedPeers[pid]
987990
bl.authorizedPeersLock.RUnlock()

blockchain/blox.go

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
package blockchain
22

33
import (
4+
"bufio"
45
"bytes"
56
"context"
67
"encoding/json"
78
"fmt"
89
"io"
910
"net/http"
11+
"strings"
12+
"sync"
1013

1114
"github.com/functionland/go-fula/wap/pkg/wifi"
1215
"github.com/libp2p/go-libp2p/core/network"
@@ -20,6 +23,51 @@ const (
2023
GB = 1024 * MB
2124
)
2225

26+
type StreamBuffer struct {
27+
Chunks chan string
28+
closed bool
29+
mu sync.Mutex
30+
closeOnce sync.Once
31+
err error
32+
}
33+
34+
func NewStreamBuffer() *StreamBuffer {
35+
return &StreamBuffer{
36+
Chunks: make(chan string, 100), // Buffered channel to prevent blocking
37+
}
38+
}
39+
40+
func (b *StreamBuffer) GetChunk() (string, error) {
41+
chunk, ok := <-b.Chunks
42+
if !ok {
43+
return "", b.err
44+
}
45+
return chunk, nil
46+
}
47+
48+
func (b *StreamBuffer) AddChunk(chunk string) {
49+
b.mu.Lock()
50+
defer b.mu.Unlock()
51+
if !b.closed {
52+
b.Chunks <- chunk
53+
}
54+
}
55+
56+
func (b *StreamBuffer) Close(err error) {
57+
b.closeOnce.Do(func() {
58+
b.mu.Lock()
59+
defer b.mu.Unlock()
60+
b.closed = true
61+
close(b.Chunks)
62+
})
63+
}
64+
65+
func (b *StreamBuffer) IsClosed() bool {
66+
b.mu.Lock()
67+
defer b.mu.Unlock()
68+
return b.closed
69+
}
70+
2371
func (bl *FxBlockchain) BloxFreeSpace(ctx context.Context, to peer.ID) ([]byte, error) {
2472
if bl.allowTransientConnection {
2573
ctx = network.WithUseTransient(ctx, "fx.blockchain")
@@ -245,6 +293,85 @@ func (bl *FxBlockchain) FetchContainerLogs(ctx context.Context, to peer.ID, r wi
245293
}
246294
}
247295

296+
func (bl *FxBlockchain) ChatWithAI(ctx context.Context, to peer.ID, r wifi.ChatWithAIRequest) (*StreamBuffer, error) {
297+
if bl.allowTransientConnection {
298+
ctx = network.WithUseTransient(ctx, "fx.blockchain")
299+
}
300+
301+
var buf bytes.Buffer
302+
if err := json.NewEncoder(&buf).Encode(r); err != nil {
303+
return nil, fmt.Errorf("failed to encode request: %w", err)
304+
}
305+
306+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "http://"+to.String()+".invalid/"+actionChatWithAI, &buf)
307+
if err != nil {
308+
return nil, fmt.Errorf("failed to create request: %w", err)
309+
}
310+
311+
resp, err := bl.c.Do(req)
312+
if err != nil {
313+
return nil, fmt.Errorf("failed to send request: %w", err)
314+
}
315+
316+
if resp.StatusCode != http.StatusOK {
317+
defer resp.Body.Close()
318+
bodyBytes, _ := io.ReadAll(resp.Body)
319+
return nil, fmt.Errorf("unexpected response: %d; body: %s", resp.StatusCode, string(bodyBytes))
320+
}
321+
322+
buffer := NewStreamBuffer()
323+
324+
go func() {
325+
defer func() {
326+
resp.Body.Close()
327+
if !buffer.IsClosed() {
328+
buffer.Close(nil)
329+
}
330+
}()
331+
332+
reader := bufio.NewReader(resp.Body)
333+
var accum []byte
334+
335+
for {
336+
select {
337+
case <-ctx.Done():
338+
buffer.Close(fmt.Errorf("request canceled: %w", ctx.Err()))
339+
return
340+
default:
341+
line, err := reader.ReadBytes('\n')
342+
if err != nil {
343+
if err == io.EOF {
344+
if len(accum) > 0 {
345+
buffer.AddChunk(string(accum))
346+
}
347+
buffer.Close(nil)
348+
} else {
349+
buffer.Close(fmt.Errorf("error reading response: %w", err))
350+
}
351+
return
352+
}
353+
354+
// Check for completion marker
355+
if bytes.HasPrefix(line, []byte("!COMPLETION!")) {
356+
buffer.Close(nil)
357+
return
358+
}
359+
360+
accum = append(accum, line...)
361+
362+
// Try to parse as JSON to verify chunk completeness
363+
var temp interface{}
364+
if json.Unmarshal(accum, &temp) == nil {
365+
buffer.AddChunk(string(accum))
366+
accum = accum[:0] // Reset accumulator
367+
}
368+
}
369+
}
370+
}()
371+
372+
return buffer, nil
373+
}
374+
248375
func (bl *FxBlockchain) FindBestAndTargetInLogs(ctx context.Context, to peer.ID, r wifi.FindBestAndTargetInLogsRequest) ([]byte, error) {
249376

250377
if bl.allowTransientConnection {
@@ -468,6 +595,98 @@ func (bl *FxBlockchain) handleFetchContainerLogs(ctx context.Context, from peer.
468595

469596
}
470597

598+
func (bl *FxBlockchain) handleChatWithAI(ctx context.Context, from peer.ID, w http.ResponseWriter, r *http.Request) {
599+
log := log.With("action", actionChatWithAI, "from", from)
600+
601+
// Decode the incoming request
602+
var req wifi.ChatWithAIRequest
603+
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
604+
log.Error("failed to decode request: %v", err)
605+
http.Error(w, "failed to decode request", http.StatusBadRequest)
606+
return
607+
}
608+
609+
// Set up headers for streaming response
610+
w.Header().Set("Content-Type", "application/json")
611+
w.WriteHeader(http.StatusOK)
612+
613+
flusher, ok := w.(http.Flusher)
614+
if !ok {
615+
http.Error(w, "Streaming not supported", http.StatusInternalServerError)
616+
return
617+
}
618+
619+
// Fetch AI response using FetchAIResponse
620+
chunks, err := wifi.FetchAIResponse(ctx, req.AIModel, req.UserMessage)
621+
if err != nil {
622+
log.Error("error in fetchAIResponse: %v", err)
623+
http.Error(w, fmt.Sprintf("Error fetching AI response: %v", err), http.StatusInternalServerError)
624+
return
625+
}
626+
627+
log.Debugw("Streaming AI response started", "ai_model", req.AIModel, "user_message", req.UserMessage)
628+
defer log.Debugw("Streaming AI response ended", "ai_model", req.AIModel, "user_message", req.UserMessage)
629+
630+
var buffer string // Buffer to store incomplete chunks
631+
632+
for {
633+
select {
634+
case <-ctx.Done(): // Handle client disconnect or cancellation
635+
log.Warn("client disconnected")
636+
return
637+
case chunk, ok := <-chunks:
638+
if !ok {
639+
return // Channel closed
640+
}
641+
642+
chunk = strings.TrimSpace(chunk) // Remove leading/trailing whitespace
643+
644+
if chunk == "" { // Skip empty chunks
645+
continue
646+
}
647+
648+
buffer += chunk // Append chunk to buffer
649+
650+
var parsedChunk struct {
651+
ID string `json:"id"`
652+
Object string `json:"object"`
653+
Choices []struct {
654+
Delta struct {
655+
Content string `json:"content"`
656+
} `json:"delta"`
657+
} `json:"choices"`
658+
}
659+
660+
if err := json.Unmarshal([]byte(buffer), &parsedChunk); err != nil {
661+
log.Error("failed to parse chunk: %v", err)
662+
continue // Wait for more data to complete the JSON object
663+
}
664+
665+
buffer = "" // Clear buffer after successful parsing
666+
667+
var newContent string
668+
for _, choice := range parsedChunk.Choices {
669+
newContent += choice.Delta.Content
670+
}
671+
672+
newContent = strings.TrimSpace(newContent) // Remove whitespace
673+
674+
response := wifi.ChatWithAIResponse{
675+
Status: true,
676+
Msg: newContent,
677+
}
678+
log.Debugw("Streaming AI response chunk", "chunk", newContent)
679+
680+
if err := json.NewEncoder(w).Encode(response); err != nil {
681+
log.Error("failed to write response: %v", err)
682+
http.Error(w, fmt.Sprintf("Error writing response: %v", err), http.StatusInternalServerError)
683+
return
684+
}
685+
flusher.Flush() // Flush each chunk to ensure real-time streaming
686+
}
687+
}
688+
}
689+
471690
func (bl *FxBlockchain) handleFindBestAndTargetInLogs(ctx context.Context, from peer.ID, w http.ResponseWriter, r *http.Request) {
472691
log := log.With("action", actionFindBestAndTargetInLogs, "from", from)
473692

blockchain/interface.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ const (
6464
actionGetInstallOutput = "get-install-output"
6565
actionGetInstallStatus = "get-install-status"
6666
actionUpdatePlugin = "update-plugin"
67+
68+
// AI
69+
actionChatWithAI = "chat-ai"
6770
)
6871

6972
type ReplicateRequest struct {
@@ -520,6 +523,9 @@ type Blockchain interface {
520523
GetInstallOutput(context.Context, peer.ID, string, string) ([]byte, error)
521524
GetInstallStatus(context.Context, peer.ID, string) ([]byte, error)
522525
UpdatePlugin(context.Context, peer.ID, string) ([]byte, error)
526+
527+
// AI
528+
ChatWithAI(context.Context, peer.ID, wifi.ChatWithAIRequest) (*StreamBuffer, error)
523529
}
524530

525531
var requestTypes = map[string]reflect.Type{
@@ -574,6 +580,9 @@ var requestTypes = map[string]reflect.Type{
574580
actionGetInstallOutput: reflect.TypeOf(GetInstallOutputRequest{}),
575581
actionGetInstallStatus: reflect.TypeOf(GetInstallStatusRequest{}),
576582
actionUpdatePlugin: reflect.TypeOf(UpdatePluginRequest{}),
583+
584+
// AI
585+
actionChatWithAI: reflect.TypeOf(wifi.ChatWithAIRequest{}),
577586
}
578587

579588
var responseTypes = map[string]reflect.Type{
@@ -628,4 +637,7 @@ var responseTypes = map[string]reflect.Type{
628637
actionGetInstallOutput: reflect.TypeOf(GetInstallOutputResponse{}),
629638
actionGetInstallStatus: reflect.TypeOf(GetInstallStatusResponse{}),
630639
actionUpdatePlugin: reflect.TypeOf(UpdatePluginResponse{}),
640+
641+
// AI
642+
actionChatWithAI: reflect.TypeOf(wifi.ChatWithAIResponse{}),
631643
}

go.mod

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
module github.com/functionland/go-fula
22

3-
go 1.21
3+
go 1.22.0
4+
5+
toolchain go1.22.1
46

57
require (
68
github.com/docker/docker v24.0.7+incompatible
9+
github.com/google/uuid v1.6.0
710
github.com/grandcat/zeroconf v1.0.0
811
github.com/ipfs-cluster/ipfs-cluster v1.0.8
912
github.com/ipfs/boxo v0.17.0
@@ -28,14 +31,16 @@ require (
2831
github.com/mdp/qrterminal v1.0.1
2932
github.com/mr-tron/base58 v1.2.0
3033
github.com/multiformats/go-multiaddr v0.12.2
34+
github.com/multiformats/go-multibase v0.2.0
3135
github.com/multiformats/go-multicodec v0.9.0
3236
github.com/multiformats/go-multihash v0.2.3
3337
github.com/multiformats/go-varint v0.0.7
38+
github.com/sony/gobreaker v0.5.0
3439
github.com/tyler-smith/go-bip39 v1.1.0
3540
github.com/urfave/cli/v2 v2.27.1
3641
go.uber.org/ratelimit v0.3.0
37-
golang.org/x/crypto v0.21.0
38-
golang.org/x/sync v0.6.0
42+
golang.org/x/crypto v0.32.0
43+
golang.org/x/sync v0.10.0
3944
gopkg.in/ini.v1 v1.67.0
4045
gopkg.in/yaml.v3 v3.0.1
4146
)
@@ -49,7 +54,7 @@ require (
4954
github.com/morikuni/aec v1.0.0 // indirect
5055
github.com/opencontainers/go-digest v1.0.0 // indirect
5156
github.com/opencontainers/image-spec v1.0.2 // indirect
52-
golang.org/x/sys v0.18.0 // indirect
57+
golang.org/x/sys v0.29.0 // indirect
5358
gotest.tools/v3 v3.4.0 // indirect
5459
)
5560

@@ -110,7 +115,6 @@ require (
110115
github.com/golang/snappy v0.0.4 // indirect
111116
github.com/google/gopacket v1.1.19 // indirect
112117
github.com/google/pprof v0.0.0-20231229205709-960ae82b1e42 // indirect
113-
github.com/google/uuid v1.6.0 // indirect
114118
github.com/gorilla/mux v1.8.1 // indirect
115119
github.com/gorilla/websocket v1.5.1 // indirect
116120
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0 // indirect
@@ -193,7 +197,6 @@ require (
193197
github.com/multiformats/go-base36 v0.2.0 // indirect
194198
github.com/multiformats/go-multiaddr-dns v0.3.1 // indirect
195199
github.com/multiformats/go-multiaddr-fmt v0.1.0 // indirect
196-
github.com/multiformats/go-multibase v0.2.0 // indirect
197200
github.com/multiformats/go-multistream v0.5.0 // indirect
198201
github.com/onsi/ginkgo/v2 v2.13.2 // indirect
199202
github.com/opencontainers/runtime-spec v1.1.0 // indirect
@@ -233,7 +236,6 @@ require (
233236
github.com/rs/cors v1.10.1 // indirect
234237
github.com/russross/blackfriday/v2 v2.1.0 // indirect
235238
github.com/samber/lo v1.39.0 // indirect
236-
github.com/sony/gobreaker v0.5.0 // indirect
237239
github.com/spaolacci/murmur3 v1.1.0 // indirect
238240
github.com/stretchr/testify v1.8.4 // indirect
239241
github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
@@ -268,12 +270,11 @@ require (
268270
go.uber.org/zap v1.26.0 // indirect
269271
go4.org v0.0.0-20230225012048-214862532bf5 // indirect
270272
golang.org/x/exp v0.0.0-20240103183307-be819d1f06fc // indirect
271-
golang.org/x/mobile v0.0.0-20240320162201-c76e57eead38 // indirect
272-
golang.org/x/mod v0.16.0 // indirect
273-
golang.org/x/net v0.22.0 // indirect
273+
golang.org/x/mod v0.22.0 // indirect
274+
golang.org/x/net v0.34.0 // indirect
274275
golang.org/x/oauth2 v0.16.0 // indirect
275-
golang.org/x/text v0.14.0 // indirect
276-
golang.org/x/tools v0.19.0 // indirect
276+
golang.org/x/text v0.21.0 // indirect
277+
golang.org/x/tools v0.29.0 // indirect
277278
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
278279
gonum.org/v1/gonum v0.14.0 // indirect
279280
google.golang.org/appengine v1.6.8 // indirect

0 commit comments

Comments
 (0)