1+ # =================
2+ # 线程管理器
3+ # Version: 1.0.0
4+ # =================
5+ from concurrent .futures import ThreadPoolExecutor , Future
6+ from typing import Callable , Any , Optional , Dict
7+ from core .log .log_manager import log
8+ import threading
9+ import queue
10+ import time
11+ import psutil
12+ import os
13+
14+
15+ class ThreadManager :
16+ _instance = None
17+ _lock = threading .Lock ()
18+
19+ def __new__ (cls ):
20+ with cls ._lock :
21+ if cls ._instance is None :
22+ cls ._instance = super (ThreadManager , cls ).__new__ (cls )
23+ return cls ._instance
24+
25+ def __init__ (self ):
26+ if not hasattr (self , 'initialized' ):
27+ self .max_workers = self ._calculate_optimal_thread_count ()
28+ self .executor = ThreadPoolExecutor (max_workers = self .max_workers )
29+ self .tasks : Dict [str , Future ] = {}
30+ self .task_queue = queue .Queue ()
31+ self .results = {}
32+ self .task_stats = {}
33+ self .initialized = True
34+ self ._start_monitoring ()
35+ log .info (f"线程管理器初始化完成,当前线程数: { self .max_workers } " )
36+
37+ def _calculate_optimal_thread_count (self ) -> int :
38+ cpu_count = psutil .cpu_count (logical = True )
39+ memory = psutil .virtual_memory ()
40+
41+ # 基础线程数:CPU核心数 * 2
42+ base_threads = cpu_count * 2
43+
44+ # 根据内存使用情况调整
45+ memory_factor = 1.0
46+ if memory .percent > 80 :
47+ memory_factor = 0.5
48+ elif memory .percent > 60 :
49+ memory_factor = 0.7
50+
51+ optimal_threads = int (base_threads * memory_factor )
52+ return max (4 , min (optimal_threads , 256 ))
53+
54+ def _start_monitoring (self ):
55+ def monitor ():
56+ while True :
57+ try :
58+ self ._adjust_thread_pool ()
59+ time .sleep (30 )
60+ except Exception as e :
61+ log .error (f"线程池监控异常: { str (e )} " )
62+
63+ monitor_thread = threading .Thread (target = monitor , daemon = True )
64+ monitor_thread .start ()
65+
66+ def _adjust_thread_pool (self ):
67+ active_tasks = len ([t for t in self .tasks .values () if not t .done ()])
68+ current_workers = self .executor ._max_workers
69+ optimal_count = self ._calculate_optimal_thread_count ()
70+
71+ # 根据任务负载调整线程数
72+ if active_tasks > current_workers * 0.8 :
73+ new_workers = min (current_workers * 2 , 256 )
74+ elif active_tasks < current_workers * 0.2 :
75+ new_workers = max (current_workers // 2 , optimal_count )
76+ else :
77+ new_workers = optimal_count
78+
79+ if new_workers != current_workers :
80+ log .info (f"调整线程池大小: { current_workers } -> { new_workers } " )
81+ new_executor = ThreadPoolExecutor (max_workers = new_workers )
82+ old_executor = self .executor
83+ self .executor = new_executor
84+ old_executor .shutdown (wait = False )
85+
86+ def submit_task (self , task_id : str , func : Callable , * args , ** kwargs ) -> Future :
87+ try :
88+ self .task_stats [task_id ] = {
89+ 'start_time' : time .time (),
90+ 'status' : 'running'
91+ }
92+
93+ def wrapped_func (* args , ** kwargs ):
94+ try :
95+ result = func (* args , ** kwargs )
96+ self .task_stats [task_id ]['status' ] = 'completed'
97+ self .task_stats [task_id ]['end_time' ] = time .time ()
98+ return result
99+ except Exception as e :
100+ self .task_stats [task_id ]['status' ] = 'failed'
101+ self .task_stats [task_id ]['error' ] = str (e )
102+ raise
103+
104+ future = self .executor .submit (wrapped_func , * args , ** kwargs )
105+ self .tasks [task_id ] = future
106+ log .debug (f"提交任务: { task_id } " )
107+ return future
108+
109+ except Exception as e :
110+ log .error (f"提交任务失败 { task_id } : { str (e )} " )
111+ raise
112+
113+ def run_in_thread (self , func : Callable ) -> Callable :
114+ def wrapper (* args , ** kwargs ):
115+ return self .submit_task (
116+ f"task_{ time .time ()} _{ threading .get_ident ()} " ,
117+ func ,
118+ * args ,
119+ ** kwargs
120+ )
121+ return wrapper
122+
123+ def get_result (self , task_id : str , timeout : Optional [float ] = None ) -> Any :
124+ try :
125+ if task_id in self .tasks :
126+ return self .tasks [task_id ].result (timeout = timeout )
127+ return None
128+ except Exception as e :
129+ log .error (f"获取任务结果失败 { task_id } : { str (e )} " )
130+ return None
131+
132+ def cancel_task (self , task_id : str ) -> bool :
133+ if task_id in self .tasks :
134+ self .task_stats [task_id ]['status' ] = 'cancelled'
135+ return self .tasks [task_id ].cancel ()
136+ return False
137+
138+ def is_task_running (self , task_id : str ) -> bool :
139+ return task_id in self .tasks and not self .tasks [task_id ].done ()
140+
141+ def wait_for_task (self , task_id : str , timeout : Optional [float ] = None ) -> bool :
142+ try :
143+ if task_id in self .tasks :
144+ self .tasks [task_id ].result (timeout = timeout )
145+ return True
146+ return False
147+ except Exception :
148+ return False
149+
150+ def get_task_stats (self , task_id : str = None ) -> Dict :
151+ if task_id :
152+ return self .task_stats .get (task_id )
153+ return self .task_stats
154+
155+ def shutdown (self , wait : bool = True ):
156+ try :
157+ self .executor .shutdown (wait = wait )
158+ log .info ("线程管理器关闭" )
159+ except Exception as e :
160+ log .error (f"线程管理器关闭失败: { str (e )} " )
161+
162+ # 全局实例
163+ thread_manager = ThreadManager ()
0 commit comments