Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion internal/cluster/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
Expand Down Expand Up @@ -45,7 +46,8 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM
return nil
}

func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) {
func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) error {
return nil
}

func getRandomPluginRuntime() fakePlugin {
Expand Down
4 changes: 2 additions & 2 deletions internal/core/plugin_daemon/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func GenericInvokePlugin[Req any, Rsp any](
listener.Close()
})

session.Write(
err := session.Write(
session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
session.Action,
generic_invoke.GetInvokePluginMap(
Expand All @@ -94,5 +94,5 @@ func GenericInvokePlugin[Req any, Rsp any](
),
)

return response, nil
return response, err
}
7 changes: 4 additions & 3 deletions internal/core/plugin_manager/debugging_runtime/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package debugging_runtime
import (
"encoding/json"

"github.com/panjf2000/gnet/v2"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
"github.com/panjf2000/gnet/v2"
)

func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] {
Expand Down Expand Up @@ -52,8 +53,8 @@ func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plug
return listener
}

func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) error {
return r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
return nil
})
}
4 changes: 2 additions & 2 deletions internal/core/plugin_manager/local_runtime/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi
return listener
}

func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
r.stdioHolder.write(append(data, '\n'))
func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) error {
return r.stdioHolder.write(append(data, '\n'))
}
10 changes: 4 additions & 6 deletions internal/core/plugin_manager/serverless_runtime/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
"github.com/langgenius/dify-plugin-daemon/pkg/entities"
Expand All @@ -24,11 +23,10 @@ func (r *ServerlessPluginRuntime) Listen(sessionId string) *entities.Broadcast[p
}

// For Serverless, write is equivalent to http request, it's not a normal stream like stdio and tcp
func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) {
func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) error {
l, ok := r.listeners.Load(sessionId)
if !ok {
log.Error("session %s not found", sessionId)
return
return fmt.Errorf("session %s not found", sessionId)
}

url, err := url.JoinPath(r.LambdaURL, "invoke")
Expand All @@ -41,8 +39,7 @@ func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.Pl
}),
})
l.Close()
r.Error(fmt.Sprintf("Error creating request: %v", err))
return
return fmt.Errorf("error creating request: %v", err)
}

routine.Submit(map[string]string{
Expand Down Expand Up @@ -138,4 +135,5 @@ func (r *ServerlessPluginRuntime) Write(sessionId string, action access_types.Pl
})
}
})
return nil
}
4 changes: 3 additions & 1 deletion internal/core/plugin_manager/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
cloudoss "github.com/langgenius/dify-cloud-kit/oss"

"github.com/langgenius/dify-cloud-kit/oss/factory"

"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
"github.com/langgenius/dify-plugin-daemon/internal/types/app"
Expand Down Expand Up @@ -50,7 +51,8 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM
return nil
}

func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) {
func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) error {
return nil
}

func (r *fakePlugin) WaitStarted() <-chan bool {
Expand Down
4 changes: 2 additions & 2 deletions internal/core/session_manager/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/google/uuid"

"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
Expand Down Expand Up @@ -184,6 +185,5 @@ func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, action access_types.Plugin
if s.runtime == nil {
return errors.New("runtime not bound")
}
s.runtime.Write(s.ID, action, s.Message(event, data))
return nil
return s.runtime.Write(s.ID, action, s.Message(event, data))
}
2 changes: 1 addition & 1 deletion pkg/entities/plugin_entities/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type (
// Listen listens for messages from the plugin
Listen(session_id string) *entities.Broadcast[SessionMessage]
// Write writes a message to the plugin
Write(session_id string, action access_types.PluginAccessAction, data []byte)
Write(session_id string, action access_types.PluginAccessAction, data []byte) error
// Log adds a log to the plugin runtime state
Log(string)
// Warn adds a warning to the plugin runtime state
Expand Down