|  | 
| 1 | 1 | import gc | 
| 2 | 2 | import inspect | 
| 3 | 3 | import logging as std_logging | 
|  | 4 | +import os | 
|  | 5 | +import queue | 
|  | 6 | +import threading | 
| 4 | 7 | from contextlib import nullcontext | 
| 5 | 8 | from dataclasses import dataclass | 
| 6 | 9 | from typing import Any, Callable, Dict, Optional, Union | 
| @@ -171,18 +174,34 @@ def run_benchmark(self, scenario: BenchmarkScenario): | 
| 171 | 174 |     def run_bencmarks_and_collate(self, scenarios: Union[BenchmarkScenario, list[BenchmarkScenario]], filename: str): | 
| 172 | 175 |         if not isinstance(scenarios, list): | 
| 173 | 176 |             scenarios = [scenarios] | 
| 174 |  | -        records = [] | 
|  | 177 | +        record_queue = queue.Queue() | 
|  | 178 | +        stop_signal = object() | 
|  | 179 | + | 
|  | 180 | +        def _writer_thread(): | 
|  | 181 | +            while True: | 
|  | 182 | +                item = record_queue.get() | 
|  | 183 | +                if item is stop_signal: | 
|  | 184 | +                    break | 
|  | 185 | +                df_row = pd.DataFrame([item]) | 
|  | 186 | +                write_header = not os.path.exists(filename) | 
|  | 187 | +                df_row.to_csv(filename, mode="a", header=write_header, index=False) | 
|  | 188 | +                record_queue.task_done() | 
|  | 189 | + | 
|  | 190 | +            record_queue.task_done() | 
|  | 191 | + | 
|  | 192 | +        writer = threading.Thread(target=_writer_thread, daemon=True) | 
|  | 193 | +        writer.start() | 
|  | 194 | + | 
| 175 | 195 |         for s in scenarios: | 
| 176 | 196 |             try: | 
| 177 | 197 |                 record = self.run_benchmark(s) | 
| 178 | 198 |                 if record: | 
| 179 |  | -                    records.append(record) | 
|  | 199 | +                    record_queue.put(record) | 
| 180 | 200 |                 else: | 
| 181 | 201 |                     logger.info(f"Record empty from scenario: {s.name}.") | 
| 182 | 202 |             except Exception as e: | 
| 183 | 203 |                 logger.info(f"Running scenario ({s.name}) led to error:\n{e}") | 
| 184 |  | -        df = pd.DataFrame.from_records([r for r in records if r]) | 
| 185 |  | -        df.to_csv(filename, index=False) | 
|  | 204 | +        record_queue.put(stop_signal) | 
| 186 | 205 |         logger.info(f"Results serialized to {filename=}.") | 
| 187 | 206 | 
 | 
| 188 | 207 |     def _run_phase( | 
|  | 
0 commit comments