1- from typing import Any , Union
1+ import logging
2+ from typing import Any , Optional , Union
23
34import redis
45from redis .cache import CacheConfig
6+ from redis .client import Pipeline
57from redis .cluster import ClusterNode , RedisCluster
68from redis .connection import Connection , SSLConnection
9+ from redis .exceptions import LockNotOwnedError
10+ from redis .lock import Lock
711from redis .sentinel import Sentinel
812
913from configs import dify_config
@@ -101,6 +105,22 @@ def init_app(app: DifyApp):
101105 cache_config = clientside_cache_config ,
102106 )
103107 )
108+ elif dify_config .REDIS_USE_OBKV :
109+ assert dify_config .REDIS_SERIALIZATION_PROTOCOL < 3 , (
110+ "OBKV does not support RESP version 3. "
111+ "Please specify REDIS_SERIALIZATION_PROTOCOL=2 in the configuration file."
112+ )
113+ redis_params .update (
114+ {
115+ "host" : dify_config .REDIS_HOST ,
116+ "port" : dify_config .REDIS_PORT ,
117+ "username" : dify_config .REDIS_USERNAME ,
118+ "password" : dify_config .REDIS_PASSWORD ,
119+ "connection_class" : connection_class ,
120+ }
121+ )
122+ pool = redis .ConnectionPool (** redis_params )
123+ redis_client .initialize (OBKVClient (redis .Redis (connection_pool = pool )))
104124 else :
105125 redis_params .update (
106126 {
@@ -115,3 +135,51 @@ def init_app(app: DifyApp):
115135 redis_client .initialize (redis .Redis (connection_pool = pool ))
116136
117137 app .extensions ["redis" ] = redis_client
138+
139+
140+ class OceanBaseRedisLock (Lock ):
141+ def do_acquire (self , token : str ) -> bool :
142+ try :
143+ result = self .redis .execute_command ("SETNX" , self .name , token )
144+ except Exception as e :
145+ logging .info (f"Failed to acquire redis lock { self .name } , error: { e } " )
146+ result = False
147+ if result == 1 :
148+ self .redis .execute_command ("EXPIRE" , self .name , 60 )
149+ return True
150+ else :
151+ return False
152+
153+ def do_release (self , expected_token : str ) -> None :
154+ current_value = self .redis .get (self .name )
155+ if current_value == expected_token :
156+ redis_client .delete (self .name )
157+ logging .info (f"Lock released: { self .name } " )
158+ else :
159+ raise LockNotOwnedError (
160+ "Cannot release a lock that's no longer owned" ,
161+ lock_name = self .name ,
162+ )
163+
164+
165+ class OceanBasePipeLine (Pipeline ):
166+ def execute (self , raise_on_error = True ):
167+ """do nothing, OBKV cannot support redis pipeline"""
168+ logging .info ("executing pipeline: do nothing, OBKV cannot support redis pipeline" )
169+
170+
171+ class OBKVClient :
172+ def __init__ (self , client : redis .Redis ):
173+ self ._client = client
174+
175+ def lock (self , name : str , timeout : Optional [int ] = None , ** kwargs ):
176+ # Implementation using OceanBaseRedisLock
177+ return self ._client .lock (name , timeout = timeout , lock_class = OceanBaseRedisLock , ** kwargs )
178+
179+ def pipeline (self , transaction = True , shard_hint = None ) -> "Pipeline" :
180+ return OceanBasePipeLine (self .connection_pool , self .response_callbacks , transaction , shard_hint )
181+
182+ def __getattr__ (self , item ):
183+ if self ._client is None :
184+ raise RuntimeError ("Redis client is not initialized. Call init_app first." )
185+ return getattr (self ._client , item )
0 commit comments