Skip to content

Commit f71071f

Browse files
authored
Fix the celery beat distributed lock timeout (#779)
* Fix the celery beat distributed lock timeout * Fix redis client not close after plugin parse * Optimize code layout and lock extend
1 parent 34c9c39 commit f71071f

File tree

1 file changed

+69
-110
lines changed

1 file changed

+69
-110
lines changed

backend/app/task/utils/schedulers.py

Lines changed: 69 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -31,36 +31,7 @@
3131
DEFAULT_MAX_INTERVAL = 5 # seconds
3232

3333
# 计划锁时长,避免重复创建
34-
DEFAULT_MAX_LOCK_TIMEOUT = 300 # seconds
35-
36-
# 锁检测周期,应小于计划锁时长
37-
DEFAULT_LOCK_INTERVAL = 60 # seconds
38-
39-
# Copied from:
40-
# https://github.com/andymccurdy/redis-py/blob/master/redis/lock.py#L33
41-
# Changes:
42-
# The second line from the bottom: The original Lua script intends
43-
# to extend time to (lock remaining time + additional time); while
44-
# the script here extend time to an expected expiration time.
45-
# KEYS[1] - lock name
46-
# ARGS[1] - token
47-
# ARGS[2] - additional milliseconds
48-
# return 1 if the locks time was extended, otherwise 0
49-
LUA_EXTEND_TO_SCRIPT = """
50-
local token = redis.call('get', KEYS[1])
51-
if not token or token ~= ARGV[1] then
52-
return 0
53-
end
54-
local expiration = redis.call('pttl', KEYS[1])
55-
if not expiration then
56-
expiration = 0
57-
end
58-
if expiration < 0 then
59-
return 0
60-
end
61-
redis.call('pexpire', KEYS[1], ARGV[2])
62-
return 1
63-
"""
34+
DEFAULT_MAX_LOCK_TIMEOUT = DEFAULT_MAX_INTERVAL * 5 # seconds
6435

6536
logger = get_logger('fba.schedulers')
6637

@@ -313,39 +284,26 @@ def __init__(self, *args, **kwargs):
313284
self._finalize = Finalize(self, self.sync, exitpriority=5)
314285
self.max_interval = kwargs.get('max_interval') or self.app.conf.beat_max_loop_interval or DEFAULT_MAX_INTERVAL
315286

316-
def setup_schedule(self):
287+
def install_default_entries(self, data):
317288
"""重写父函数"""
318-
logger.info('setup_schedule')
319-
tasks = self.schedule
320-
self.install_default_entries(tasks)
321-
self.update_from_dict(self.app.conf.beat_schedule)
322-
323-
async def get_all_task_schedulers(self):
324-
"""获取所有任务调度"""
325-
async with async_db_session() as db:
326-
logger.debug('DatabaseScheduler: Fetching database schedule')
327-
stmt = select(TaskScheduler).where(TaskScheduler.enabled == 1)
328-
query = await db.execute(stmt)
329-
tasks = query.scalars().all()
330-
s = {}
331-
for task in tasks:
332-
s[task.name] = self.Entry(task, app=self.app)
333-
return s
289+
entries = {}
290+
if self.app.conf.result_expires:
291+
entries.setdefault(
292+
'celery.backend_cleanup',
293+
{
294+
'task': 'celery.backend_cleanup',
295+
'schedule': schedules.crontab('0', '4', '*'),
296+
'options': {'expire_seconds': 12 * 3600},
297+
},
298+
)
299+
self.update_from_dict(entries)
334300

335-
def schedule_changed(self) -> bool:
336-
"""任务调度变更状态"""
337-
now = timezone.now()
338-
last_update = run_await(redis_client.get)(f'{settings.CELERY_REDIS_PREFIX}:last_update')
339-
if not last_update:
340-
run_await(redis_client.set)(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
301+
def schedules_equal(self, *args, **kwargs):
302+
"""重写父函数"""
303+
if self._heap_invalidated:
304+
self._heap_invalidated = False
341305
return False
342-
343-
last, ts = self._last_update, timezone.from_str(last_update)
344-
try:
345-
if ts and ts > (last if last else ts):
346-
return True
347-
finally:
348-
self._last_update = now
306+
return super().schedules_equal(*args, **kwargs)
349307

350308
def reserve(self, entry):
351309
"""重写父函数"""
@@ -354,15 +312,12 @@ def reserve(self, entry):
354312
self._dirty.add(new_entry.name)
355313
return new_entry
356314

357-
def close(self):
315+
def setup_schedule(self):
358316
"""重写父函数"""
359-
if self.lock:
360-
logger.info('beat: Releasing lock')
361-
if run_await(self.lock.owned)():
362-
run_await(self.lock.release)()
363-
self.lock = None
364-
365-
super().close()
317+
logger.info('setup_schedule')
318+
tasks = self.schedule
319+
self.install_default_entries(tasks)
320+
self.update_from_dict(self.app.conf.beat_schedule)
366321

367322
def sync(self):
368323
"""重写父函数"""
@@ -387,6 +342,25 @@ def sync(self):
387342
# 请稍后重试(仅针对失败的)
388343
self._dirty |= _failed
389344

345+
def tick(self, **kwargs):
346+
"""重写父函数"""
347+
if self.lock:
348+
logger.debug('beat: Extending lock...')
349+
run_await(self.lock.extend)(DEFAULT_MAX_LOCK_TIMEOUT, replace_ttl=True)
350+
351+
result = super().tick(**kwargs)
352+
return result
353+
354+
def close(self):
355+
"""重写父函数"""
356+
if self.lock:
357+
logger.info('beat: Releasing lock')
358+
if run_await(self.lock.owned)():
359+
run_await(self.lock.release)()
360+
self.lock = None
361+
362+
super().close()
363+
390364
def update_from_dict(self, beat_dict: dict):
391365
"""重写父函数"""
392366
s = {}
@@ -402,26 +376,32 @@ def update_from_dict(self, beat_dict: dict):
402376
tasks = self.schedule
403377
tasks.update(s)
404378

405-
def install_default_entries(self, data):
406-
"""重写父函数"""
407-
entries = {}
408-
if self.app.conf.result_expires:
409-
entries.setdefault(
410-
'celery.backend_cleanup',
411-
{
412-
'task': 'celery.backend_cleanup',
413-
'schedule': schedules.crontab('0', '4', '*'),
414-
'options': {'expire_seconds': 12 * 3600},
415-
},
416-
)
417-
self.update_from_dict(entries)
418-
419-
def schedules_equal(self, *args, **kwargs):
420-
"""重写父函数"""
421-
if self._heap_invalidated:
422-
self._heap_invalidated = False
379+
def schedule_changed(self) -> bool:
380+
"""任务调度变更状态"""
381+
now = timezone.now()
382+
last_update = run_await(redis_client.get)(f'{settings.CELERY_REDIS_PREFIX}:last_update')
383+
if not last_update:
384+
run_await(redis_client.set)(f'{settings.CELERY_REDIS_PREFIX}:last_update', timezone.to_str(now))
423385
return False
424-
return super().schedules_equal(*args, **kwargs)
386+
387+
last, ts = self._last_update, timezone.from_str(last_update)
388+
try:
389+
if ts and ts > (last if last else ts):
390+
return True
391+
finally:
392+
self._last_update = now
393+
394+
async def get_all_task_schedulers(self):
395+
"""获取所有任务调度"""
396+
async with async_db_session() as db:
397+
logger.debug('DatabaseScheduler: Fetching database schedule')
398+
stmt = select(TaskScheduler).where(TaskScheduler.enabled == 1)
399+
query = await db.execute(stmt)
400+
tasks = query.scalars().all()
401+
s = {}
402+
for task in tasks:
403+
s[task.name] = self.Entry(task, app=self.app)
404+
return s
425405

426406
@property
427407
def schedule(self) -> dict[str, ModelEntry]:
@@ -452,24 +432,8 @@ def schedule(self) -> dict[str, ModelEntry]:
452432
return self._schedule
453433

454434

455-
async def extend_scheduler_lock(lock):
456-
"""
457-
延长调度程序锁
458-
459-
:param lock: 计划程序锁
460-
:return:
461-
"""
462-
while True:
463-
await asyncio.sleep(DEFAULT_LOCK_INTERVAL)
464-
if lock:
465-
try:
466-
await lock.extend(DEFAULT_MAX_LOCK_TIMEOUT)
467-
except Exception as e:
468-
logger.error(f'Failed to extend lock: {e}')
469-
470-
471435
@beat_init.connect
472-
def acquire_distributed_beat_lock(sender=None, *args, **kwargs):
436+
def acquire_distributed_beat_lock(sender=None, **kwargs):
473437
"""
474438
尝试在启动时获取锁
475439
@@ -486,12 +450,7 @@ def acquire_distributed_beat_lock(sender=None, *args, **kwargs):
486450
timeout=DEFAULT_MAX_LOCK_TIMEOUT,
487451
sleep=scheduler.max_interval,
488452
)
489-
# overwrite redis-py's extend script
490-
# which will add additional timeout instead of extend to a new timeout
491-
lock.lua_extend = redis_client.register_script(LUA_EXTEND_TO_SCRIPT)
453+
492454
run_await(lock.acquire)()
493455
logger.info('beat: Acquired lock')
494456
scheduler.lock = lock
495-
496-
loop = asyncio.get_event_loop()
497-
loop.create_task(extend_scheduler_lock(scheduler.lock))

0 commit comments

Comments
 (0)