Skip to content

Commit 915d25f

Browse files
committed
fix: prevent potential MEA hang when too many traces are processed
If a verification result produces a large number of error traces, the per-process MEA resource queue may overflow, causing the entire process to hang. The queue is now drained periodically while the worker processes are still running, instead of only once after they all finish.
1 parent 7f668f1 commit 915d25f

File tree

1 file changed

+25
-17
lines changed

1 file changed

+25
-17
lines changed

cv/components/mea.py

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
TAG_UNZIP = "unzip"
4242
TAG_DRY_RUN = "dry run"
4343
TAG_SOURCE_DIR = "source dir"
44+
TAG_PROCESSED_TRACE = "cet"
4445

4546
DO_NOT_FILTER = "do not filter"
4647

@@ -106,8 +107,9 @@ def filter(self) -> list:
106107

107108
start_time = time.time()
108109
process_pool = []
110+
memory_usage_all = []
109111
queue = multiprocessing.Queue()
110-
converted_error_traces = multiprocessing.Manager().dict()
112+
converted_error_traces = {}
111113
for i in range(self.parallel_processes):
112114
process_pool.append(None)
113115
for error_trace_file in self.error_traces:
@@ -121,10 +123,12 @@ def filter(self) -> list:
121123
process_pool[i] = multiprocessing.Process(target=self.__process_trace,
122124
name=error_trace_file,
123125
args=(error_trace_file,
124-
converted_error_traces,
125126
queue))
126127
process_pool[i].start()
127128
raise NestedLoop
129+
else:
130+
if not queue.empty():
131+
self.__drain_processing_queue(queue, converted_error_traces, memory_usage_all)
128132
time.sleep(BUSY_WAITING_INTERVAL)
129133
except NestedLoop:
130134
pass
@@ -133,7 +137,8 @@ def filter(self) -> list:
133137
kill_launches(process_pool)
134138

135139
wait_for_launches(process_pool)
136-
self.__count_resource_usage(queue)
140+
self.__drain_processing_queue(queue, converted_error_traces, memory_usage_all)
141+
self.__compute_max_memory_usage(memory_usage_all)
137142
self.package_processing_time = time.time() - start_time
138143

139144
# Need to sort traces for deterministic results.
@@ -211,22 +216,20 @@ def process_traces_without_filtering(self) -> tuple:
211216
is_exported = False
212217
witness_type = WitnessType.VIOLATION
213218
for error_trace_file in self.error_traces:
214-
converted_error_traces = {}
215-
is_exported, witness_type = self.__process_trace(error_trace_file,
216-
converted_error_traces)
219+
is_exported, witness_type = self.__process_trace(error_trace_file)
217220
if is_exported:
218221
self.__print_trace_archive(error_trace_file, witness_type)
219222
self.memory = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) * 1024
220223
return is_exported, witness_type
221224

222-
def __process_trace(self, error_trace_file: str, converted_error_traces: dict,
223-
queue: multiprocessing.Queue = None):
225+
def __process_trace(self, error_trace_file: str, queue: multiprocessing.Queue = None):
224226
# TODO: if we receive several witnesses they are considered to be violation witnesses only.
225227
if queue and not self.is_standalone:
226228
supported_types = {WitnessType.VIOLATION}
227229
else:
228230
supported_types = {WitnessType.VIOLATION, WitnessType.CORRECTNESS}
229231
parsed_error_trace = self.__parse_trace(error_trace_file, supported_types)
232+
converted_error_trace = []
230233
if parsed_error_trace:
231234
self.__process_parsed_trace(parsed_error_trace)
232235
if self.clean:
@@ -241,13 +244,12 @@ def __process_trace(self, error_trace_file: str, converted_error_traces: dict,
241244
self.conversion_function_args)
242245
self.__print_parsed_error_trace(parsed_error_trace, converted_error_trace,
243246
error_trace_file)
244-
converted_error_traces[error_trace_file] = converted_error_trace
245-
246247
if queue:
247248
user_time, system_time, memory = resource.getrusage(resource.RUSAGE_SELF)[0:3]
248249
queue.put({
249250
Resource.CPU_TIME: float(user_time + system_time),
250-
Resource.MEMORY_USAGE: int(memory) * 1024
251+
Resource.MEMORY_USAGE: int(memory) * 1024,
252+
TAG_PROCESSED_TRACE: [error_trace_file, converted_error_trace]
251253
})
252254
sys.exit(0)
253255
else:
@@ -401,13 +403,19 @@ def __get_aux_file_names(error_trace_file: str) -> tuple:
401403
converted_traces_files = common_part + CONVERTED_ERROR_TRACES
402404
return json_trace_name, source_files, converted_traces_files
403405

404-
def __count_resource_usage(self, queue: multiprocessing.Queue):
405-
memory_usage_all = []
406-
children_memory = 0
406+
def __drain_processing_queue(self, queue: multiprocessing.Queue,
                             converted_error_traces: dict,
                             memory_usage_all: list):
    """Drain all currently available worker results from the queue.

    Each queue entry is a dict produced by a __process_trace child:
    its CPU time and memory usage, plus (optionally, under
    TAG_PROCESSED_TRACE) a ``[error_trace_file, converted_error_trace]``
    pair. Draining while children are still running keeps the queue's
    underlying pipe buffer from filling up, which would block the
    children and hang the whole MEA run when many traces are processed.

    :param queue: inter-process queue filled by __process_trace workers.
    :param converted_error_traces: accumulator mapping an error trace
        file name to its converted representation (updated in place).
    :param memory_usage_all: accumulator of per-child memory usage
        values (updated in place).
    """
    # multiprocessing.Queue.empty() is documented as unreliable, and the
    # empty() -> get() sequence is a check-then-act race. A non-blocking
    # get_nowait() loop terminated by queue.Empty drains everything that
    # is available without ever blocking the parent process.
    from queue import Empty  # local import: the 'queue' parameter shadows the module name
    while True:
        try:
            proc_result = queue.get_nowait()
        except Empty:
            break
        memory_usage_all.append(proc_result.get(Resource.MEMORY_USAGE, 0))
        self.cpu_time += proc_result.get(Resource.CPU_TIME, 0.0)
        cet = proc_result.get(TAG_PROCESSED_TRACE, [])
        # A valid payload is a 2-element list with both the file name and
        # the converted trace non-empty; anything else is ignored.
        if len(cet) == 2 and cet[0] and cet[1]:
            converted_error_traces[cet[0]] = cet[1]
416+
417+
def __compute_max_memory_usage(self, memory_usage_all: list):
418+
children_memory = 0
411419
for memory_usage in sorted(memory_usage_all)[:self.parallel_processes]:
412420
children_memory += memory_usage
413421
process_memory = int(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss) * 1024

0 commit comments

Comments
 (0)