diff --git a/.env.example b/.env.example index d248ef563..94166eb60 100644 --- a/.env.example +++ b/.env.example @@ -91,6 +91,12 @@ REDIS_SENTINEL_USERNAME= REDIS_SENTINEL_PASSWORD= REDIS_SENTINEL_SOCKET_TIMEOUT=0.1 +# redis cluster +# Cluster Format: `:,` +REDIS_USE_CLUSTERS=false +REDIS_CLUSTERS= +REDIS_CLUSTERS_PASSWORD= + DB_TYPE=postgresql DB_USERNAME=postgres DB_PASSWORD=difyai123456 diff --git a/internal/core/plugin_manager/manager.go b/internal/core/plugin_manager/manager.go index e0858eef5..793fc71ec 100644 --- a/internal/core/plugin_manager/manager.go +++ b/internal/core/plugin_manager/manager.go @@ -119,7 +119,17 @@ func (p *PluginManager) Launch(configuration *app.Config) { ); err != nil { log.Panic("init redis sentinel client failed: %s", err.Error()) } - } else { + } else if configuration.RedisUseClusters { + // use redis cluster mode + if err := cache.InitRedisClusterClient( + configuration.RedisClusters, + configuration.RedisClustersPassword, + configuration.RedisUseSsl, + ); err != nil { + log.Panic("init redis cluster client failed: %s", err.Error()) + } + log.Info("redis cluster client initialized") + }else { if err := cache.InitRedisClient( fmt.Sprintf("%s:%d", configuration.RedisHost, configuration.RedisPort), configuration.RedisUser, @@ -129,6 +139,7 @@ func (p *PluginManager) Launch(configuration *app.Config) { ); err != nil { log.Panic("init redis client failed: %s", err.Error()) } + log.Info("redis standalone client initialized") } invocation, err := calldify.NewDifyInvocationDaemon( diff --git a/internal/types/app/config.go b/internal/types/app/config.go index 1808c1e68..15840091e 100644 --- a/internal/types/app/config.go +++ b/internal/types/app/config.go @@ -121,6 +121,11 @@ type Config struct { RedisSentinelPassword string `envconfig:"REDIS_SENTINEL_PASSWORD"` RedisSentinelSocketTimeout float64 `envconfig:"REDIS_SENTINEL_SOCKET_TIMEOUT"` + // redis clusters + RedisUseClusters bool `envconfig:"REDIS_USE_CLUSTERS"` + RedisClusters []string `envconfig:"REDIS_CLUSTERS"` + RedisClustersPassword string `envconfig:"REDIS_CLUSTERS_PASSWORD"` + // database DBType string `envconfig:"DB_TYPE" default:"postgresql"` DBUsername string `envconfig:"DB_USERNAME" validate:"required"` diff --git a/pkg/utils/cache/redis.go b/pkg/utils/cache/redis.go index d0130d6b0..3333489a9 100644 --- a/pkg/utils/cache/redis.go +++ b/pkg/utils/cache/redis.go @@ -73,6 +73,25 @@ func InitRedisSentinelClient(sentinels []string, masterName, username, password, return nil } +// InitRedisClusterClient 初始化集群 Redis 客户端 +func InitRedisClusterClient(addrs []string, password string, useSsl bool) error { + opts := &redis.ClusterOptions{ + Addrs: addrs, + Password: password, + } + + if useSsl { + opts.TLSConfig = &tls.Config{} + } + + client = redis.NewClusterClient(opts) + + if _, err := client.Ping(context.Background()).Result(); err != nil { + return err + } + return nil +} + // Close the redis client func Close() error { if client == nil { @@ -508,11 +527,26 @@ func Expire(key string, time time.Duration, context ...redis.Cmdable) (bool, err return getCmdable(context...).Expire(ctx, serialKey(key), time).Result() } -func Transaction(fn func(redis.Pipeliner) error) error { +func Transaction(fn func(redis.Pipeliner) error, keys ...string) error { if client == nil { return ErrDBNotInit } + // Fix: If no keys provided, use plain Pipeline instead of Watch transaction + if len(keys) == 0 { + _, err := client.TxPipelined(ctx, fn) + if err == redis.Nil { + return nil + } + return err + } + + // Serialize watch keys + watchKeys := make([]string, len(keys)) + for i, key := range keys { + watchKeys[i] = serialKey(key) + } + return client.Watch(ctx, func(tx *redis.Tx) error { _, err := tx.TxPipelined(ctx, func(p redis.Pipeliner) error { return fn(p) @@ -521,7 +555,7 @@ func Transaction(fn func(redis.Pipeliner) error) error { return nil } return err - }) + }, watchKeys...) } func Publish(channel string, message any, context ...redis.Cmdable) error { @@ -537,8 +571,15 @@ func Publish(channel string, message any, context ...redis.Cmdable) error { } func Subscribe[T any](channel string) (<-chan T, func()) { - pubsub := client.Subscribe(ctx, channel) ch := make(chan T) + + if client == nil { + log.Error("redis client not initialized") + close(ch) + return ch, func() {} + } + + pubsub := client.Subscribe(ctx, channel) connectionEstablished := make(chan bool) go func() { diff --git a/pkg/utils/cache/redis_test.go b/pkg/utils/cache/redis_test.go index fae22d56b..3448f06e7 100644 --- a/pkg/utils/cache/redis_test.go +++ b/pkg/utils/cache/redis_test.go @@ -57,7 +57,7 @@ func TestRedisTransaction(t *testing.T) { } return errors.New("test transaction error") - }) + }, strings.Join([]string{TEST_PREFIX, "key"}, ":")) if err == nil { t.Errorf("transaction should return error") @@ -93,7 +93,7 @@ func TestRedisTransaction(t *testing.T) { } return nil - }) + }, strings.Join([]string{TEST_PREFIX, "key"}, ":")) if err != nil { t.Errorf("transaction should not return error")