Skip to content

Commit dc4abd5

Browse files
committed
fix: make sure leftover traces are flushed before exiting
1 parent 9a52c1f commit dc4abd5

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

traincheck/config/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
# trace dumper configs:
88
BUFFER_SIZE = 1000 # number of events to buffer before dumping
9-
FLUSH_INTERVAL = 5 # seconds
9+
FLUSH_INTERVAL = 0.5 # seconds
1010

1111
# runner configs
1212
RUNNER_DEFAULT_ENV = {

traincheck/instrumentor/dumper.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343

4444
logger = logging.getLogger(__name__)
4545

46-
4746
def serialize(obj_dict: dict[str, object | str]) -> str:
4847
try:
4948
return orjson.dumps(obj_dict).decode("utf-8")
@@ -55,20 +54,23 @@ def serialize(obj_dict: dict[str, object | str]) -> str:
5554
def monitor_main_thread(main_thread, stop_event):
5655
main_thread.join() # Wait for the main thread to finish
5756
print("Main thread has finished or encountered an exception")
57+
print("Flushing all buffers to the trace log file")
5858
stop_event.set() # Signal the logging threads to stop
5959

6060

6161
def trace_dumper(task_queue: Queue, trace_file_name: str, stop_event: threading.Event):
6262
with open(trace_file_name, "w") as f:
6363
while True:
6464
try:
65-
trace = task_queue.get(timeout=0.5)
65+
trace = task_queue.get(timeout=FLUSH_INTERVAL * 2) # wait for 2x the flush interval, this is an arbitrary number, as long as it is larger than the flush interval, it should be fine.
6666
except Empty:
6767
if stop_event.is_set():
68+
print("Trace dumper thread has stopped.")
6869
break
6970
continue
7071
f.write(f"{trace}\n")
7172
task_queue.task_done()
73+
print("Trace dumper thread has finished normally...")
7274

7375

7476
def get_trace_API_dumper_queue():
@@ -178,6 +180,7 @@ def _flush_periodically(self):
178180
while not stop_event.is_set():
179181
time.sleep(self.flush_interval)
180182
self._flush()
183+
self._flush() # flush any remaining traces before exiting
181184

182185

183186
# THREAD_DATA.api_trace_buffer = TraceBuffer(get_trace_API_dumper_queue)

0 commit comments

Comments
 (0)