diff --git a/go.mod b/go.mod index f6329ec4f..b5ac161b3 100644 --- a/go.mod +++ b/go.mod @@ -37,6 +37,8 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.27.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.51.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 // indirect + github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect + github.com/alicebob/miniredis/v2 v2.30.5 // indirect github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aws/aws-sdk-go-v2 v1.36.3 // indirect @@ -115,6 +117,7 @@ require ( github.com/x448/float16 v0.8.4 // indirect github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect + github.com/yuin/gopher-lua v1.1.0 // indirect github.com/zeebo/errs v1.4.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.35.0 // indirect diff --git a/go.sum b/go.sum index f3144ba79..85fecfbe3 100644 --- a/go.sum +++ b/go.sum @@ -41,6 +41,10 @@ github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/cloudmock v0 github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0 h1:6/0iUd0xrnX7qt+mLNRwg5c0PGv8wpE8K90ryANQwMI= github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.51.0/go.mod h1:otE2jQekW/PqXk1Awf5lmfokJx4uwuqcj1ab5SpGeW0= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a h1:HbKu58rmZpUGpz5+4FfNmIU+FmZg2P3Xaj2v2bfNWmk= +github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a/go.mod h1:SGnFV6hVsYE877CKEZ6tDNTjaSXYUk6QqoIK6PrAtcc= 
+github.com/alicebob/miniredis/v2 v2.30.5 h1:3r6kTHdKnuP4fkS8k2IrvSfxpxUTcW1SOL0wN7b7Dt0= +github.com/alicebob/miniredis/v2 v2.30.5/go.mod h1:b25qWj4fCEsBeAAR2mlb0ufImGC6uH3VlUfb/HS5zKg= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible h1:8psS8a+wKfiLt1iVDX79F7Y6wUM49Lcha2FMXt4UM8g= github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= github.com/atotto/clipboard v0.1.4 h1:EH0zSVneZPSuFR11BlR9YppQTVDbh5+16AmcJi4g1z4= @@ -103,6 +107,9 @@ github.com/charmbracelet/x/ansi v0.2.3 h1:VfFN0NUpcjBRd4DnKfRaIRo53KRgey/nhOoEqo github.com/charmbracelet/x/ansi v0.2.3/go.mod h1:dk73KoMTT5AX5BsX0KrqhsTqAnhZZoCBjs7dGWp4Ktw= github.com/charmbracelet/x/term v0.2.0 h1:cNB9Ot9q8I711MyZ7myUR5HFWL/lc3OpU8jZ4hwm0x0= github.com/charmbracelet/x/term v0.2.0/go.mod h1:GVxgxAbjUrmpvIINHIQnJJKpMlHiZ4cktEQCN6GWyF0= +github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= +github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= +github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I= github.com/clbanning/mxj v1.8.4/go.mod h1:BVjHeAH+rl9rs6f+QIpeRl0tfu10SXn1pUSa5PVGJng= github.com/cloudwego/base64x v0.1.4 h1:jwCgWpFanWmN8xoIUHa2rtzmkd5J2plF/dnLS6Xd/0Y= @@ -352,6 +359,8 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74= github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y= +github.com/yuin/gopher-lua v1.1.0 h1:BojcDhfyDWgU2f2TOzYK/g5p2gxMrku8oupLDqlnSqE= +github.com/yuin/gopher-lua v1.1.0/go.mod 
h1:GBR0iDaNXjAgGg9zfCvksxSRnQx76gclCIb7kdAd1Pw= github.com/zeebo/errs v1.4.0 h1:XNdoD/RRMKP7HD0UhJnIzUy74ISdGGxURlYG8HSWSfM= github.com/zeebo/errs v1.4.0/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= @@ -397,6 +406,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20210809222454-d867a43fc93e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/core/control_panel/launcher_local.go b/internal/core/control_panel/launcher_local.go index d498d9eb7..23a772618 100644 --- a/internal/core/control_panel/launcher_local.go +++ b/internal/core/control_panel/launcher_local.go @@ -4,10 +4,13 @@ import ( "errors" "fmt" "sync" + "time" "github.com/langgenius/dify-plugin-daemon/internal/core/local_runtime" "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities" routinepkg "github.com/langgenius/dify-plugin-daemon/pkg/routine" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/cache" + "github.com/langgenius/dify-plugin-daemon/pkg/utils/log" "github.com/langgenius/dify-plugin-daemon/pkg/utils/routine" ) @@ -62,15 +65,41 @@ func (c *ControlPanel) LaunchLocalPlugin( // init environment // whatever it's a user request to launch a plugin or a new plugin was found // by watch dog, initialize environment is a must - if err := runtime.InitEnvironment(decoder); err != nil { - err 
= errors.Join(err, fmt.Errorf("failed to init environment")) - // notify new runtime launch failed - c.WalkNotifiers(func(notifier ControlPanelNotifier) { - notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err) - }) - // release semaphore - releaseLockAndSemaphore() - return nil, nil, err + // To avoid cross-pod races on Python venv creation, guard InitEnvironment with a Redis-based distributed lock. NOTE(review): the deferred Unlock below runs when LaunchLocalPlugin returns, not at the closing brace of this block; confirm nothing slow follows InitEnvironment. + { + lockKey := fmt.Sprintf("env_init_lock:%s", pluginUniqueIdentifier.String()) + // expire: generous upper bound for env initialization; tryTimeout: presumably the maximum wait to acquire the lock (2m here, deliberately shorter than expire) + expire := 15 * time.Minute + tryTimeout := 2 * time.Minute + log.Info("acquiring distributed init lock", "plugin", pluginUniqueIdentifier.String(), "expire", expire.String()) + if err := cache.Lock(lockKey, expire, tryTimeout); err != nil { + // failed to acquire the lock within timeout + err = errors.Join(err, fmt.Errorf("failed to acquire distributed env-init lock")) + c.WalkNotifiers(func(notifier ControlPanelNotifier) { + notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err) + }) + // release semaphore and local lock + releaseLockAndSemaphore() + return nil, nil, err + } + defer func() { + if unlockErr := cache.Unlock(lockKey); unlockErr != nil { + log.Warn("failed to release distributed init lock", "plugin", pluginUniqueIdentifier.String(), "error", unlockErr.Error()) + } else { + log.Info("released distributed init lock", "plugin", pluginUniqueIdentifier.String()) + } + }() + + if err := runtime.InitEnvironment(decoder); err != nil { + err = errors.Join(err, fmt.Errorf("failed to init environment")) + // notify new runtime launch failed + c.WalkNotifiers(func(notifier ControlPanelNotifier) { + notifier.OnLocalRuntimeStartFailed(pluginUniqueIdentifier, err) + }) + // release semaphore + releaseLockAndSemaphore() + return nil, nil, err + } } once := sync.Once{} diff --git a/internal/core/local_runtime/environment_python.go 
b/internal/core/local_runtime/environment_python.go index 9ed3399eb..054236f99 100644 --- a/internal/core/local_runtime/environment_python.go +++ b/internal/core/local_runtime/environment_python.go @@ -19,6 +19,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error { switch err { case ErrVirtualEnvironmentInvalid: // remove the venv and rebuild it + log.Warn("virtual environment is invalid; deleting and recreating", "plugin", p.Config.Identity()) p.deleteVirtualEnvironment() // create virtual environment diff --git a/internal/core/local_runtime/setup_python_environment.go b/internal/core/local_runtime/setup_python_environment.go index d3d503159..84433cb1c 100644 --- a/internal/core/local_runtime/setup_python_environment.go +++ b/internal/core/local_runtime/setup_python_environment.go @@ -277,11 +277,15 @@ func (p *LocalPluginRuntime) checkPythonVirtualEnvironment() (*PythonVirtualEnvi func (p *LocalPluginRuntime) deleteVirtualEnvironment() error { // check if virtual environment exists - if _, err := os.Stat(path.Join(p.State.WorkingPath, envPath)); err != nil { - return nil + venvDir := path.Join(p.State.WorkingPath, envPath) + if _, err := os.Stat(venvDir); err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil + } + return err } - - return os.RemoveAll(path.Join(p.State.WorkingPath, envPath)) + log.Warn("deleting existing Python virtual environment", "plugin", p.Config.Identity(), "path", venvDir) + return os.RemoveAll(venvDir) } func (p *LocalPluginRuntime) createVirtualEnvironment( diff --git a/pkg/utils/cache/helper/keys.go b/pkg/utils/cache/helper/keys.go index 816d8cc2c..32fd92891 100644 --- a/pkg/utils/cache/helper/keys.go +++ b/pkg/utils/cache/helper/keys.go @@ -22,4 +22,4 @@ func EndpointCacheKey(hookId string) string { }, ":", ) -} \ No newline at end of file +} diff --git a/pkg/utils/mapping/sync.go b/pkg/utils/mapping/sync.go index 58fe48d83..f46b64f1a 100644 --- a/pkg/utils/mapping/sync.go +++ b/pkg/utils/mapping/sync.go @@ -55,7 +55,7 
@@ func (m *Map[K, V]) Range(f func(key K, value V) bool) { func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { m.mu.Lock() defer m.mu.Unlock() - + v, loaded := m.store.LoadOrStore(key, value) actual = v.(V) if !loaded { diff --git a/pkg/utils/mapping/sync_test.go b/pkg/utils/mapping/sync_test.go index 6676d36cf..be3435d1d 100644 --- a/pkg/utils/mapping/sync_test.go +++ b/pkg/utils/mapping/sync_test.go @@ -56,7 +56,7 @@ func TestConcurrentAccess(t *testing.T) { var wg sync.WaitGroup wg.Add(workers) - + for i := 0; i < workers; i++ { go func(i int) { defer wg.Done() @@ -78,7 +78,7 @@ func TestLoadOrStore(t *testing.T) { m := Map[string, interface{}]{} // First store - val, loaded := m.LoadOrStore("data", []byte{1,2,3}) + val, loaded := m.LoadOrStore("data", []byte{1, 2, 3}) if loaded || val.([]byte)[0] != 1 { t.Error("Initial LoadOrStore failed") } @@ -90,8 +90,6 @@ func TestLoadOrStore(t *testing.T) { } } - - // TestEdgeCases covers special scenarios func TestEdgeCases(t *testing.T) { t.Parallel() @@ -108,4 +106,4 @@ func TestEdgeCases(t *testing.T) { if m.Len() != 0 { t.Error("Clear failed to reset map") } -} \ No newline at end of file +}