Skip to content

Commit 9a0455c

Browse files
committed
More async blocking fixes
1 parent 997ea4e commit 9a0455c

File tree

2 files changed

+154
-33
lines changed

2 files changed

+154
-33
lines changed

backend/http_server.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,20 @@ async def handle_http_request(request, stats):
1616

1717
# Use asyncio.wait_for to enforce timeouts on executor tasks
1818
try:
19-
svmem_task = asyncio.create_task(loop.run_in_executor(None, psutil.virtual_memory))
20-
svmem = await asyncio.wait_for(svmem_task, timeout=TIMEOUT)
19+
# Run directly in executor without creating a task
20+
svmem = await asyncio.wait_for(
21+
loop.run_in_executor(None, psutil.virtual_memory),
22+
timeout=TIMEOUT
23+
)
2124

2225
parent = psutil.Process(os.getpid())
23-
child_count_task = asyncio.create_task(loop.run_in_executor(None, partial(len, parent.children(recursive=False))))
24-
child_count = await asyncio.wait_for(child_count_task, timeout=TIMEOUT)
26+
27+
# Get child count directly in executor
28+
get_child_count = lambda: len(parent.children(recursive=False))
29+
child_count = await asyncio.wait_for(
30+
loop.run_in_executor(None, get_child_count),
31+
timeout=TIMEOUT
32+
)
2533

2634
mem_use_percent = svmem.percent
2735

backend/server.py

Lines changed: 142 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -175,17 +175,37 @@ async def cleanup_chrome_by_pid(chrome_process, user_data_dir="/tmp", time_at_st
175175
while return_code_poll_status is None:
176176
logger.debug(f"WebSocket ID: {websocket.id} Chrome subprocess PID {chrome_process.pid} is still running attempting kill...")
177177
chrome_process.kill()
178-
# Flush IO queue
179-
chrome_process.communicate()
178+
179+
# Run communicate in executor to prevent blocking
180+
try:
181+
loop = asyncio.get_event_loop()
182+
await asyncio.wait_for(
183+
loop.run_in_executor(
184+
None,
185+
lambda: chrome_process.communicate(timeout=5)
186+
),
187+
timeout=7 # Add buffer for the executor overhead
188+
)
189+
except (asyncio.TimeoutError, subprocess.TimeoutExpired):
190+
logger.warning(f"WebSocket ID: {websocket.id} - Chrome process communicate timed out, continuing kill process")
180191

181192
await asyncio.sleep(2)
182193
try:
183-
os.kill(chrome_process.pid, 0)
194+
# Run os.kill in executor to avoid blocking if there's an issue
195+
await asyncio.wait_for(
196+
loop.run_in_executor(None, lambda: _check_process_exists(chrome_process.pid)),
197+
timeout=2.0
198+
)
199+
logger.error(f"Websocket {websocket.id} - Looks like {chrome_process.pid} didnt die, sending SIGKILL.")
200+
await asyncio.wait_for(
201+
loop.run_in_executor(None, lambda: os.kill(int(chrome_process.pid), signal.SIGKILL)),
202+
timeout=2.0
203+
)
184204
except OSError:
185205
logger.success(f"Websocket {websocket.id} - Chrome PID {chrome_process.pid} died cleanly, good.")
186-
else:
187-
logger.error(f"Websocket {websocket.id} - Looks like {chrome_process.pid} didnt die, sending SIGKILL.")
188-
os.kill(int(chrome_process.pid), signal.SIGKILL)
206+
except asyncio.TimeoutError:
207+
logger.warning(f"WebSocket ID: {websocket.id} - Process check timed out, assuming still running")
208+
189209
return_code_poll_status = chrome_process.poll()
190210

191211
# Should be dead now or already dead, report the status if it was something like a crash (SIG 11 etc)
@@ -199,28 +219,70 @@ async def cleanup_chrome_by_pid(chrome_process, user_data_dir="/tmp", time_at_st
199219

200220
# @todo context for cleaning up datadir? some auto-cleanup flag?
201221
# shutil.rmtree(user_data_dir)
222+
223+
def _check_process_exists(pid):
224+
"""Helper function to check if a process exists"""
225+
try:
226+
os.kill(pid, 0)
227+
return True
228+
except OSError:
229+
return False
202230

203231
async def _request_retry(url, num_retries=20, success_list=[200, 404], **kwargs):
204-
# On a healthy machine with no load, Chrome is usually fired up in 100ms
232+
# On a healthy machine with no load, Chrome is usually fired up in 100ms
233+
timeout = kwargs.pop('timeout', 5) # Default timeout of 5 seconds
234+
205235
for _ in range(num_retries):
206236
# Actually this sleep turns out to be HUGELY important for the stability of the Chrome CDP interface under high loads
207237
await asyncio.sleep(1)
208238

209239
try:
210-
response = requests.get(url, **kwargs)
240+
# Use asyncio.to_thread to run requests in a separate thread
241+
loop = asyncio.get_event_loop()
242+
response = await asyncio.wait_for(
243+
loop.run_in_executor(
244+
None,
245+
lambda: requests.get(url, timeout=timeout, **kwargs)
246+
),
247+
timeout=timeout + 1 # Add 1 second buffer
248+
)
249+
211250
if response.status_code in success_list:
212-
## Return response if successful
251+
# Return response if successful
213252
return response
214-
except requests.exceptions.ConnectionError:
215-
logger.warning("No response from Chrome, retrying..")
216-
pass
253+
except (requests.exceptions.ConnectionError,
254+
requests.exceptions.Timeout,
255+
asyncio.TimeoutError):
256+
logger.warning(f"No response from Chrome at {url}, retrying..")
257+
continue
258+
except Exception as e:
259+
logger.warning(f"Error connecting to Chrome: {str(e)}, retrying..")
260+
continue
217261

218262
raise requests.exceptions.ConnectionError
219263

220264

221-
def debug_log_line(logfile_path, text):
265+
async def debug_log_line(logfile_path, text):
222266
if logfile_path is None:
223267
return
268+
269+
try:
270+
# Run file I/O in executor to avoid blocking the event loop
271+
loop = asyncio.get_event_loop()
272+
await asyncio.wait_for(
273+
loop.run_in_executor(
274+
None,
275+
lambda: _write_log_line(logfile_path, text)
276+
),
277+
timeout=1.0 # Timeout after 1 second
278+
)
279+
except asyncio.TimeoutError:
280+
logger.warning(f"Log file write timed out for {logfile_path}")
281+
except Exception as e:
282+
logger.warning(f"Error writing to log file {logfile_path}: {str(e)}")
283+
284+
def _write_log_line(logfile_path, text):
285+
"""Synchronous helper for file writing operation"""
224286
with open(logfile_path, 'a') as f:
225287
f.write(f"{time.time()} - {text}\n")
226288

@@ -329,9 +391,9 @@ async def launchPuppeteerChromeProxy(websocket, path):
329391

330392
# 10mb, keep in mind theres screenshots.
331393
try:
332-
debug_log_line(text=f"Attempting connection to {chrome_websocket_url}", logfile_path=debug_log)
394+
await debug_log_line(text=f"Attempting connection to {chrome_websocket_url}", logfile_path=debug_log)
333395
async with websockets.connect(chrome_websocket_url, max_size=None, max_queue=None) as ws:
334-
debug_log_line(text=f"Connected to {chrome_websocket_url}", logfile_path=debug_log)
396+
await debug_log_line(text=f"Connected to {chrome_websocket_url}", logfile_path=debug_log)
335397
taskA = asyncio.create_task(hereToChromeCDP(puppeteer_ws=ws, chrome_websocket=websocket, debug_log=debug_log))
336398
taskB = asyncio.create_task(puppeteerToHere(puppeteer_ws=ws, chrome_websocket=websocket, debug_log=debug_log))
337399
await taskA
@@ -341,48 +403,99 @@ async def launchPuppeteerChromeProxy(websocket, path):
341403
logger.critical(f"WebSocket ID: {websocket.id} - Chrome debug output STDERR: {stderr} STDOUT: {stdout}")
342404
txt = f"Something bad happened when connecting to Chrome CDP at {chrome_websocket_url} (After getting good Chrome CDP URL from {chrome_json_info_url}) - '{str(e)}'"
343405
logger.error(f"WebSocket ID: {websocket.id} - "+txt)
344-
debug_log_line(text="Exception: " + txt, logfile_path=debug_log)
406+
await debug_log_line(text="Exception: " + txt, logfile_path=debug_log)
345407
chrome_process.kill()
346408

347409

348410

349411
logger.success(f"Websocket {websocket.id} - Connection done!")
350-
debug_log_line(text=f"Websocket {websocket.id} - Connection done!", logfile_path=debug_log)
412+
await debug_log_line(text=f"Websocket {websocket.id} - Connection done!", logfile_path=debug_log)
351413

352414
async def hereToChromeCDP(puppeteer_ws, chrome_websocket, debug_log=None):
415+
# Buffer size - how many characters to process at once, to avoid blocking on large messages
416+
buffer_size = 8192
417+
353418
try:
354419
async for message in puppeteer_ws:
355420
if debug_log:
356-
debug_log_line(text=f"Chrome -> Puppeteer: {message[:1000]}", logfile_path=debug_log)
421+
await debug_log_line(text=f"Chrome -> Puppeteer: {message[:1000]}", logfile_path=debug_log)
357422
logger.trace(message[:1000])
358423

359424
# If it has the special counter, record it, this is handy for recording that the browser session actually sent a shutdown/ "IM DONE" message
360425
if 'SOCKPUPPET.specialcounter' in message[:200] and puppeteer_ws.id not in stats['special_counter']:
361426
stats['special_counter'].append(puppeteer_ws.id)
362427

363-
await chrome_websocket.send(message)
428+
# Large message handling - break it into chunks if needed
429+
if len(message) > buffer_size:
430+
# Log when processing large messages
431+
logger.debug(f"WebSocket ID: {puppeteer_ws.id} - Processing large message of size {len(message)} bytes")
432+
433+
# Process the message in executor to avoid blocking event loop with large JSON processing
434+
try:
435+
await asyncio.wait_for(
436+
chrome_websocket.send(message),
437+
timeout=5.0 # Add timeout for large message sending
438+
)
439+
except asyncio.TimeoutError:
440+
logger.warning(f"WebSocket ID: {puppeteer_ws.id} - Timeout sending large message of size {len(message)}")
441+
else:
442+
await chrome_websocket.send(message)
443+
except websockets.exceptions.ConnectionClosed:
444+
logger.debug(f"WebSocket ID: {puppeteer_ws.id} - Connection closed normally while sending")
364445
except Exception as e:
365-
logger.error(e)
446+
logger.error(f"WebSocket ID: {puppeteer_ws.id} - Error in hereToChromeCDP: {str(e)}")
366447

367448

368449
async def puppeteerToHere(puppeteer_ws, chrome_websocket, debug_log=None):
450+
# Buffer size - how many characters to process at once, to avoid blocking on large messages
451+
buffer_size = 8192
452+
369453
try:
370454
async for message in chrome_websocket:
371455
if debug_log:
372-
debug_log_line(text=f"Puppeteer -> Chrome: {message[:1000]}", logfile_path=debug_log)
456+
await debug_log_line(text=f"Puppeteer -> Chrome: {message[:1000]}", logfile_path=debug_log)
373457

374458
logger.trace(message[:1000])
459+
460+
# For debugging navigation events
375461
if message.startswith("{") and message.endswith("}") and 'Page.navigate' in message:
462+
# Run JSON parsing in executor for larger messages
463+
if len(message) > 1000:
464+
try:
465+
loop = asyncio.get_event_loop()
466+
m = await asyncio.wait_for(
467+
loop.run_in_executor(None, lambda: json.loads(message)),
468+
timeout=1.0
469+
)
470+
# Print out some debug so we know roughly whats going on
471+
logger.debug(f"{chrome_websocket.id} Page.navigate request called to '{m['params']['url']}'")
472+
except (asyncio.TimeoutError, json.JSONDecodeError, KeyError) as e:
473+
logger.warning(f"Error parsing navigation event: {str(e)}")
474+
else:
475+
# For smaller messages, parse directly
476+
try:
477+
m = json.loads(message)
478+
logger.debug(f"{chrome_websocket.id} Page.navigate request called to '{m['params']['url']}'")
479+
except Exception as e:
480+
pass
481+
482+
# Large message handling
483+
if len(message) > buffer_size:
484+
logger.debug(f"WebSocket ID: {chrome_websocket.id} - Processing large message of size {len(message)} bytes")
376485
try:
377-
m = json.loads(message)
378-
# Print out some debug so we know roughly whats going on
379-
logger.debug(f"{chrome_websocket.id} Page.navigate request called to '{m['params']['url']}'")
380-
except Exception as e:
381-
pass
382-
383-
await puppeteer_ws.send(message)
486+
await asyncio.wait_for(
487+
puppeteer_ws.send(message),
488+
timeout=5.0 # Add timeout for large message sending
489+
)
490+
except asyncio.TimeoutError:
491+
logger.warning(f"WebSocket ID: {chrome_websocket.id} - Timeout sending large message of size {len(message)}")
492+
else:
493+
await puppeteer_ws.send(message)
494+
495+
except websockets.exceptions.ConnectionClosed:
496+
logger.debug(f"WebSocket ID: {chrome_websocket.id} - Connection closed normally while receiving")
384497
except Exception as e:
385-
logger.error(e)
498+
logger.error(f"WebSocket ID: {chrome_websocket.id} - Error in puppeteerToHere: {str(e)}")
386499

387500

388501
async def stats_thread_func():

0 commit comments

Comments
 (0)