22import itertools
33import logging
44import sys
5+ import time
56import typing
67import random
78import weakref
2122from ..utils .deprecated import deprecated
2223from ..utils .enums import StateLists , MProcessingType
2324from ..utils .event import Eventful
24- from ..utils .helpers import PickleSerializer , pretty_print_state_descriptors
25+ from ..utils .helpers import PickleSerializer , pretty_print_state_descriptors , deque
2526from ..utils .log import set_verbosity
2627from ..utils .nointerrupt import WithKeyboardInterruptAs
2728from .workspace import Workspace , Testcase
28- from .worker import WorkerSingle , WorkerThread , WorkerProcess , DaemonThread
29+ from .worker import (
30+ WorkerSingle ,
31+ WorkerThread ,
32+ WorkerProcess ,
33+ DaemonThread ,
34+ LogCaptureWorker ,
35+ state_monitor ,
36+ )
2937
3038from multiprocessing .managers import SyncManager
3139import threading
@@ -88,6 +96,7 @@ def wait_for(self, condition, *args, **kwargs):
8896 self ._terminated_states = []
8997 self ._busy_states = []
9098 self ._killed_states = []
99+ self ._log_queue = deque (maxlen = 5000 )
91100 self ._shared_context = {}
92101
93102 def _manticore_threading (self ):
@@ -99,6 +108,7 @@ def _manticore_threading(self):
99108 self ._terminated_states = []
100109 self ._busy_states = []
101110 self ._killed_states = []
111+ self ._log_queue = deque (maxlen = 5000 )
102112 self ._shared_context = {}
103113
104114 def _manticore_multiprocessing (self ):
@@ -120,6 +130,9 @@ def raise_signal():
120130 self ._terminated_states = self ._manager .list ()
121131 self ._busy_states = self ._manager .list ()
122132 self ._killed_states = self ._manager .list ()
133+ # The multiprocessing queue is much slower than the deque when it gets full, so we
134+ # triple the size in order to prevent that from happening.
135+ self ._log_queue = self ._manager .Queue (15000 )
123136 self ._shared_context = self ._manager .dict ()
124137 self ._context_value_types = {list : self ._manager .list , dict : self ._manager .dict }
125138
@@ -370,8 +383,10 @@ def __init__(
370383 # Workers will use manticore __dict__ So lets spawn them last
371384 self ._workers = [self ._worker_type (id = i , manticore = self ) for i in range (consts .procs )]
372385
373- # We won't create the daemons until .run() is called
374- self ._daemon_threads : typing .List [DaemonThread ] = []
386+ # Create log capture worker. We won't create the rest of the daemons until .run() is called
387+ self ._daemon_threads : typing .Dict [int , DaemonThread ] = {
388+ - 1 : LogCaptureWorker (id = - 1 , manticore = self )
389+ }
375390 self ._daemon_callbacks : typing .List [typing .Callable ] = []
376391
377392 self ._snapshot = None
@@ -1102,21 +1117,27 @@ def run(self):
11021117 # User subscription to events is disabled from now on
11031118 self .subscribe = None
11041119
1120+ self .register_daemon (state_monitor )
1121+ self ._daemon_threads [- 1 ].start () # Start log capture worker
1122+
11051123 # Passing generators to callbacks is a bit hairy because the first callback would drain it if we didn't
11061124 # clone the iterator in event.py. We're preserving the old API here, but it's something to avoid in the future.
11071125 self ._publish ("will_run" , self .ready_states )
11081126 self ._running .value = True
1127+
11091128 # start all the workers!
11101129 for w in self ._workers :
11111130 w .start ()
11121131
11131132 # Create each daemon thread and pass it `self`
1114- if not self ._daemon_threads : # Don't recreate the threads if we call run multiple times
1115- for i , cb in enumerate (self ._daemon_callbacks ):
1133+ for i , cb in enumerate (self ._daemon_callbacks ):
1134+ if (
1135+ i not in self ._daemon_threads
1136+ ): # Don't recreate the threads if we call run multiple times
11161137 dt = DaemonThread (
11171138 id = i , manticore = self
11181139 ) # Potentially duplicated ids with workers. Don't mix!
1119- self ._daemon_threads . append ( dt )
1140+ self ._daemon_threads [ dt . id ] = dt
11201141 dt .start (cb )
11211142
11221143 # Main process. Lets just wait and capture CTRL+C at main
@@ -1173,6 +1194,17 @@ def finalize(self):
11731194 self .generate_testcase (state )
11741195 self .remove_all ()
11751196
1197+ def wait_for_log_purge (self ):
1198+ """
1199+ If a client has accessed the log server, and there are still buffered logs,
1200+ waits up to 2 seconds for the client to retrieve the logs.
1201+ """
1202+ if self ._daemon_threads [- 1 ].activated :
1203+ for _ in range (8 ):
1204+ if self ._log_queue .empty ():
1205+ break
1206+ time .sleep (0.25 )
1207+
11761208 ############################################################################
11771209 ############################################################################
11781210 ############################################################################
@@ -1188,6 +1220,7 @@ def save_run_data(self):
11881220 config .save (f )
11891221
11901222 logger .info ("Results in %s" , self ._output .store .uri )
1223+ self .wait_for_log_purge ()
11911224
11921225 def introspect (self ) -> typing .Dict [int , StateDescriptor ]:
11931226 """
0 commit comments