diff --git a/_mocks/opencsg.com/csghub-server/builder/rpc/mock_CsgbotSvcClient.go b/_mocks/opencsg.com/csghub-server/builder/rpc/mock_CsgbotSvcClient.go new file mode 100644 index 000000000..fb9894083 --- /dev/null +++ b/_mocks/opencsg.com/csghub-server/builder/rpc/mock_CsgbotSvcClient.go @@ -0,0 +1,252 @@ +// Code generated by mockery v2.53.0. DO NOT EDIT. + +package rpc + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" + rpc "opencsg.com/csghub-server/builder/rpc" + + types "opencsg.com/csghub-server/common/types" +) + +// MockCsgbotSvcClient is an autogenerated mock type for the CsgbotSvcClient type +type MockCsgbotSvcClient struct { + mock.Mock +} + +type MockCsgbotSvcClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockCsgbotSvcClient) EXPECT() *MockCsgbotSvcClient_Expecter { + return &MockCsgbotSvcClient_Expecter{mock: &_m.Mock} +} + +// CreateKnowledgeBase provides a mock function with given fields: ctx, userUUID, username, token, req +func (_m *MockCsgbotSvcClient) CreateKnowledgeBase(ctx context.Context, userUUID string, username string, token string, req *rpc.CreateKnowledgeBaseRequest) (*rpc.CreateKnowledgeBaseResponse, error) { + ret := _m.Called(ctx, userUUID, username, token, req) + + if len(ret) == 0 { + panic("no return value specified for CreateKnowledgeBase") + } + + var r0 *rpc.CreateKnowledgeBaseResponse + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *rpc.CreateKnowledgeBaseRequest) (*rpc.CreateKnowledgeBaseResponse, error)); ok { + return rf(ctx, userUUID, username, token, req) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, *rpc.CreateKnowledgeBaseRequest) *rpc.CreateKnowledgeBaseResponse); ok { + r0 = rf(ctx, userUUID, username, token, req) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*rpc.CreateKnowledgeBaseResponse) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, string, *rpc.CreateKnowledgeBaseRequest) error); ok { + r1 = rf(ctx, userUUID, username, token, req) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockCsgbotSvcClient_CreateKnowledgeBase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CreateKnowledgeBase' +type MockCsgbotSvcClient_CreateKnowledgeBase_Call struct { + *mock.Call +} + +// CreateKnowledgeBase is a helper method to define mock.On call +// - ctx context.Context +// - userUUID string +// - username string +// - token string +// - req *rpc.CreateKnowledgeBaseRequest +func (_e *MockCsgbotSvcClient_Expecter) CreateKnowledgeBase(ctx interface{}, userUUID interface{}, username interface{}, token interface{}, req interface{}) *MockCsgbotSvcClient_CreateKnowledgeBase_Call { + return &MockCsgbotSvcClient_CreateKnowledgeBase_Call{Call: _e.mock.On("CreateKnowledgeBase", ctx, userUUID, username, token, req)} +} + +func (_c *MockCsgbotSvcClient_CreateKnowledgeBase_Call) Run(run func(ctx context.Context, userUUID string, username string, token string, req *rpc.CreateKnowledgeBaseRequest)) *MockCsgbotSvcClient_CreateKnowledgeBase_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(*rpc.CreateKnowledgeBaseRequest)) + }) + return _c +} + +func (_c *MockCsgbotSvcClient_CreateKnowledgeBase_Call) Return(_a0 *rpc.CreateKnowledgeBaseResponse, _a1 error) *MockCsgbotSvcClient_CreateKnowledgeBase_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockCsgbotSvcClient_CreateKnowledgeBase_Call) RunAndReturn(run func(context.Context, string, string, string, *rpc.CreateKnowledgeBaseRequest) (*rpc.CreateKnowledgeBaseResponse, error)) *MockCsgbotSvcClient_CreateKnowledgeBase_Call { + _c.Call.Return(run) + return _c +} + +// DeleteKnowledgeBase provides a mock function with given fields: ctx, userUUID, username, token, contentID +func (_m *MockCsgbotSvcClient) DeleteKnowledgeBase(ctx context.Context, userUUID string, username string, token string, contentID string) error { + ret := _m.Called(ctx, userUUID, username, token, contentID) + + if len(ret) == 0 { + panic("no return value specified for DeleteKnowledgeBase") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string) error); ok { + r0 = rf(ctx, userUUID, username, token, contentID) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCsgbotSvcClient_DeleteKnowledgeBase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteKnowledgeBase' +type MockCsgbotSvcClient_DeleteKnowledgeBase_Call struct { + *mock.Call +} + +// DeleteKnowledgeBase is a helper method to define mock.On call +// - ctx context.Context +// - userUUID string +// - username string +// - token string +// - contentID string +func (_e *MockCsgbotSvcClient_Expecter) DeleteKnowledgeBase(ctx interface{}, userUUID interface{}, username interface{}, token interface{}, contentID interface{}) *MockCsgbotSvcClient_DeleteKnowledgeBase_Call { + return &MockCsgbotSvcClient_DeleteKnowledgeBase_Call{Call: _e.mock.On("DeleteKnowledgeBase", ctx, userUUID, username, token, contentID)} +} + +func (_c *MockCsgbotSvcClient_DeleteKnowledgeBase_Call) Run(run func(ctx context.Context, userUUID string, username string, token string, contentID string)) *MockCsgbotSvcClient_DeleteKnowledgeBase_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(string)) + }) + return _c +} + +func (_c *MockCsgbotSvcClient_DeleteKnowledgeBase_Call) Return(_a0 error) *MockCsgbotSvcClient_DeleteKnowledgeBase_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCsgbotSvcClient_DeleteKnowledgeBase_Call) RunAndReturn(run func(context.Context, string, string, string, string) error) *MockCsgbotSvcClient_DeleteKnowledgeBase_Call { + _c.Call.Return(run) + return _c +} + +// DeleteWorkspaceFiles provides a mock function with given fields: ctx, userUUID, username, token, agentName +func (_m *MockCsgbotSvcClient) DeleteWorkspaceFiles(ctx context.Context, userUUID string, username string, token string, agentName string) error { + ret := _m.Called(ctx, userUUID, username, token, agentName) + + if len(ret) == 0 { + panic("no return value specified for DeleteWorkspaceFiles") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string) error); ok { + r0 = rf(ctx, userUUID, username, token, agentName) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCsgbotSvcClient_DeleteWorkspaceFiles_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DeleteWorkspaceFiles' +type MockCsgbotSvcClient_DeleteWorkspaceFiles_Call struct { + *mock.Call +} + +// DeleteWorkspaceFiles is a helper method to define mock.On call +// - ctx context.Context +// - userUUID string +// - username string +// - token string +// - agentName string +func (_e *MockCsgbotSvcClient_Expecter) DeleteWorkspaceFiles(ctx interface{}, userUUID interface{}, username interface{}, token interface{}, agentName interface{}) *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call { + return &MockCsgbotSvcClient_DeleteWorkspaceFiles_Call{Call: _e.mock.On("DeleteWorkspaceFiles", ctx, userUUID, username, token, agentName)} +} + +func (_c *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call) Run(run func(ctx context.Context, userUUID string, username string, token string, agentName string)) *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(string)) + }) + return _c +} + +func (_c *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call) Return(_a0 error) *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call) RunAndReturn(run func(context.Context, string, string, string, string) error) *MockCsgbotSvcClient_DeleteWorkspaceFiles_Call { + _c.Call.Return(run) + return _c +} + +// UpdateKnowledgeBase provides a mock function with given fields: ctx, userUUID, username, token, contentID, req +func (_m *MockCsgbotSvcClient) UpdateKnowledgeBase(ctx context.Context, userUUID string, username string, token string, contentID string, req *types.UpdateAgentKnowledgeBaseRequest) error { + ret := _m.Called(ctx, userUUID, username, token, contentID, req) + + if len(ret) == 0 { + panic("no return value specified for UpdateKnowledgeBase") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string, string, *types.UpdateAgentKnowledgeBaseRequest) error); ok { + r0 = rf(ctx, userUUID, username, token, contentID, req) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockCsgbotSvcClient_UpdateKnowledgeBase_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateKnowledgeBase' +type MockCsgbotSvcClient_UpdateKnowledgeBase_Call struct { + *mock.Call +} + +// UpdateKnowledgeBase is a helper method to define mock.On call +// - ctx context.Context +// - userUUID string +// - username string +// - token string +// - contentID string +// - req *types.UpdateAgentKnowledgeBaseRequest +func (_e *MockCsgbotSvcClient_Expecter) UpdateKnowledgeBase(ctx interface{}, userUUID interface{}, username interface{}, token interface{}, contentID interface{}, req interface{}) *MockCsgbotSvcClient_UpdateKnowledgeBase_Call { + return &MockCsgbotSvcClient_UpdateKnowledgeBase_Call{Call: _e.mock.On("UpdateKnowledgeBase", ctx, userUUID, username, token, contentID, req)} +} + +func (_c *MockCsgbotSvcClient_UpdateKnowledgeBase_Call) Run(run func(ctx context.Context, userUUID string, username string, token string, contentID string, req *types.UpdateAgentKnowledgeBaseRequest)) *MockCsgbotSvcClient_UpdateKnowledgeBase_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string), args[4].(string), args[5].(*types.UpdateAgentKnowledgeBaseRequest)) + }) + return _c +} + +func (_c *MockCsgbotSvcClient_UpdateKnowledgeBase_Call) Return(_a0 error) *MockCsgbotSvcClient_UpdateKnowledgeBase_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockCsgbotSvcClient_UpdateKnowledgeBase_Call) RunAndReturn(run func(context.Context, string, string, string, string, *types.UpdateAgentKnowledgeBaseRequest) error) *MockCsgbotSvcClient_UpdateKnowledgeBase_Call { + _c.Call.Return(run) + return _c +} + +// NewMockCsgbotSvcClient creates a new instance of MockCsgbotSvcClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockCsgbotSvcClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockCsgbotSvcClient { + mock := &MockCsgbotSvcClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/builder/rpc/agenthub_svc_client.go b/builder/rpc/agenthub_svc_client.go new file mode 100644 index 000000000..c7cad2745 --- /dev/null +++ b/builder/rpc/agenthub_svc_client.go @@ -0,0 +1,503 @@ +package rpc + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "strconv" + "strings" + + "opencsg.com/csghub-server/common/errorx" + "opencsg.com/csghub-server/common/types" +) + +type AgentHubSvcClient interface { + CreateAgentInstance(ctx context.Context, userUUID string, req *CreateAgentInstanceRequest) (*CreateAgentInstanceResponse, error) + DeleteAgentInstance(ctx context.Context, userUUID string, contentID string) error + UpdateAgentInstance(ctx context.Context, userUUID string, contentID string, req *UpdateAgentInstanceRequest) error + GetAgentInstances(ctx context.Context, req *GetAgentInstancesRequest) (GetAgentInstancesResponse, error) + RunAgentInstance(ctx context.Context, userUUID string, contentID string, req *RunAgentInstanceRequest) (*RunAgentInstanceResponse, error) + RunAgentInstanceStream(ctx context.Context, userUUID string, contentID string, req *RunAgentInstanceRequest) (<-chan types.AgentStreamEvent, error) +} + +type CreateAgentInstanceRequest struct { + Name string `json:"name"` + Description string `json:"description"` + Data json.RawMessage `json:"data"` +} + +type CreateAgentInstanceResponse AgentInstance + +type GetAgentInstancesRequest struct { + IDs []string `json:"ids"` + UserUUID string `json:"user_uuid"` +} + +type GetAgentInstancesResponse []*AgentInstance + +type DeleteAgentInstanceRequest struct { + IDs []string `json:"ids"` +} + +type DeleteAgentInstanceResponse struct { + IDs []string `json:"ids"` + Total int `json:"total"` +} + +type UpdateAgentInstanceRequest struct { + Name string `json:"name,omitempty"` + Description string `json:"description,omitempty"` + Data json.RawMessage `json:"data,omitempty"` + FolderID string `json:"folder_id,omitempty"` + EndpointName string `json:"endpoint_name,omitempty"` + MCPEnabled *bool `json:"mcp_enabled,omitempty"` + Locked *bool `json:"locked,omitempty"` + ActionName string `json:"action_name,omitempty"` + ActionDescription string `json:"action_description,omitempty"` + AccessType string `json:"access_type,omitempty"` + FSPath string `json:"fs_path,omitempty"` +} + +type RunAgentInstanceRequest struct { + InputValue string `json:"input_value"` + InputType string `json:"input_type"` + OutputType string `json:"output_type"` + Tweaks json.RawMessage `json:"tweaks"` + SessionID string `json:"session_id"` + Stream bool `json:"stream"` +} + +type RunAgentInstanceResponse struct { + SessionID string `json:"session_id"` + Outputs []Outputs `json:"outputs"` +} + +type Outputs struct { + Outputs []InnerOutput `json:"outputs,omitempty"` // camada extra para stream=true + Results *Results `json:"results,omitempty"` // stream=false +} + +type InnerOutput struct { + Results *Results `json:"results,omitempty"` +} + +type Results struct { + Message Message `json:"message"` +} + +type Message struct { + Timestamp string `json:"timestamp"` + Sender string `json:"sender"` + SenderName string `json:"sender_name"` + TextKey string `json:"text_key"` + Text string `json:"text"` +} + +// TokenData holds the data for an event of type "token". +type TokenData struct { + Chunk string `json:"chunk"` +} + +// EndData holds the final result from an event of type "end". +// The 'Result' field conveniently matches our target RunAgentInstanceResponse struct. +type EndData struct { + Result RunAgentInstanceResponse `json:"result"` +} + +/* + { + "id": "new-flow-id", + "name": "New Flow Name", + "description": "Flow description", + "data": { + // Flow graph data + }, + "hasIO": true, + "created_at": "2024-01-01T00:00:00Z", + "updated_at": "2024-01-01T00:00:00Z", + "user_id": "user-uuid", + "folder_id": "folder-uuid" + } +*/ +type AgentInstance struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Data json.RawMessage `json:"data"` + HasIO bool `json:"hasIO"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + UserUUID string `json:"user_id"` // user uuid + FolderID string `json:"folder_id"` +} + +type AgentHubSvcClientImpl struct { + hc *HttpClient + token string +} + +func NewAgentHubSvcClientImpl(endpoint string, token string, opts ...RequestOption) AgentHubSvcClient { + return &AgentHubSvcClientImpl{ + hc: NewHttpClient(strings.TrimSuffix(endpoint, "/"), opts...), + token: token, + } +} + +// POST /api/v1/opencsg/flows/ +func (c *AgentHubSvcClientImpl) CreateAgentInstance(ctx context.Context, userUUID string, req *CreateAgentInstanceRequest) (*CreateAgentInstanceResponse, error) { + if req == nil { + return nil, errorx.BadRequest(errors.New("create agent instance request is nil"), nil) + } + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/flows/", + } + var resp CreateAgentInstanceResponse + var buf io.Reader + + jsonData, err := json.Marshal(req) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + buf = bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/flows/" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return nil, errorx.InternalServerError(err, nil) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", userUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to create agent instance in agenthub", "error", err, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to create agent instance in agenthub"), rpcErrorCtx) + } + defer hresp.Body.Close() + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to create agent instance in agenthub", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to create agent instance in agenthub"), rpcErrorCtx) + } + + body, err := io.ReadAll(hresp.Body) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + err = json.Unmarshal(body, &resp) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + return &resp, nil +} + +// POST /api/v1/opencsg/flows/query +func (c *AgentHubSvcClientImpl) GetAgentInstances(ctx context.Context, req *GetAgentInstancesRequest) (GetAgentInstancesResponse, error) { + if req == nil { + return nil, errorx.BadRequest(errors.New("get agent instances request is nil"), nil) + } + rpcErrorCtx := map[string]any{ + "user_uuid": req.UserUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/flows/query", + } + + var resp GetAgentInstancesResponse + var buf io.Reader + jsonData, err := json.Marshal(req) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + buf = bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/flows/query" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return nil, errorx.InternalServerError(err, nil) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", req.UserUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to get agent instance from agenthub", "error", err, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to get agent instance from agenthub"), rpcErrorCtx) + } + defer hresp.Body.Close() + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to get agent instance from agenthub", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to get agent instance from agenthub"), rpcErrorCtx) + } + body, err := io.ReadAll(hresp.Body) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + err = json.Unmarshal(body, &resp) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + return resp, nil +} + +// POST /api/v1/opencsg/flows/delete +func (c *AgentHubSvcClientImpl) DeleteAgentInstance(ctx context.Context, userUUID string, contentID string) error { + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/flows/delete", + } + var resp DeleteAgentInstanceResponse + + req := DeleteAgentInstanceRequest{ + IDs: []string{contentID}, + } + jsonData, err := json.Marshal(req) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + buf := bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/flows/delete" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", userUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to delete agent instance in agenthub", "error", err, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub"), rpcErrorCtx) + } + defer hresp.Body.Close() + + if hresp.StatusCode != http.StatusOK { + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub, status code: "+strconv.Itoa(hresp.StatusCode)), rpcErrorCtx) + } + + body, err := io.ReadAll(hresp.Body) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + if err := json.Unmarshal(body, &resp); err != nil { + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub, unmarshal response error: "+err.Error()), rpcErrorCtx) + } + + if resp.Total != 1 { + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub, total: "+strconv.Itoa(resp.Total)), rpcErrorCtx) + } + + if len(resp.IDs) == 0 { + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub, response IDs is empty"), rpcErrorCtx) + } + + if resp.IDs[0] != contentID { + return errorx.RemoteSvcFail(errors.New("failed to delete agent instance in agenthub, content ID mismatch: "+contentID+" != "+resp.IDs[0]), rpcErrorCtx) + } + return nil +} + +// PATCH /api/v1/opencsg/flows/{id} +func (c *AgentHubSvcClientImpl) UpdateAgentInstance(ctx context.Context, userUUID string, contentID string, req *UpdateAgentInstanceRequest) error { + if req == nil { + return errorx.BadRequest(errors.New("update agent instance request is nil"), nil) + } + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/flows/" + contentID, + } + var buf io.Reader + jsonData, err := json.Marshal(req) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + buf = bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/flows/" + contentID + hreq, err := http.NewRequestWithContext(ctx, http.MethodPatch, path, buf) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", userUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to update agent instance in agenthub", "error", err, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to update agent instance in agenthub"), rpcErrorCtx) + } + defer hresp.Body.Close() + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to update agent instance in agenthub", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to update agent instance in agenthub"), rpcErrorCtx) + } + + return nil +} + +// POST /api/v1/opencsg/run/{id}?stream=false +func (c *AgentHubSvcClientImpl) RunAgentInstance(ctx context.Context, userUUID string, instanceID string, req *RunAgentInstanceRequest) (*RunAgentInstanceResponse, error) { + if req == nil { + return nil, errorx.BadRequest(errors.New("run agent instance request is nil"), nil) + } + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/run/" + instanceID + "?stream=false", + } + var buf io.Reader + jsonData, err := json.Marshal(req) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + buf = bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/run/" + instanceID + "?stream=false" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", userUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to run agent instance in agenthub", "error", err, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to run agent instance in agenthub"), rpcErrorCtx) + } + defer hresp.Body.Close() + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to run agent instance in agenthub", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to run agent instance in agenthub"), rpcErrorCtx) + } + + // handle non-stream response + body, err := io.ReadAll(hresp.Body) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + var resp RunAgentInstanceResponse + err = json.Unmarshal(body, &resp) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + return &resp, nil +} + +// RunAgentInstanceStream runs an agent instance and returns a streaming channel +// POST /api/v1/opencsg/run/{id}?stream=true +func (c *AgentHubSvcClientImpl) RunAgentInstanceStream(ctx context.Context, userUUID string, contentID string, req *RunAgentInstanceRequest) (<-chan types.AgentStreamEvent, error) { + if req == nil { + return nil, errorx.BadRequest(errors.New("run agent instance request is nil"), nil) + } + + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "agenthub", + "api": "/api/v1/opencsg/run/" + contentID + "?stream=true", + } + + var buf io.Reader + jsonData, err := json.Marshal(req) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + buf = bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/opencsg/run/" + contentID + "?stream=true" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("User-UUID", userUUID) + hreq.Header.Set("Token", c.token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + return nil, errorx.RemoteSvcFail(errors.New("failed to run agent instance in agenthub"), rpcErrorCtx) + } + + if hresp.StatusCode != http.StatusOK { + defer hresp.Body.Close() + return nil, errorx.RemoteSvcFail(errors.New("failed to run agent instance in agenthub"), rpcErrorCtx) + } + + // Create a channel for streaming responses + streamChan := make(chan types.AgentStreamEvent, 100) + + // Start a goroutine to handle the streaming response + go func(body io.ReadCloser) { + defer close(streamChan) + defer body.Close() + + scanner := bufio.NewScanner(body) + scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024) // allow up to 1MB buffer + for scanner.Scan() { + select { + case <-ctx.Done(): + slog.Debug("stream cancelled", slog.String("session_id", contentID)) + return + default: + } + + line := strings.TrimSpace(scanner.Text()) + if line == "" { + continue + } + + var streamEvent types.AgentStreamEvent + if err := json.Unmarshal([]byte(line), &streamEvent); err != nil { + slog.Warn("could not unmarshal stream line into event, skipping. session_id: %s, error: %v, line: %s", slog.String("session_id", contentID), slog.Any("error", err), slog.String("line", line)) + continue + } + + // Process the event based on its type + switch streamEvent.Event { + case "token": + sendEvent(ctx, streamChan, streamEvent) + case "add_message": + sendEvent(ctx, streamChan, streamEvent) + case "end": + var endData EndData + if err := json.Unmarshal(streamEvent.Data, &endData); err != nil { + slog.Error("Error: could not unmarshal 'end' event data: %v", slog.String("session_id", contentID), slog.Any("error", err)) + continue + } + sendEvent(ctx, streamChan, streamEvent) + + // extract the message text from the event data, and send it as a separate event "output-message" + if len(endData.Result.Outputs) > 0 && len(endData.Result.Outputs[0].Outputs) > 0 && endData.Result.Outputs[0].Outputs[0].Results != nil && endData.Result.Outputs[0].Outputs[0].Results.Message.Text != "" { + sendEvent(ctx, streamChan, types.AgentStreamEvent{ + Event: "output-message", + Data: []byte(endData.Result.Outputs[0].Outputs[0].Results.Message.Text), + }) + } + default: + slog.Warn("unknown event type", slog.String("session_id", contentID), slog.String("event", streamEvent.Event)) + } + } + + if err := scanner.Err(); err != nil { + slog.Error("scanner error", slog.String("session_id", contentID), slog.Any("error", err)) + } + }(hresp.Body) + + return streamChan, nil +} + +func sendEvent(ctx context.Context, ch chan<- types.AgentStreamEvent, msg types.AgentStreamEvent) { + select { + case ch <- msg: + case <-ctx.Done(): + slog.Debug("stream channel closed", slog.Any("error", ctx.Err())) + } +} diff --git a/builder/rpc/agenthub_svc_client_test.go b/builder/rpc/agenthub_svc_client_test.go new file mode 100644 index 000000000..58211b8e7 --- /dev/null +++ b/builder/rpc/agenthub_svc_client_test.go @@ -0,0 +1,264 @@ +package rpc + +import ( + "context" + "encoding/json" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func setupTestAgentHubClient(server *httptest.Server) *AgentHubSvcClientImpl { + return &AgentHubSvcClientImpl{ + hc: &HttpClient{ + endpoint: server.URL, + hc: server.Client(), + logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), + }, + token: "test-token", + } +} + +func TestAgentHubSvcClientImpl_DeleteAgentInstance_Success(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/v1/opencsg/flows/delete", r.URL.String()) + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("User-UUID")) + assert.Equal(t, "test-token", r.Header.Get("Token")) + + var req DeleteAgentInstanceRequest + err := json.NewDecoder(r.Body).Decode(&req) + require.NoError(t, err) + assert.Equal(t, []string{contentID}, req.IDs) + + resp := DeleteAgentInstanceResponse{ + IDs: []string{contentID}, + Total: 1, + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(resp) + require.NoError(t, err) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + err := client.DeleteAgentInstance(context.Background(), userUUID, contentID) + require.NoError(t, err) +} + +func TestAgentHubSvcClientImpl_DeleteAgentInstance_Non200Status(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + err := client.DeleteAgentInstance(context.Background(), userUUID, contentID) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to delete agent instance in agenthub, status code: 500") +} + +func TestAgentHubSvcClientImpl_DeleteAgentInstance_ReadBodyError(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Set Content-Length to force an error when reading + w.Header().Set("Content-Length", "100") + w.WriteHeader(http.StatusOK) + // Close the connection immediately to cause a read error + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + err := client.DeleteAgentInstance(context.Background(), userUUID, contentID) + require.Error(t, err) +} + +func TestAgentHubSvcClientImpl_CreateAgentInstance_Success(t *testing.T) { + userUUID := "test-user-uuid" + req := &CreateAgentInstanceRequest{ + Name: "Test Instance", + Description: "Test Description", + Data: json.RawMessage(`{"nodes": [], "edges": []}`), + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/v1/opencsg/flows/", r.URL.String()) + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("User-UUID")) + assert.Equal(t, "test-token", r.Header.Get("Token")) + + var requestBody CreateAgentInstanceRequest + err := json.NewDecoder(r.Body).Decode(&requestBody) + require.NoError(t, err) + assert.Equal(t, req.Name, requestBody.Name) + assert.Equal(t, req.Description, requestBody.Description) + + resp := CreateAgentInstanceResponse{ + ID: "new-flow-id", + Name: "Test Instance", + Description: "Test Description", + Data: json.RawMessage(`{"nodes": [], "edges": []}`), + } + w.Header().Set("Content-Type", "application/json") + err = json.NewEncoder(w).Encode(resp) + require.NoError(t, err) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + resp, err := client.CreateAgentInstance(context.Background(), userUUID, req) + require.NoError(t, err) + require.NotNil(t, resp) + assert.Equal(t, "new-flow-id", resp.ID) + assert.Equal(t, "Test Instance", resp.Name) + assert.Equal(t, "Test Description", resp.Description) +} + +func TestAgentHubSvcClientImpl_CreateAgentInstance_NilRequest(t *testing.T) { + userUUID := "test-user-uuid" + + client := setupTestAgentHubClient(httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))) + defer client.hc.hc.CloseIdleConnections() + + resp, err := client.CreateAgentInstance(context.Background(), userUUID, nil) + require.Error(t, err) + require.Nil(t, resp) + assert.Contains(t, err.Error(), "create agent instance request is nil") +} + +func TestAgentHubSvcClientImpl_CreateAgentInstance_Non200Status(t *testing.T) { + userUUID := "test-user-uuid" + req := &CreateAgentInstanceRequest{ + Name: "Test Instance", + Description: "Test Description", + Data: json.RawMessage(`{}`), + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + resp, err := client.CreateAgentInstance(context.Background(), userUUID, req) + require.Error(t, err) + require.Nil(t, resp) + assert.Contains(t, err.Error(), "failed to create agent instance in agenthub") +} + +func TestAgentHubSvcClientImpl_CreateAgentInstance_ReadBodyError(t *testing.T) { + userUUID := "test-user-uuid" + req := &CreateAgentInstanceRequest{ + Name: "Test Instance", + Description: "Test Description", + Data: json.RawMessage(`{}`), + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Set Content-Length to force an error when reading + w.Header().Set("Content-Length", "100") + w.WriteHeader(http.StatusOK) + // Close the connection immediately to cause a read error + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + resp, err := client.CreateAgentInstance(context.Background(), userUUID, req) + require.Error(t, err) + require.Nil(t, resp) +} + +func TestAgentHubSvcClientImpl_UpdateAgentInstance_Success(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + mcpEnabled := true + locked := false + req := &UpdateAgentInstanceRequest{ + Name: "Updated Instance", + Description: "Updated Description", + Data: json.RawMessage(`{"nodes": [{"id": "1"}], "edges": []}`), + FolderID: "folder-id", + MCPEnabled: &mcpEnabled, + Locked: &locked, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, "/api/v1/opencsg/flows/"+contentID, r.URL.String()) + assert.Equal(t, http.MethodPatch, r.Method) + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("User-UUID")) + assert.Equal(t, "test-token", r.Header.Get("Token")) + + var requestBody UpdateAgentInstanceRequest + err := json.NewDecoder(r.Body).Decode(&requestBody) + require.NoError(t, err) + assert.Equal(t, req.Name, requestBody.Name) + assert.Equal(t, req.Description, requestBody.Description) + assert.Equal(t, req.FolderID, requestBody.FolderID) + assert.NotNil(t, requestBody.MCPEnabled) + assert.True(t, *requestBody.MCPEnabled) + assert.NotNil(t, requestBody.Locked) + assert.False(t, *requestBody.Locked) + + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + err := client.UpdateAgentInstance(context.Background(), userUUID, contentID, req) + require.NoError(t, err) +} + +func TestAgentHubSvcClientImpl_UpdateAgentInstance_NilRequest(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + + client := setupTestAgentHubClient(httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))) + defer client.hc.hc.CloseIdleConnections() + + err := client.UpdateAgentInstance(context.Background(), userUUID, contentID, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), "update agent instance request is nil") +} + +func TestAgentHubSvcClientImpl_UpdateAgentInstance_Non200Status(t *testing.T) { + contentID := "test-content-id" + userUUID := "test-user-uuid" + req := &UpdateAgentInstanceRequest{ + Name: "Updated Instance", + Description: "Updated Description", + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestAgentHubClient(server) + + err := client.UpdateAgentInstance(context.Background(), userUUID, contentID, req) + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to update agent instance in agenthub") +} diff --git a/builder/rpc/csgbot_svc_client.go b/builder/rpc/csgbot_svc_client.go new file mode 100644 index 000000000..a6b0f5cc9 --- /dev/null +++ b/builder/rpc/csgbot_svc_client.go @@ -0,0 +1,247 @@ +package rpc + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io" + "log/slog" + "net/http" + "strconv" + "time" + + "opencsg.com/csghub-server/common/errorx" + "opencsg.com/csghub-server/common/types" +) + +type CsgbotSvcClient interface { + DeleteWorkspaceFiles(ctx context.Context, userUUID string, username string, token string, agentName string) error + CreateKnowledgeBase(ctx context.Context, userUUID string, username string, token string, req *CreateKnowledgeBaseRequest) (*CreateKnowledgeBaseResponse, error) + DeleteKnowledgeBase(ctx context.Context, userUUID string, username string, token string, contentID string) error + UpdateKnowledgeBase(ctx context.Context, userUUID string, username string, token string, contentID string, req *types.UpdateAgentKnowledgeBaseRequest) error +} + +type CreateKnowledgeBaseRequest struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + ContentID string `json:"content_id"` + Public *bool `json:"public,omitempty"` +} + +type CreateKnowledgeBaseResponse struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description"` + Data json.RawMessage `json:"data"` + IsComponent bool `json:"is_component"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` + Webhook bool `json:"webhook"` + Tags []string `json:"tags"` + Locked bool `json:"locked"` + McpEnabled bool `json:"mcp_enabled"` + AccessType string `json:"access_type"` + UserUUID string `json:"user_id"` // user uuid + FolderID string `json:"folder_id"` +} + +type DeleteKnowledgeBaseRequest struct { + IDs []string `json:"ids"` +} + +type DeleteKnowledgeBaseResponse struct { + IDs []string `json:"ids"` + Total int `json:"total"` +} + +type CsgbotSvcHttpClientImpl struct { + hc *HttpClient +} + +func NewCsgbotSvcHttpClient(endpoint string, opts ...RequestOption) CsgbotSvcClient { + return &CsgbotSvcHttpClientImpl{ + hc: NewHttpClient(endpoint, opts...), + } +} + +// Delete workspace files for a code agent +// DELETE /api/v1/csgbot/codeAgent/{agent_name} +func (c *CsgbotSvcHttpClientImpl) DeleteWorkspaceFiles(ctx context.Context, userUUID string, username string, token string, agentName string) error { + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "csgbot", + "api": "DELETE /api/v1/csgbot/codeAgent/{agent_name}", + } + + path := c.hc.endpoint + "/api/v1/csgbot/codeAgent/" + agentName + hreq, err := http.NewRequestWithContext(ctx, http.MethodDelete, path, nil) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("user_uuid", userUUID) + hreq.Header.Set("user_name", username) + hreq.Header.Set("user_token", token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to delete workspace files for code agent", "error", err, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to delete workspace files for code agent"), rpcErrorCtx) + } + defer hresp.Body.Close() + + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to delete workspace files for code agent", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to delete workspace files for code agent"), rpcErrorCtx) + } + + return nil +} + +// Create knowledge base +// POST /api/v1/csgbot/langflow/flows/rag +func (c *CsgbotSvcHttpClientImpl) CreateKnowledgeBase(ctx context.Context, userUUID string, username string, token string, req *CreateKnowledgeBaseRequest) (*CreateKnowledgeBaseResponse, error) { + if req == nil { + return nil, errorx.BadRequest(errors.New("create knowledge base request is nil"), nil) + } + + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "service": "csgbot", + "api": "POST /api/v1/csgbot/langflow/flows/rag", + } + + jsonData, err := json.Marshal(req) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + buf := bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/csgbot/langflow/flows/rag" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("user_uuid", userUUID) + hreq.Header.Set("user_name", username) + hreq.Header.Set("user_token", token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to create knowledge base in csgbot service", "error", err, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to create knowledge base in csgbot service"), rpcErrorCtx) + } + defer hresp.Body.Close() + if hresp.StatusCode != http.StatusOK { + slog.ErrorContext(ctx, "failed to create knowledge base in csgbot service", "status_code", hresp.StatusCode, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.RemoteSvcFail(errors.New("failed to create knowledge base in csgbot service"), rpcErrorCtx) + } + + body, err := io.ReadAll(hresp.Body) + if err != nil { + slog.ErrorContext(ctx, "failed to create knowledge base in csgbot service", "error", err, "rpc_error_ctx", rpcErrorCtx) + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + var resp CreateKnowledgeBaseResponse + err = json.Unmarshal(body, &resp) + if err != nil { + return nil, errorx.InternalServerError(err, rpcErrorCtx) + } + return &resp, nil +} + +// Delete knowledge base +// POST /api/v1/csgbot/langflow/flows/rag/delete +func (c *CsgbotSvcHttpClientImpl) DeleteKnowledgeBase(ctx context.Context, userUUID string, username string, token string, contentID string) error { + rpcErrorCtx := map[string]any{ + "user_uuid": userUUID, + "content_id": contentID, + "service": "csgbot", + "api": "POST /api/v1/csgbot/langflow/flows/rag/delete", + } + var resp DeleteKnowledgeBaseResponse + + req := DeleteKnowledgeBaseRequest{ + IDs: []string{contentID}, + } + jsonData, err := json.Marshal(req) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + buf := bytes.NewBuffer(jsonData) + path := c.hc.endpoint + "/api/v1/csgbot/langflow/flows/rag/delete" + hreq, err := http.NewRequestWithContext(ctx, http.MethodPost, path, buf) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + + hreq.Header.Set("Content-Type", "application/json") + hreq.Header.Set("user_uuid", userUUID) + hreq.Header.Set("user_name", username) + hreq.Header.Set("user_token", token) + + hresp, err := c.hc.Do(hreq) + if err != nil { + slog.ErrorContext(ctx, "failed to delete knowledge base in csgbot service", "error", err, "rpc_error_ctx", rpcErrorCtx) + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service"), rpcErrorCtx) + } + defer hresp.Body.Close() + + if hresp.StatusCode != http.StatusOK { + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service, status code: "+strconv.Itoa(hresp.StatusCode)), rpcErrorCtx) + } + + body, err := io.ReadAll(hresp.Body) + if err != nil { + return errorx.InternalServerError(err, rpcErrorCtx) + } + if err := json.Unmarshal(body, &resp); err != nil { + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service, unmarshal response error: "+err.Error()), rpcErrorCtx) + } + + if resp.Total != 1 { + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service, total: "+strconv.Itoa(resp.Total)), rpcErrorCtx) + } + + if len(resp.IDs) == 0 { + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service, response IDs is empty"), rpcErrorCtx) + } + + if resp.IDs[0] != contentID { + return errorx.RemoteSvcFail(errors.New("failed to delete knowledge base in csgbot service, content ID mismatch: "+contentID+" != "+resp.IDs[0]), rpcErrorCtx) + } + return nil +} + +// Update knowledge base +// PUT /api/v1/csgbot/langflow/flows/rag/{content_id} +func (c *CsgbotSvcHttpClientImpl) UpdateKnowledgeBase(_ context.Context, _ string, _ string, _ string, _ string, _ *types.UpdateAgentKnowledgeBaseRequest) error { + return nil +} + +func NewCsgbotSvcHttpClientBuilder(endpoint string, opts ...RequestOption) CsgbotSvcClientBuilder { + return &CsgbotSvcHttpClientImpl{ + hc: NewHttpClient(endpoint, opts...), + } +} + +type CsgbotSvcClientBuilder interface { + WithRetry(attempts uint) CsgbotSvcClientBuilder + WithDelay(delay time.Duration) CsgbotSvcClientBuilder + Build() CsgbotSvcClient +} + +func (c *CsgbotSvcHttpClientImpl) WithRetry(attempts uint) CsgbotSvcClientBuilder { + c.hc = c.hc.WithRetry(attempts) + return c +} + +func (c *CsgbotSvcHttpClientImpl) WithDelay(delay time.Duration) CsgbotSvcClientBuilder { + c.hc = c.hc.WithDelay(delay) + return c +} + +func (c *CsgbotSvcHttpClientImpl) Build() CsgbotSvcClient { + return c +} diff --git a/builder/rpc/csgbot_svc_client_test.go b/builder/rpc/csgbot_svc_client_test.go new file mode 100644 index 000000000..1fd712af7 --- /dev/null +++ b/builder/rpc/csgbot_svc_client_test.go @@ -0,0 +1,416 @@ +package rpc + +import ( + "context" + "encoding/json" + "errors" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "opencsg.com/csghub-server/common/errorx" +) + +func setupTestCsgbotClient(server *httptest.Server) *CsgbotSvcHttpClientImpl { + return &CsgbotSvcHttpClientImpl{ + hc: &HttpClient{ + endpoint: server.URL, + hc: server.Client(), + logger: slog.New(slog.NewJSONHandler(os.Stdout, nil)), + }, + } +} + +func TestDeleteWorkspaceFiles_Success(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-agent-name" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the request path + expectedPath := "/api/v1/csgbot/codeAgent/" + contentID + assert.Equal(t, expectedPath, r.URL.Path) + assert.Equal(t, http.MethodDelete, r.Method) + + // Verify headers + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("user_uuid")) + assert.Equal(t, username, r.Header.Get("user_name")) + assert.Equal(t, token, r.Header.Get("user_token")) + + // Return success + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteWorkspaceFiles(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.NoError(t, err) +} + +func TestDeleteWorkspaceFiles_InternalServerError(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-agent-name" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteWorkspaceFiles(context.Background(), userUUID, username, token, contentID) + + // Verify result - should return RemoteSvcFail error + assert.Error(t, err) + assert.True(t, errors.Is(err, errorx.ErrRemoteServiceFail)) +} + +func TestNewCsgbotSvcHttpClient(t *testing.T) { + endpoint := "http://test-endpoint.com" + client := NewCsgbotSvcHttpClient(endpoint) + + // Verify the client is created + assert.NotNil(t, client) + + // Verify it implements the interface + var _ CsgbotSvcClient = client +} + +func TestCreateKnowledgeBase_Success(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + req := &CreateKnowledgeBaseRequest{ + Name: "Test KB", + Description: "Test description", + } + + expectedResponse := CreateKnowledgeBaseResponse{ + ID: "test-content-id", + Name: "Test KB", + Description: "Test description", + IsComponent: false, + Webhook: false, + Tags: []string{"tag1"}, + Locked: false, + McpEnabled: false, + AccessType: "PRIVATE", + UserUUID: userUUID, + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the request path + expectedPath := "/api/v1/csgbot/langflow/flows/rag" + assert.Equal(t, expectedPath, r.URL.Path) + assert.Equal(t, http.MethodPost, r.Method) + + // Verify headers + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("user_uuid")) + assert.Equal(t, username, r.Header.Get("user_name")) + assert.Equal(t, token, r.Header.Get("user_token")) + + // Return success with response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "id": "test-content-id", + "name": "Test KB", + "description": "Test description", + "data": {}, + "is_component": false, + "created_at": "2025-01-01T00:00:00Z", + "updated_at": "2025-01-01T00:00:00Z", + "webhook": false, + "tags": ["tag1"], + "locked": false, + "mcp_enabled": false, + "access_type": "PRIVATE", + "user_id": "test-user-uuid", + "folder_id": "test-folder-id" + }`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + resp, err := client.CreateKnowledgeBase(context.Background(), userUUID, username, token, req) + + // Verify result + assert.NoError(t, err) + assert.NotNil(t, resp) + assert.Equal(t, expectedResponse.ID, resp.ID) + assert.Equal(t, expectedResponse.Name, resp.Name) + assert.Equal(t, expectedResponse.Description, resp.Description) + assert.Equal(t, expectedResponse.UserUUID, resp.UserUUID) +} + +func TestCreateKnowledgeBase_NilRequest(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + resp, err := client.CreateKnowledgeBase(context.Background(), userUUID, username, token, nil) + + // Verify result + assert.Error(t, err) + assert.Nil(t, resp) + assert.True(t, errors.Is(err, errorx.ErrBadRequest)) +} + +func TestCreateKnowledgeBase_Non200Status(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + req := &CreateKnowledgeBaseRequest{ + Name: "Test KB", + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + resp, err := client.CreateKnowledgeBase(context.Background(), userUUID, username, token, req) + + // Verify result + assert.Error(t, err) + assert.Nil(t, resp) + assert.True(t, errors.Is(err, errorx.ErrRemoteServiceFail)) +} + +func TestCreateKnowledgeBase_ReadBodyError(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + req := &CreateKnowledgeBaseRequest{ + Name: "Test KB", + } + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1") + w.WriteHeader(http.StatusOK) + // Close the connection immediately to cause read error + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + resp, err := client.CreateKnowledgeBase(context.Background(), userUUID, username, token, req) + + // Verify result + assert.Error(t, err) + assert.Nil(t, resp) +} + +func TestDeleteKnowledgeBase_Success(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Verify the request path + expectedPath := "/api/v1/csgbot/langflow/flows/rag/delete" + assert.Equal(t, expectedPath, r.URL.Path) + assert.Equal(t, http.MethodPost, r.Method) + + // Verify headers + assert.Equal(t, "application/json", r.Header.Get("Content-Type")) + assert.Equal(t, userUUID, r.Header.Get("user_uuid")) + assert.Equal(t, username, r.Header.Get("user_name")) + assert.Equal(t, token, r.Header.Get("user_token")) + + // Verify request body + var reqBody DeleteKnowledgeBaseRequest + err := json.NewDecoder(r.Body).Decode(&reqBody) + assert.NoError(t, err) + assert.Equal(t, []string{contentID}, reqBody.IDs) + + // Return success with response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "ids": ["test-content-id"], + "total": 1 + }`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.NoError(t, err) +} + +func TestDeleteKnowledgeBase_Non200Status(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) + assert.True(t, errors.Is(err, errorx.ErrRemoteServiceFail)) +} + +func TestDeleteKnowledgeBase_ReadBodyError(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Length", "1") + w.WriteHeader(http.StatusOK) + // Close the connection immediately to cause read error + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) +} + +func TestDeleteKnowledgeBase_UnmarshalError(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`invalid json`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) + assert.Contains(t, err.Error(), "unmarshal response error") +} + +func TestDeleteKnowledgeBase_TotalMismatch(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "ids": ["test-content-id"], + "total": 2 + }`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) + assert.Contains(t, err.Error(), "total: 2") +} + +func TestDeleteKnowledgeBase_EmptyIDs(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "ids": [], + "total": 1 + }`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) + assert.Contains(t, err.Error(), "response IDs is empty") +} + +func TestDeleteKnowledgeBase_ContentIDMismatch(t *testing.T) { + userUUID := "test-user-uuid" + username := "test-username" + token := "test-token" + contentID := "test-content-id" + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{ + "ids": ["different-content-id"], + "total": 1 + }`)) + })) + defer server.Close() + + client := setupTestCsgbotClient(server) + + // Execute the test + err := client.DeleteKnowledgeBase(context.Background(), userUUID, username, token, contentID) + + // Verify result + assert.Error(t, err) + assert.Contains(t, err.Error(), "content ID mismatch") +}