diff --git a/internal/cluster/plugin_test.go b/internal/cluster/plugin_test.go index b7ebc890c..c1c3a332d 100644 --- a/internal/cluster/plugin_test.go +++ b/internal/cluster/plugin_test.go @@ -5,6 +5,7 @@ import ( "time" "github.com/google/uuid" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime" "github.com/langgenius/dify-plugin-daemon/pkg/entities" @@ -45,7 +46,8 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM return nil } -func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) { +func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) error { + return nil } func getRandomPluginRuntime() fakePlugin { diff --git a/internal/core/plugin_daemon/generic.go b/internal/core/plugin_daemon/generic.go index 023d04caf..bdbcb5ec8 100644 --- a/internal/core/plugin_daemon/generic.go +++ b/internal/core/plugin_daemon/generic.go @@ -85,7 +85,7 @@ func GenericInvokePlugin[Req any, Rsp any]( listener.Close() }) - session.Write( + err := session.Write( session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST, session.Action, generic_invoke.GetInvokePluginMap( @@ -94,5 +94,5 @@ func GenericInvokePlugin[Req any, Rsp any]( ), ) - return response, nil + return response, err } diff --git a/internal/core/plugin_manager/debugging_runtime/io.go b/internal/core/plugin_manager/debugging_runtime/io.go index fc2d11554..462ea67ce 100644 --- a/internal/core/plugin_manager/debugging_runtime/io.go +++ b/internal/core/plugin_manager/debugging_runtime/io.go @@ -3,6 +3,8 @@ package debugging_runtime import ( "encoding/json" + "github.com/panjf2000/gnet/v2" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/types/exception" "github.com/langgenius/dify-plugin-daemon/internal/utils/log" @@ -10,7 +12,6 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" "github.com/langgenius/dify-plugin-daemon/pkg/entities" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" - "github.com/panjf2000/gnet/v2" ) func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] { @@ -52,8 +53,8 @@ func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plug return listener } -func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) { - r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error { +func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) error { + return r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error { return nil }) } diff --git a/internal/core/plugin_manager/local_runtime/io.go b/internal/core/plugin_manager/local_runtime/io.go index 339a51697..0cd3803d8 100644 --- a/internal/core/plugin_manager/local_runtime/io.go +++ b/internal/core/plugin_manager/local_runtime/io.go @@ -26,6 +26,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi return listener } -func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) { - r.stdioHolder.write(append(data, '\n')) +func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) error { + return r.stdioHolder.write(append(data, '\n')) } diff --git a/internal/core/plugin_manager/serverless_runtime/io.go b/internal/core/plugin_manager/serverless_runtime/io.go index cf333088a..a957e3129 100644 --- a/internal/core/plugin_manager/serverless_runtime/io.go +++ b/internal/core/plugin_manager/serverless_runtime/io.go @@ -9,7 +9,6 @@ import ( "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests" - "github.com/langgenius/dify-plugin-daemon/internal/utils/log" "github.com/langgenius/dify-plugin-daemon/internal/utils/parser" "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" "github.com/langgenius/dify-plugin-daemon/pkg/entities" @@ -24,11 +23,10 @@ func (r *ServerlessPluginRuntime) Listen(sessionId string) *entities.Broadcast[p } // For Serverless, write is equivalent to http request, it's not a normal stream like stdio and tcp -func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) { +func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) error { l, ok := r.listeners.Load(sessionId) if !ok { - log.Error("session %s not found", sessionId) - return + return fmt.Errorf("session %s not found", sessionId) } url, err := url.JoinPath(r.LambdaURL, "invoke") @@ -41,8 +39,7 @@ func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.Pl }), }) l.Close() - r.Error(fmt.Sprintf("Error creating request: %v", err)) - return + return fmt.Errorf("error creating request: %v", err) } routine.Submit(map[string]string{ @@ -138,4 +135,5 @@ func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.Pl }) } }) + return nil } diff --git a/internal/core/plugin_manager/watcher_test.go b/internal/core/plugin_manager/watcher_test.go index 8efcd2391..79db2e3ee 100644 --- a/internal/core/plugin_manager/watcher_test.go +++ b/internal/core/plugin_manager/watcher_test.go @@ -8,6 +8,7 @@ import ( cloudoss "github.com/langgenius/dify-cloud-kit/oss" "github.com/langgenius/dify-cloud-kit/oss/factory" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime" "github.com/langgenius/dify-plugin-daemon/internal/types/app" @@ -50,7 +51,8 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM return nil } -func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) { +func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) error { + return nil } func (r *fakePlugin) WaitStarted() <-chan bool { diff --git a/internal/core/session_manager/session.go b/internal/core/session_manager/session.go index 00ee0607b..da1afed36 100644 --- a/internal/core/session_manager/session.go +++ b/internal/core/session_manager/session.go @@ -7,6 +7,7 @@ import ( "time" "github.com/google/uuid" + "github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation" "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types" "github.com/langgenius/dify-plugin-daemon/internal/utils/cache" @@ -184,6 +185,5 @@ func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, action access_types.Plugin if s.runtime == nil { return errors.New("runtime not bound") } - s.runtime.Write(s.ID, action, s.Message(event, data)) - return nil + return s.runtime.Write(s.ID, action, s.Message(event, data)) } diff --git a/pkg/entities/plugin_entities/runtime.go b/pkg/entities/plugin_entities/runtime.go index a898eee40..84a21e0ea 100644 --- a/pkg/entities/plugin_entities/runtime.go +++ b/pkg/entities/plugin_entities/runtime.go @@ -72,7 +72,7 @@ type ( // Listen listens for messages from the plugin Listen(session_id string) *entities.Broadcast[SessionMessage] // Write writes a message to the plugin - Write(session_id string, action access_types.PluginAccessAction, data []byte) + Write(session_id string, action access_types.PluginAccessAction, data []byte) error // Log adds a log to the plugin runtime state Log(string) // Warn adds a warning to the plugin runtime state