diff --git a/README.md b/README.md index db18f0013..523c89a79 100644 --- a/README.md +++ b/README.md @@ -93,6 +93,10 @@ For developers working on this codebase, see our comprehensive development docum - **[SRI Docs](docs/runtime/sri.md)** - Serverless Runtime Interface documentation +### Storage Migration + +- **[Storage Migration Guide](docs/storage-migration/guide.md)** - Migrate local plugin storage to cloud + ## Benchmark Refer to [Benchmark](https://langgenius.github.io/dify-plugin-daemon/benchmark-data/) diff --git a/cmd/migrate_storage/main.go b/cmd/migrate_storage/main.go new file mode 100644 index 000000000..638565441 --- /dev/null +++ b/cmd/migrate_storage/main.go @@ -0,0 +1,189 @@ +package main + +import ( + "flag" + "fmt" + "os" + "path" + "strings" + "time" + + "github.com/joho/godotenv" + "github.com/kelseyhightower/envconfig" + "github.com/langgenius/dify-cloud-kit/oss" + "github.com/langgenius/dify-plugin-daemon/internal/storagefactory" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" + "github.com/langgenius/dify-plugin-daemon/internal/utils/log" +) + +// migrateCategory represents a named subpath that we copy +type migrateCategory struct { + name string + path string +} + +// copyPrefix recursively copies files under a given prefix from src to dst. +func copyPrefix(src, dst oss.OSS, prefix string, dryRun bool) (files, skipped int, err error) { + // simple BFS traversal using a queue of prefixes + queue := []string{prefix} + + for len(queue) > 0 { + current := queue[0] + queue = queue[1:] + + // List current prefix + entries, listErr := src.List(current) + if listErr != nil { + return files, skipped, fmt.Errorf("list %s failed: %w", current, listErr) + } + + for _, e := range entries { + // e.Path is the full path relative to the storage root + if e.IsDir { + next := e.Path + if !strings.HasPrefix(next, current+"/") && next != current { + next = path.Join(current, next) + } + queue = append(queue, next) + continue + } + + // skip dot files + base := e.Path + if strings.HasPrefix(base, ".") || strings.Contains(base, "/.") { + skipped++ + continue + } + + // check if exists at destination + key := e.Path + if !strings.HasPrefix(key, current+"/") && key != current { + key = path.Join(current, key) + } + exists, exErr := dst.Exists(key) + if exErr == nil && exists { + skipped++ + continue + } + + if dryRun { + log.Info("DRYRUN copy %s", key) + files++ + continue + } + + // load and save + data, loadErr := src.Load(key) + if loadErr != nil { + return files, skipped, fmt.Errorf("load %s failed: %w", key, loadErr) + } + if saveErr := dst.Save(key, data); saveErr != nil { + return files, skipped, fmt.Errorf("save %s failed: %w", key, saveErr) + } + files++ + } + } + + return files, skipped, nil +} + +func main() { + // Load .env if present + _ = godotenv.Load() + + // CLI flags + var ( + sourceRootOverride string + only string + dryRun bool + ) + flag.StringVar(&sourceRootOverride, "source-root", "", "override PLUGIN_STORAGE_LOCAL_ROOT (default reads from .env)") + flag.StringVar(&only, "only", "", "comma-separated categories to migrate: packages,assets,installed") + flag.BoolVar(&dryRun, "dry-run", false, "list actions without uploading") + flag.Parse() + + // Read config from env + var cfg app.Config + if err := envconfig.Process("", &cfg); err != nil { + log.Panic("Error processing environment: %s", err.Error()) + } + cfg.SetDefault() + + // We don't need full Validate here; allow PLATFORM local/serverless etc. + // But ensure required pieces exist for destination + if cfg.PluginStorageType == "" { + log.Panic("DEST PLUGIN_STORAGE_TYPE is empty in env") + } + // Restrict: source must be local and destination must be cloud (non-local) + if cfg.PluginStorageType == oss.OSS_TYPE_LOCAL { + log.Panic("Destination PLUGIN_STORAGE_TYPE must be non-local (cloud). Local→Local migration is not allowed") + } + + // Override local root if provided + if sourceRootOverride != "" { + cfg.PluginStorageLocalRoot = sourceRootOverride + } + if cfg.PluginStorageLocalRoot == "" { + cfg.PluginStorageLocalRoot = "storage" + } + + // Build source (local) and destination (cloud) storage + // Storage construction is centralized in internal/storagefactory.New + src, err := storagefactory.New(oss.OSS_TYPE_LOCAL, &cfg) + if err != nil { + log.Panic("Init source(local) storage failed: %s", err.Error()) + } + dst, err := storagefactory.New(cfg.PluginStorageType, &cfg) + if err != nil { + log.Panic("Init destination(%s) storage failed: %s", cfg.PluginStorageType, err.Error()) + } + + // categories + cats := []migrateCategory{ + {name: "packages", path: cfg.PluginPackageCachePath}, + {name: "assets", path: cfg.PluginMediaCachePath}, + {name: "installed", path: cfg.PluginInstalledPath}, + } + + // filter by --only if provided + if only != "" { + allow := map[string]bool{} + for _, p := range strings.Split(only, ",") { + p = strings.TrimSpace(p) + if p != "" { + allow[p] = true + } + } + filtered := make([]migrateCategory, 0, len(cats)) + for _, c := range cats { + if allow[c.name] { + filtered = append(filtered, c) + } + } + cats = filtered + } + + if len(cats) == 0 { + fmt.Fprintln(os.Stderr, "nothing to migrate; check --only") + os.Exit(1) + } + + start := time.Now() + log.Info("Starting migration from local '%s' to '%s' bucket '%s'...", cfg.PluginStorageLocalRoot, cfg.PluginStorageType, cfg.PluginStorageOSSBucket) + + totalFiles := 0 + totalSkipped := 0 + for _, c := range cats { + log.Info("Migrating %s (%s)...", c.name, c.path) + n, s, err := copyPrefix(src, dst, c.path, dryRun) + if err != nil { + log.Panic("migrate %s failed: %s", c.name, err.Error()) + } + totalFiles += n + totalSkipped += s + log.Info("Done %s: copied=%d skipped=%d", c.name, n, s) + } + + dur := time.Since(start) + log.Info("Migration completed in %s. Copied=%d Skipped=%d", dur.String(), totalFiles, totalSkipped) +} diff --git a/docs/storage-migration/guide.md b/docs/storage-migration/guide.md new file mode 100644 index 000000000..2d6c744ac --- /dev/null +++ b/docs/storage-migration/guide.md @@ -0,0 +1,53 @@ +# Storage Migration Guide (Local → Cloud) + +This guide explains how to use the migration CLI to copy your local plugin storage to a cloud object storage (S3, COS, OSS, Azure Blob, GCS, OBS, TOS). + +- Prerequisites + - A target bucket/container that already exists and grants write access. + - Cloud storage configuration is provided via environment variables or `.env` (same variables as the daemon). + - Only “local → cloud” is supported; “local → local” is not allowed. + +- Required environment variables (examples) + - Basics + - `PLUGIN_STORAGE_TYPE`: Target storage type, e.g., `s3`, `tencent` (COS), `aliyun_oss`, `azure_blob`, `gcs`, `huawei_obs`, `volcengine_tos` + - `PLUGIN_STORAGE_OSS_BUCKET`: Target bucket/container name + - `PLUGIN_STORAGE_LOCAL_ROOT`: Local storage root, default `./storage` + - Provider credentials (as needed) + - AWS S3: `AWS_ACCESS_KEY`, `AWS_SECRET_KEY`, `AWS_REGION`, `S3_ENDPOINT` (optional), `S3_USE_PATH_STYLE`, `S3_USE_AWS` + - Tencent COS: `TENCENT_COS_SECRET_ID`, `TENCENT_COS_SECRET_KEY`, `TENCENT_COS_REGION` + - Other providers: see fields in `internal/server/server.go`. + +- What gets migrated + - `plugin_packages`: Plugin package cache + - `assets`: Plugin media/icons cache + - `plugin`: Installed plugin archives + +- How to run + - Direct (reads `.env`) + - `go run ./cmd/migrate_storage --dry-run` to preview + - `go run ./cmd/migrate_storage` to execute + - Build a binary + - `go build -o migrate-storage ./cmd/migrate_storage` + - `./migrate-storage --only packages,assets,installed` + +- Useful flags + - `--dry-run`: Print planned copies without uploading + - `--only`: Limit scope (comma-separated): `packages,assets,installed` + - `--source-root`: Override local storage root (default from `PLUGIN_STORAGE_LOCAL_ROOT`) + +- Behavior + - Idempotent: existing destination objects are skipped; safe to rerun + - Restriction: if `PLUGIN_STORAGE_TYPE=local`, the tool exits (local → cloud only) + +- Troubleshooting + - DNS/network errors: check connectivity, proxy, or private network policies + - Access denied: verify AccessKey/Secret, IAM/STS, container permissions, and bucket existence + - Local read failures: ensure `PLUGIN_STORAGE_LOCAL_ROOT` points to the correct directory structure + +- Directory layout reference + - Expected subdirectories under local root: + - `plugin_packages/` + - `assets/` + - `plugin/` + + diff --git a/docs/storage-migration/guide_cn.md b/docs/storage-migration/guide_cn.md new file mode 100644 index 000000000..e3cc24466 --- /dev/null +++ b/docs/storage-migration/guide_cn.md @@ -0,0 +1,52 @@ +# 存储迁移使用教程(本地 → 云) + +本教程介绍如何使用迁移 CLI 将本地插件存储迁移到云对象存储(S3、COS、OSS、Azure Blob、GCS、OBS、TOS)。 + +- 前提条件 + - 已存在可用的目标存储桶/容器,并具备写权限。 + - 在环境变量或 `.env` 中正确配置云存储信息(与守护进程相同的变量)。 + - 当前仅支持“本地 → 云”,不支持“本地 → 本地”。 + +- 需要的环境变量(示例) + - 基本 + - `PLUGIN_STORAGE_TYPE`: 目标存储类型,如 `s3`、`tencent`(COS)、`aliyun_oss`、`azure_blob`、`gcs`、`huawei_obs`、`volcengine_tos` + - `PLUGIN_STORAGE_OSS_BUCKET`: 目标桶/容器名 + - `PLUGIN_STORAGE_LOCAL_ROOT`: 本地存储根目录,默认 `./storage` + - 云厂商凭证(按需) + - AWS S3: `AWS_ACCESS_KEY`、`AWS_SECRET_KEY`、`AWS_REGION`、`S3_ENDPOINT`(可选)、`S3_USE_PATH_STYLE`、`S3_USE_AWS` + - 腾讯云 COS: `TENCENT_COS_SECRET_ID`、`TENCENT_COS_SECRET_KEY`、`TENCENT_COS_REGION` + - 其他云参见 `internal/server/server.go` 对应字段。 + +- 迁移内容 + - `plugin_packages`:插件包缓存 + - `assets`:插件媒体/图标缓存 + - `plugin`:已安装插件归档 + +- 运行方式 + - 直接运行(读取 `.env`) + - `go run ./cmd/migrate_storage --dry-run` 先预览 + - `go run ./cmd/migrate_storage` 正式迁移 + - 构建可执行文件 + - `go build -o migrate-storage ./cmd/migrate_storage` + - `./migrate-storage --only packages,assets,installed` + +- 常用参数 + - `--dry-run`:仅打印将要复制的对象,不实际上传 + - `--only`:限制迁移范围,逗号分隔:`packages,assets,installed` + - `--source-root`:覆盖本地存储根(默认取 `PLUGIN_STORAGE_LOCAL_ROOT`) + +- 行为说明 + - 幂等:目标端已存在的对象会跳过,可多次执行 + - 限制:若 `PLUGIN_STORAGE_TYPE=local`,程序将直接退出(仅支持本地 → 云) + +- 排障指引 + - DNS 或网络错误:检查本机网络、代理或云厂商私网策略 + - 权限拒绝:确认 AccessKey/Secret、IAM/STS、容器权限、桶/容器是否存在 + - 读取失败(本地文件不存在):确认 `PLUGIN_STORAGE_LOCAL_ROOT` 指向正确存储目录结构 + +- 文件结构参考 + - 本地根目录下的关键子目录: + - `plugin_packages/` + - `assets/` + - `plugin/` + diff --git a/internal/server/server.go b/internal/server/server.go index f6e7fcedf..cef6cc4ab 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,79 +1,26 @@ package server import ( - "github.com/getsentry/sentry-go" - "github.com/langgenius/dify-cloud-kit/oss" - "github.com/langgenius/dify-cloud-kit/oss/factory" - "github.com/langgenius/dify-plugin-daemon/internal/cluster" - "github.com/langgenius/dify-plugin-daemon/internal/core/persistence" - "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager" - "github.com/langgenius/dify-plugin-daemon/internal/db" - "github.com/langgenius/dify-plugin-daemon/internal/types/app" - "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd" - "github.com/langgenius/dify-plugin-daemon/internal/utils/log" - "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" + "github.com/getsentry/sentry-go" + "github.com/langgenius/dify-cloud-kit/oss" + "github.com/langgenius/dify-plugin-daemon/internal/cluster" + "github.com/langgenius/dify-plugin-daemon/internal/core/persistence" + "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager" + "github.com/langgenius/dify-plugin-daemon/internal/db" + "github.com/langgenius/dify-plugin-daemon/internal/storagefactory" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" + "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd" + "github.com/langgenius/dify-plugin-daemon/internal/utils/log" + "github.com/langgenius/dify-plugin-daemon/internal/utils/routine" ) func initOSS(config *app.Config) oss.OSS { - // init storage - var storage oss.OSS - var err error - storage, err = factory.Load(config.PluginStorageType, oss.OSSArgs{ - Local: &oss.Local{ - Path: config.PluginStorageLocalRoot, - }, - S3: &oss.S3{ - UseAws: config.S3UseAWS, - Endpoint: config.S3Endpoint, - UsePathStyle: config.S3UsePathStyle, - AccessKey: config.AWSAccessKey, - SecretKey: config.AWSSecretKey, - Bucket: config.PluginStorageOSSBucket, - Region: config.AWSRegion, - UseIamRole: config.S3UseAwsManagedIam, - }, - TencentCOS: &oss.TencentCOS{ - Region: config.TencentCOSRegion, - SecretID: config.TencentCOSSecretId, - SecretKey: config.TencentCOSSecretKey, - Bucket: config.PluginStorageOSSBucket, - }, - AzureBlob: &oss.AzureBlob{ - ConnectionString: config.AzureBlobStorageConnectionString, - ContainerName: config.AzureBlobStorageContainerName, - }, - GoogleCloudStorage: &oss.GoogleCloudStorage{ - Bucket: config.PluginStorageOSSBucket, - CredentialsB64: config.GoogleCloudStorageCredentialsB64, - }, - AliyunOSS: &oss.AliyunOSS{ - Region: config.AliyunOSSRegion, - Endpoint: config.AliyunOSSEndpoint, - AccessKey: config.AliyunOSSAccessKeyID, - SecretKey: config.AliyunOSSAccessKeySecret, - AuthVersion: config.AliyunOSSAuthVersion, - Path: config.AliyunOSSPath, - Bucket: config.PluginStorageOSSBucket, - }, - HuaweiOBS: &oss.HuaweiOBS{ - AccessKey: config.HuaweiOBSAccessKey, - SecretKey: config.HuaweiOBSSecretKey, - Server: config.HuaweiOBSServer, - Bucket: config.PluginStorageOSSBucket, - }, - VolcengineTOS: &oss.VolcengineTOS{ - Region: config.VolcengineTOSRegion, - Endpoint: config.VolcengineTOSEndpoint, - AccessKey: config.VolcengineTOSAccessKey, - SecretKey: config.VolcengineTOSSecretKey, - Bucket: config.PluginStorageOSSBucket, - }, - }) + // build storage via shared factory + s, err := storagefactory.New(config.PluginStorageType, config) if err != nil { log.Panic("Failed to create storage: %s", err) } - - return storage + return s } func (app *App) Run(config *app.Config) { diff --git a/internal/storagefactory/factory.go b/internal/storagefactory/factory.go new file mode 100644 index 000000000..748547713 --- /dev/null +++ b/internal/storagefactory/factory.go @@ -0,0 +1,70 @@ +package storagefactory + +import ( + "github.com/langgenius/dify-cloud-kit/oss" + "github.com/langgenius/dify-cloud-kit/oss/factory" + "github.com/langgenius/dify-plugin-daemon/internal/types/app" +) + +// argsFromConfig maps app.Config to oss.OSSArgs. +func argsFromConfig(cfg *app.Config) oss.OSSArgs { + return oss.OSSArgs{ + Local: &oss.Local{Path: cfg.PluginStorageLocalRoot}, + S3: &oss.S3{ + UseAws: cfg.S3UseAWS, + Endpoint: cfg.S3Endpoint, + UsePathStyle: cfg.S3UsePathStyle, + AccessKey: cfg.AWSAccessKey, + SecretKey: cfg.AWSSecretKey, + Bucket: cfg.PluginStorageOSSBucket, + Region: cfg.AWSRegion, + UseIamRole: cfg.S3UseAwsManagedIam, + }, + TencentCOS: &oss.TencentCOS{ + Region: cfg.TencentCOSRegion, + SecretID: cfg.TencentCOSSecretId, + SecretKey: cfg.TencentCOSSecretKey, + Bucket: cfg.PluginStorageOSSBucket, + }, + AzureBlob: &oss.AzureBlob{ + ConnectionString: cfg.AzureBlobStorageConnectionString, + ContainerName: cfg.AzureBlobStorageContainerName, + }, + GoogleCloudStorage: &oss.GoogleCloudStorage{ + Bucket: cfg.PluginStorageOSSBucket, + CredentialsB64: cfg.GoogleCloudStorageCredentialsB64, + }, + AliyunOSS: &oss.AliyunOSS{ + Region: cfg.AliyunOSSRegion, + Endpoint: cfg.AliyunOSSEndpoint, + AccessKey: cfg.AliyunOSSAccessKeyID, + SecretKey: cfg.AliyunOSSAccessKeySecret, + AuthVersion: cfg.AliyunOSSAuthVersion, + Path: cfg.AliyunOSSPath, + Bucket: cfg.PluginStorageOSSBucket, + }, + HuaweiOBS: &oss.HuaweiOBS{ + AccessKey: cfg.HuaweiOBSAccessKey, + SecretKey: cfg.HuaweiOBSSecretKey, + Server: cfg.HuaweiOBSServer, + Bucket: cfg.PluginStorageOSSBucket, + }, + VolcengineTOS: &oss.VolcengineTOS{ + Region: cfg.VolcengineTOSRegion, + Endpoint: cfg.VolcengineTOSEndpoint, + AccessKey: cfg.VolcengineTOSAccessKey, + SecretKey: cfg.VolcengineTOSSecretKey, + Bucket: cfg.PluginStorageOSSBucket, + }, + } +} + +// New constructs an oss.OSS for the given type using the provided config. +// Returns the instance and any error encountered. +func New(t string, cfg *app.Config) (oss.OSS, error) { + s, err := factory.Load(t, argsFromConfig(cfg)) + if err != nil { + return nil, err + } + return s, nil +}