Skip to content

Commit 3e7ad54

Browse files
committed
refactor: replace try_lock and un_lock with RedisLock for improved locking mechanism
1 parent c96de7a commit 3e7ad54

File tree

9 files changed

+89
-45
lines changed

9 files changed

+89
-45
lines changed

apps/chat/serializers/chat_record.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
ApplicationChatRecordQuerySerializers
2020
from common.db.search import page_search
2121
from common.exception.app_exception import AppApiException
22-
from common.utils.lock import try_lock, un_lock
22+
from common.utils.lock import RedisLock
2323

2424

2525
class VoteRequest(serializers.Serializer):
@@ -48,7 +48,8 @@ def vote(self, instance: Dict, with_valid=True):
4848
if with_valid:
4949
self.is_valid(raise_exception=True)
5050
VoteRequest(data=instance).is_valid(raise_exception=True)
51-
if not try_lock(self.data.get('chat_record_id')):
51+
rlock = RedisLock()
52+
if not rlock.try_lock(self.data.get('chat_record_id')):
5253
raise AppApiException(500,
5354
gettext(
5455
"Voting on the current session minutes, please do not send repeated requests"))
@@ -75,7 +76,7 @@ def vote(self, instance: Dict, with_valid=True):
7576
else:
7677
raise AppApiException(500, gettext("Already voted, please cancel first and then vote again"))
7778
finally:
78-
un_lock(self.data.get('chat_record_id'))
79+
rlock.un_lock(self.data.get('chat_record_id'))
7980
ChatCountSerializer(data={'chat_id': self.data.get('chat_id')}).update_chat()
8081
return True
8182

apps/common/event/__init__.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from .listener_manage import *
1313
from ..constants.cache_version import Cache_Version
1414
from ..db.sql_execute import update_execute
15+
from ..utils.lock import RedisLock
1516

1617
update_document_status_sql = """
1718
UPDATE "public"."document"
@@ -22,8 +23,8 @@
2223

2324
def run():
2425
from models_provider.models import Model, Status
25-
26-
if try_lock('event_init', 30 * 30):
26+
rlock = RedisLock()
27+
if rlock.try_lock('event_init', 30 * 30):
2728
try:
2829
# 修改Model状态为ERROR
2930
QuerySet(Model).filter(
@@ -36,4 +37,4 @@ def run():
3637
version, get_key = Cache_Version.SYSTEM.value
3738
cache.delete(get_key(key='rsa_key'), version=version)
3839
finally:
39-
un_lock('event_init')
40+
rlock.un_lock('event_init')

apps/common/event/listener_manage.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from common.config.embedding_config import VectorStore
2121
from common.db.search import native_search, get_dynamics_model, native_update
2222
from common.utils.common import get_file_content
23-
from common.utils.lock import try_lock, un_lock
23+
from common.utils.lock import RedisLock
2424
from common.utils.logger import maxkb_logger
2525
from common.utils.page_utils import page_desc
2626
from knowledge.models import Paragraph, Status, Document, ProblemParagraphMapping, TaskType, State,SourceType, SearchMode
@@ -253,7 +253,8 @@ def embedding_by_document(document_id, embedding_model: Embeddings, state_list=N
253253
"""
254254
if state_list is None:
255255
state_list = [State.PENDING, State.SUCCESS, State.FAILURE, State.REVOKE, State.REVOKED]
256-
if not try_lock('embedding:' + str(document_id)):
256+
rlock = RedisLock()
257+
if not rlock.try_lock('embedding:' + str(document_id)):
257258
return
258259
try:
259260
def is_the_task_interrupted():
@@ -290,7 +291,7 @@ def is_the_task_interrupted():
290291
ListenerManagement.post_update_document_status(document_id, TaskType.EMBEDDING)
291292
ListenerManagement.get_aggregation_document_status(document_id)()
292293
maxkb_logger.info(_('End--->Embedding document: {document_id}').format(document_id=document_id))
293-
un_lock('embedding:' + str(document_id))
294+
rlock.un_lock('embedding:' + str(document_id))
294295

295296
@staticmethod
296297
def embedding_by_knowledge(knowledge_id, embedding_model: Embeddings):

apps/common/job/clean_chat_job.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from application.models import Application, Chat, ChatRecord
1010
from common.job.scheduler import scheduler
11-
from common.utils.lock import try_lock, un_lock, lock
11+
from common.utils.lock import lock, RedisLock
1212
from common.utils.logger import maxkb_logger
1313
from knowledge.models import File
1414

@@ -70,7 +70,8 @@ def clean_chat_log_job_lock():
7070

7171

7272
def run():
73-
if try_lock('clean_chat_log_job', 30 * 30):
73+
rlock = RedisLock()
74+
if rlock.try_lock('clean_chat_log_job', 30 * 30):
7475
try:
7576
maxkb_logger.debug('get lock clean_chat_log_job')
7677

@@ -79,4 +80,4 @@ def run():
7980
existing_job.remove()
8081
scheduler.add_job(clean_chat_log_job, 'cron', hour='0', minute='5', id='clean_chat_log')
8182
finally:
82-
un_lock('clean_chat_log_job')
83+
rlock.un_lock('clean_chat_log_job')

apps/common/job/clean_debug_file_job.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from django.utils import timezone
66

77
from common.job.scheduler import scheduler
8-
from common.utils.lock import un_lock, try_lock, lock
8+
from common.utils.lock import lock, RedisLock
99
from common.utils.logger import maxkb_logger
1010
from knowledge.models import File, FileSourceType
1111

@@ -25,12 +25,14 @@ def clean_debug_file_lock():
2525
File.objects.filter(
2626
Q(create_time__lt=one_days_ago, source_type=FileSourceType.TEMPORARY_1_DAY.value) |
2727
Q(create_time__lt=two_hours_ago, source_type=FileSourceType.TEMPORARY_120_MINUTE.value) |
28-
Q(create_time__lt=minutes_30_ago, source_type=FileSourceType.TEMPORARY_30_MINUTE.value)).delete()
28+
Q(create_time__lt=minutes_30_ago, source_type=FileSourceType.TEMPORARY_30_MINUTE.value)
29+
).delete()
2930
maxkb_logger.debug(_('end clean debug file'))
3031

3132

3233
def run():
33-
if try_lock('clean_debug_file', 30 * 30):
34+
rlock = RedisLock()
35+
if rlock.try_lock('clean_debug_file', 30 * 30):
3436
try:
3537
maxkb_logger.debug('get lock clean_debug_file')
3638

@@ -39,4 +41,4 @@ def run():
3941
clean_debug_file_job.remove()
4042
scheduler.add_job(clean_debug_file, 'cron', hour='*', minute='*/30', second='0', id='clean_debug_file')
4143
finally:
42-
un_lock('clean_debug_file')
44+
rlock.un_lock('clean_debug_file')

apps/common/job/client_access_num_job.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111

1212
from application.models import ApplicationChatUserStats
1313
from common.job.scheduler import scheduler
14-
from common.utils.lock import try_lock, un_lock, lock
14+
from common.utils.lock import lock, RedisLock
1515
from common.utils.logger import maxkb_logger
1616

1717

@@ -28,7 +28,8 @@ def client_access_num_reset_job_lock():
2828

2929

3030
def run():
31-
if try_lock('access_num_reset', 30 * 30):
31+
rlock = RedisLock()
32+
if rlock.try_lock('access_num_reset', 30 * 30):
3233
try:
3334
maxkb_logger.debug('get lock access_num_reset')
3435

@@ -38,4 +39,4 @@ def run():
3839
scheduler.add_job(client_access_num_reset_job, 'cron', hour='0', minute='0', second='0',
3940
id='access_num_reset')
4041
finally:
41-
un_lock('access_num_reset')
42+
rlock.un_lock('access_num_reset')

apps/common/management/commands/services/services/gunicorn.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import subprocess
2+
13
from .base import BaseService
24
from ..hands import *
35

@@ -35,3 +37,15 @@ def cmd(self):
3537
@property
3638
def cwd(self):
3739
return APPS_DIR
40+
41+
def open_subprocess(self):
42+
# 复制当前环境变量,并设置 ENABLE_SCHEDULER=1
43+
env = os.environ.copy()
44+
env['ENABLE_SCHEDULER'] = '1'
45+
kwargs = {
46+
'cwd': self.cwd,
47+
'stderr': self.log_file,
48+
'stdout': self.log_file,
49+
'env': env
50+
}
51+
self._process = subprocess.Popen(self.cmd, **kwargs)

apps/common/utils/lock.py

Lines changed: 44 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,32 +6,47 @@
66
@date:2023/9/11 11:45
77
@desc:
88
"""
9-
from datetime import timedelta
9+
from functools import wraps
1010

11+
import uuid_utils.compat as uuid
1112
from django.core.cache import caches
13+
from django_redis import get_redis_connection
1214

1315
memory_cache = caches['default']
1416

17+
class RedisLock():
18+
def __init__(self):
19+
self.lock_value = None
1520

16-
def try_lock(key: str, timeout=None):
17-
"""
18-
获取锁
19-
:param key: 获取锁 key
20-
:param timeout 超时时间
21-
:return: 是否获取到锁
22-
"""
23-
if timeout is None:
24-
timeout = 3600 # 默认超时时间为3600秒
25-
return memory_cache.add(key, 'lock', timeout=timeout)
21+
def try_lock(self, key: str, timeout=None):
22+
"""
23+
获取锁
24+
:param key: 获取锁 key
25+
:param timeout 超时时间
26+
:return: 是否获取到锁
27+
"""
28+
redis_client = get_redis_connection("default")
29+
if timeout is None:
30+
timeout = 3600 # 默认超时时间为3600秒
31+
self.lock_value = str(uuid.uuid7())
32+
return redis_client.set(key, self.lock_value, nx=True, ex=timeout)
2633

2734

28-
def un_lock(key: str):
29-
"""
30-
解锁
31-
:param key: 解锁 key
32-
:return: 是否解锁成功
33-
"""
34-
return memory_cache.delete(key)
35+
def un_lock(self, key: str):
36+
"""
37+
解锁
38+
:param key: 解锁 key
39+
:return: 是否解锁成功
40+
"""
41+
redis_client = get_redis_connection("default")
42+
unlock_script = """
43+
if redis.call("get", KEYS[1]) == ARGV[1] then
44+
return redis.call("del", KEYS[1])
45+
else
46+
return 0
47+
end
48+
"""
49+
redis_client.eval(unlock_script, 1, key, self.lock_value)
3550

3651

3752
def lock(lock_key, timeout=None):
@@ -43,15 +58,19 @@ def lock(lock_key, timeout=None):
4358
4459
"""
4560

46-
def inner(func):
47-
def run(*args, **kwargs):
61+
def decorator(func):
62+
@wraps(func)
63+
def wrapper(*args, **kwargs):
4864
key = lock_key(*args, **kwargs) if callable(lock_key) else lock_key
65+
rlock = RedisLock()
66+
if not rlock.try_lock(key, timeout):
67+
# 获取锁失败,可自定义异常或返回
68+
return None
4969
try:
50-
if try_lock(key=key, timeout=timeout):
51-
return func(*args, **kwargs)
70+
return func(*args, **kwargs)
5271
finally:
53-
un_lock(key=key)
72+
rlock.un_lock(key)
5473

55-
return run
74+
return wrapper
5675

57-
return inner
76+
return decorator

apps/maxkb/wsgi.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ def post_handler():
2626
job.run()
2727
DatabaseModelManage.init()
2828

29-
post_handler()
29+
# 仅在web中启动定时任务,local_model celery 不需要
30+
if os.environ.get('ENABLE_SCHEDULER') == '1':
31+
post_handler()
32+
33+

0 commit comments

Comments
 (0)