-
Notifications
You must be signed in to change notification settings - Fork 600
Resource cleanup issues in stream_events_optimized (executor shutdown & queue race condition) #73
Copy link
Copy link
Open
Description
Hey! Found a couple of cleanup issues in stream_events_optimized that could cause resource leaks or dropped messages:
1. ThreadPoolExecutor not properly shut down
Current code (line 177 & 247):
- Global
cleanup_executoris never shut down - Local
executor.shutdown(wait=False)doesn't wait for threads to finish
Impact: Resource leaks and potentially abrupt thread termination
Suggested fix:
# At the top of the file, after cleanup_executor definition
import atexit
atexit.register(cleanup_executor.shutdown)
# Inside stream_events_optimized, modify the finally block:
finally:
cancel_event.set()
stream_queue.close()
try:
future.result(timeout=5.0) # Give more time for graceful shutdown
except Exception:
pass
finally:
executor.shutdown(wait=True, cancel_futures=True)2. Race condition in stream queue closure
Current code (line 247):
finally:
stream_queue.close()
future.result(timeout=1.0) # Queue closed BEFORE thread finishesImpact: Pipeline thread might try to write to closed queue → dropped messages or errors
Suggested fix:
finally:
cancel_event.set() # Signal pipeline to stop
try:
# Wait longer for pipeline thread to finish
future.result(timeout=10.0)
except Exception:
pass # Thread may have been cancelled
finally:
stream_queue.close() # Close queue after thread is done
executor.shutdown(wait=True, cancel_futures=True)Both are in the same cleanup flow, so fixing them together would make sense. Happy to help test or submit a PR if useful!
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels