Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ require (
github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/alicebob/miniredis/v2 v2.30.5 // indirect
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect
Expand Down Expand Up @@ -115,6 +117,7 @@ require (
github.com/x448/float16 v0.8.4 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/yuin/gopher-lua v1.1.0 // indirect
github.com/zeebo/errs v1.4.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/detectors/gcp v1.35.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI=
github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0=
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk=
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc=
github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0=
github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g=
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4=
Expand Down Expand Up @@ -103,6 +107,9 @@ github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqo
github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw=
github.com/charmbracelet/x/term v0.2.0 h1:cNB9Ot9q8I711MyZ7myUR5HFWL/lc3OpU8jZ4hwm0x0=
github.com/charmbracelet/x/term v0.2.0/go.mod h1:GVxgxAbjUrmpvIINHIQnJJKpMlHiZ4cktEQCN6GWyF0=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I=
github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng=
github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y=
Expand Down Expand Up @@ -352,6 +359,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE=
github.com/yuin/gopher-lua v1.1.0/go.mod h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw=
github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM=
github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA=
Expand Down Expand Up @@ -397,6 +406,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
47 changes: 38 additions & 9 deletions internal/core/control_panel/launcher_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime"
"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine"
"github.com/langgenius/dify-plugin-daemon/pkg/utils/cache"
"github.com/langgenius/dify-plugin-daemon/pkg/utils/log"
"github.com/langgenius/dify-plugin-daemon/pkg/utils/routine"
)

Expand Down Expand Up @@ -62,15 +65,41 @@ func (c *ControlPanel) LaunchLocalPlugin(
// init environment
// whatever it's a user request to launch a plugin or a new plugin was found
// by watch dog, initialize environment is a must
if err := runtime.InitEnvironment(decoder); err != nil {
err = errors.Join(err, fmt.Errorf("failed to init environment"))
// notify new runtime launch failed
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
})
// release semaphore
releaseLockAndSemaphore()
return nil, nil, err
// To avoid cross-pod races on Python venv creation, guard InitEnvironment with a Redis-based distributed lock.
{
lockKey := fmt.Sprintf("env_init_lock:%s", pluginUniqueIdentifier.String())
// expire: generous upper bound for env initialization; tryLockTimeout: wait up to the same duration
expire := 15 * time.Minute
tryTimeout := 2 * time.Minute
log.Info("acquiring distributed init lock", "plugin", pluginUniqueIdentifier.String(), "expire", expire.String())
if err := cache.Lock(lockKey, expire, tryTimeout); err != nil {
// failed to acquire the lock within timeout
err = errors.Join(err, fmt.Errorf("failed to acquire distributed env-init lock"))
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
})
// release semaphore and local lock
releaseLockAndSemaphore()
return nil, nil, err
}
defer func() {
if unlockErr := cache.Unlock(lockKey); unlockErr != nil {
log.Warn("failed to release distributed init lock", "plugin", pluginUniqueIdentifier.String(), "error", unlockErr.Error())
} else {
log.Info("released distributed init lock", "plugin", pluginUniqueIdentifier.String())
}
}()

if err := runtime.InitEnvironment(decoder); err != nil {
err = errors.Join(err, fmt.Errorf("failed to init environment"))
// notify new runtime launch failed
c.WalkNotifiers(func(notifier ControlPanelNotifier) {
notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err)
})
// release semaphore
releaseLockAndSemaphore()
return nil, nil, err
}
}

once := sync.Once{}
Expand Down
1 change: 1 addition & 0 deletions internal/core/local_runtime/environment_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
switch err {
case ErrVirtualEnvironmentInvalid:
// remove the venv and rebuild it
log.Warn("virtual environment for %s is invalid; deleting and recreating", p.Config.Identity())
p.deleteVirtualEnvironment()

// create virtual environment
Expand Down
12 changes: 8 additions & 4 deletions internal/core/local_runtime/setup_python_environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,15 @@ func (p *LocalPluginRuntime) checkPythonVirtualEnvironment() (*PythonVirtualEnvi

func (p *LocalPluginRuntime) deleteVirtualEnvironment() error {
// check if virtual environment exists
if _, err := os.Stat(path.Join(p.State.WorkingPath, envPath)); err != nil {
return nil
venvDir := path.Join(p.State.WorkingPath, envPath)
if _, err := os.Stat(venvDir); err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil
}
return err
}

return os.RemoveAll(path.Join(p.State.WorkingPath, envPath))
log.Warn("deleting existing Python virtual environment", "plugin", p.Config.Identity(), "path", venvDir)
return os.RemoveAll(venvDir)
}

func (p *LocalPluginRuntime) createVirtualEnvironment(
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/cache/helper/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ func EndpointCacheKey(hookId string) string {
},
":",
)
}
}
2 changes: 1 addition & 1 deletion pkg/utils/mapping/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (m *Map[K, V]) Range(f func(key K, value V) bool) {
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
m.mu.Lock()
defer m.mu.Unlock()

v, loaded := m.store.LoadOrStore(key, value)
actual = v.(V)
if !loaded {
Expand Down
8 changes: 3 additions & 5 deletions pkg/utils/mapping/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestConcurrentAccess(t *testing.T) {

var wg sync.WaitGroup
wg.Add(workers)

for i := 0; i < workers; i++ {
go func(i int) {
defer wg.Done()
Expand All @@ -78,7 +78,7 @@ func TestLoadOrStore(t *testing.T) {
m := Map[string, interface{}]{}

// First store
val, loaded := m.LoadOrStore("data", []byte{1,2,3})
val, loaded := m.LoadOrStore("data", []byte{1, 2, 3})
if loaded || val.([]byte)[0] != 1 {
t.Error("Initial LoadOrStore failed")
}
Expand All @@ -90,8 +90,6 @@ func TestLoadOrStore(t *testing.T) {
}
}



// TestEdgeCases covers special scenarios
func TestEdgeCases(t *testing.T) {
t.Parallel()
Expand All @@ -108,4 +106,4 @@ func TestEdgeCases(t *testing.T) {
if m.Len() != 0 {
t.Error("Clear failed to reset map")
}
}
}
Loading