Skip to content

Commit 8b94670

Browse files
refactor: split utils
1 parent 8d83f96 commit 8b94670

File tree

4 files changed

+749
-662
lines changed

4 files changed

+749
-662
lines changed

server/utils/__init__.py

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
2+
import logging
3+
import threading
4+
import fcntl
5+
import queue
6+
import atexit
7+
8+
# from .code_task_v1 import run_ai_code_task, _run_ai_code_task_internal
9+
from .code_task_v2 import run_ai_code_task_v2, _run_ai_code_task_v2_internal
10+
11+
# Configure logging
12+
logging.basicConfig(level=logging.INFO)
13+
logger = logging.getLogger(__name__)
14+
15+
16+
# Global Codex execution queue and lock for sequential processing
17+
codex_execution_queue = queue.Queue()
18+
codex_execution_lock = threading.Lock()
19+
codex_worker_thread = None
20+
codex_lock_file = '/tmp/codex_global_lock'
21+
22+
def init_codex_sequential_processor():
23+
"""Initialize the sequential Codex processor"""
24+
global codex_worker_thread
25+
26+
def codex_worker():
27+
"""Worker thread that processes Codex tasks sequentially"""
28+
logger.info("🔄 Codex sequential worker thread started")
29+
30+
while True:
31+
try:
32+
# Get the next task from the queue (blocks if empty)
33+
task_data = codex_execution_queue.get(timeout=1.0)
34+
if task_data is None: # Poison pill to stop the thread
35+
logger.info("🛑 Codex worker thread stopping")
36+
break
37+
38+
task_id, user_id, github_token, is_v2 = task_data
39+
logger.info(f"🎯 Processing Codex task {task_id} sequentially")
40+
41+
# Acquire file-based lock for additional safety
42+
try:
43+
with open(codex_lock_file, 'w') as lock_file:
44+
fcntl.flock(lock_file.fileno(), fcntl.LOCK_EX)
45+
logger.info(f"🔒 Global Codex lock acquired for task {task_id}")
46+
47+
# Execute the task
48+
if is_v2:
49+
_execute_codex_task_v2(task_id, user_id, github_token)
50+
# else:
51+
# _execute_codex_task_legacy(task_id)
52+
53+
logger.info(f"✅ Codex task {task_id} completed")
54+
55+
except Exception as e:
56+
logger.error(f"❌ Error executing Codex task {task_id}: {e}")
57+
finally:
58+
codex_execution_queue.task_done()
59+
60+
except queue.Empty:
61+
continue
62+
except Exception as e:
63+
logger.error(f"❌ Error in Codex worker thread: {e}")
64+
65+
# Start the worker thread if not already running
66+
with codex_execution_lock:
67+
if codex_worker_thread is None or not codex_worker_thread.is_alive():
68+
codex_worker_thread = threading.Thread(target=codex_worker, daemon=True)
69+
codex_worker_thread.start()
70+
logger.info("🚀 Codex sequential processor initialized")
71+
72+
def queue_codex_task(task_id, user_id=None, github_token=None, is_v2=True):
73+
"""Queue a Codex task for sequential execution"""
74+
init_codex_sequential_processor()
75+
76+
logger.info(f"📋 Queuing Codex task {task_id} for sequential execution")
77+
codex_execution_queue.put((task_id, user_id, github_token, is_v2))
78+
79+
# Wait for the task to be processed
80+
logger.info(f"⏳ Waiting for Codex task {task_id} to be processed...")
81+
codex_execution_queue.join()
82+
83+
def _execute_codex_task_v2(task_id: int, user_id: str, github_token: str):
84+
"""Execute Codex task v2 - internal method called by sequential processor"""
85+
# This will contain the actual execution logic
86+
return _run_ai_code_task_v2_internal(task_id, user_id, github_token)
87+
88+
# def _execute_codex_task_legacy(task_id):
89+
# """Execute legacy Codex task - internal method called by sequential processor"""
90+
# # This will contain the actual execution logic
91+
# return _run_ai_code_task_internal(task_id)
92+
93+
# Cleanup function to stop the worker thread
94+
def cleanup_codex_processor():
95+
"""Clean up the Codex processor on exit"""
96+
global codex_worker_thread
97+
if codex_worker_thread and codex_worker_thread.is_alive():
98+
logger.info("🧹 Shutting down Codex sequential processor")
99+
codex_execution_queue.put(None) # Poison pill
100+
codex_worker_thread.join(timeout=5.0)
101+
102+
atexit.register(cleanup_codex_processor)
103+

0 commit comments

Comments
 (0)