|
7 | 7 | @desc: |
8 | 8 | """ |
9 | 9 | import os |
10 | | -import shutil |
| 10 | + |
| 11 | +from redis.sentinel import Sentinel |
11 | 12 |
|
12 | 13 | from maxkb.const import CONFIG, PROJECT_DIR, LOG_DIR |
13 | 14 |
|
14 | 15 | # celery相关配置 |
15 | 16 | celery_data_dir = os.path.join(PROJECT_DIR, 'data', 'celery_task') |
16 | 17 | if not os.path.exists(celery_data_dir) or not os.path.isdir(celery_data_dir): |
17 | 18 | os.makedirs(celery_data_dir) |
18 | | -broker_path = os.path.join(celery_data_dir, "celery_db.sqlite3") |
19 | | -backend_path = os.path.join(celery_data_dir, "celery_results.sqlite3") |
20 | | -# 使用sql_lite 当做broker 和 响应接收 |
21 | | -CELERY_BROKER_URL = f'sqla+sqlite:///{broker_path}' |
22 | | -CELERY_result_backend = f'db+sqlite:///{backend_path}' |
| 19 | +# Celery using redis as broker |
| 20 | +redis_celery_once_db = CONFIG.get("REDIS_CELERY_ONCE_DB", 3) |
| 21 | +redis_celery_db = CONFIG.get('REDIS_CELERY_DB', 2) |
| 22 | +CELERY_BROKER_URL_FORMAT = '%(protocol)s://:%(password)s@%(host)s:%(port)s/%(db)s' |
| 23 | +if CONFIG.get('REDIS_SENTINEL_MASTER') and CONFIG.get('REDIS_SENTINEL_SENTINELS'): |
| 24 | + sentinels_str = CONFIG.get('REDIS_SENTINEL_SENTINELS') |
| 25 | + sentinels = [ |
| 26 | + (host.strip(), int(port)) |
| 27 | + for hostport in sentinels_str.split(',') |
| 28 | + for host, port in [hostport.strip().split(':')] |
| 29 | + ] |
| 30 | + CELERY_BROKER_URL = ';'.join([CELERY_BROKER_URL_FORMAT % { |
| 31 | + 'protocol': 'sentinel', 'password': CONFIG.get('REDIS_PASSWORD'), |
| 32 | + 'host': item[0], 'port': item[1], 'db': redis_celery_db |
| 33 | + } for item in sentinels]) |
| 34 | + SENTINEL_OPTIONS = { |
| 35 | + 'master_name': CONFIG.get('REDIS_SENTINEL_MASTER'), |
| 36 | + } |
| 37 | + CELERY_BROKER_TRANSPORT_OPTIONS = CELERY_RESULT_BACKEND_TRANSPORT_OPTIONS = SENTINEL_OPTIONS |
| 38 | + |
| 39 | + # celery-once 哨兵模式配置 |
| 40 | + sentinel = Sentinel( |
| 41 | + sentinels, |
| 42 | + socket_timeout=5, |
| 43 | + password=CONFIG.get('REDIS_SENTINEL_PASSWORD', CONFIG.get('REDIS_PASSWORD')) |
| 44 | + ) |
| 45 | + master_host, master_port = sentinel.discover_master(CONFIG.get('REDIS_SENTINEL_MASTER')) |
| 46 | + celery_once_settings = { |
| 47 | + 'url': f"redis://:{CONFIG.get('REDIS_PASSWORD')}@{master_host}:{master_port}/{redis_celery_once_db}", |
| 48 | + 'master_name': CONFIG.get('REDIS_SENTINEL_MASTER'), |
| 49 | + 'password': CONFIG.get('REDIS_PASSWORD'), |
| 50 | + 'db': redis_celery_once_db, |
| 51 | + } |
| 52 | +else: |
| 53 | + CELERY_BROKER_URL = CELERY_BROKER_URL_FORMAT % { |
| 54 | + 'protocol': 'redis', |
| 55 | + 'password': CONFIG.get('REDIS_PASSWORD'), |
| 56 | + 'host': CONFIG.get('REDIS_HOST'), |
| 57 | + 'port': CONFIG.get('REDIS_PORT'), |
| 58 | + 'db': redis_celery_db |
| 59 | + } |
| 60 | + # celery-once 常规模式配置 |
| 61 | + celery_once_settings = { |
| 62 | + 'url': CELERY_BROKER_URL_FORMAT % { |
| 63 | + 'protocol': 'redis', |
| 64 | + 'password': CONFIG.get('REDIS_PASSWORD'), |
| 65 | + 'host': CONFIG.get('REDIS_HOST'), |
| 66 | + 'port': CONFIG.get('REDIS_PORT'), |
| 67 | + 'db': redis_celery_once_db, |
| 68 | + } |
| 69 | + } |
| 70 | +CELERY_result_backend = CELERY_BROKER_URL |
23 | 71 | CELERY_timezone = CONFIG.TIME_ZONE |
24 | 72 | CELERY_ENABLE_UTC = False |
25 | 73 | CELERY_task_serializer = 'pickle' |
|
33 | 81 | CELERY_WORKER_REDIRECT_STDOUTS_LEVEL = "INFO" |
34 | 82 | CELERY_TASK_SOFT_TIME_LIMIT = 3600 |
35 | 83 | CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True |
36 | | -CELERY_TASK_ACKS_LATE = True |
37 | | -celery_once_path = os.path.join(celery_data_dir, "celery_once") |
38 | | -try: |
39 | | - if os.path.exists(celery_once_path) and os.path.isdir(celery_once_path): |
40 | | - shutil.rmtree(celery_once_path) |
41 | | -except Exception as e: |
42 | | - pass |
| 84 | +# celery-once 配置 |
| 85 | +celery_once_settings['default_timeout'] = 3600 # 锁的默认超时时间(秒) |
43 | 86 | CELERY_ONCE = { |
44 | | - 'backend': 'celery_once.backends.File', |
45 | | - 'settings': {'location': celery_once_path} |
| 87 | + 'backend': 'celery_once.backends.Redis', |
| 88 | + 'settings': celery_once_settings |
46 | 89 | } |
47 | 90 | CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True |
48 | 91 | CELERY_LOG_DIR = os.path.join(LOG_DIR, 'celery') |
0 commit comments