88import multiprocessing
99import os
1010import pickle
11- import queue
1211import re
1312import shutil
1413import subprocess
1817import yaml
1918from multiprocessing import Lock , Process , Value
2019from multiprocessing .managers import BaseManager
21- from typing import List
20+ from collections import deque
21+ from typing import List , Dict
2222from packaging import version
2323
2424from colorama import Fore
@@ -538,7 +538,7 @@ def __init__(self, instance: TestInstance, env: TwisterEnv, jobserver, **kwargs)
538538 super ().__init__ (instance .testsuite , instance .platform , instance .testsuite .source_dir , instance .build_dir , jobserver )
539539
540540 self .log = "build.log"
541- self .instance = instance
541+ self .instance : TestInstance = instance
542542 self .filtered_tests = 0
543543 self .options = env .options
544544 self .env = env
@@ -601,20 +601,19 @@ def log_info_file(self, inline_logs):
601601 else :
602602 self .log_info ("{}" .format (b_log ), inline_logs )
603603
604-
605- def _add_to_pipeline (self , pipeline , op : str , additionals : dict = {}):
604+ def _add_to_processing_queue (self , processing_queue : deque , op : str , additionals : dict = {}):
606605 try :
607606 if op :
608607 task = dict ({'op' : op , 'test' : self .instance }, ** additionals )
609- pipeline . put (task )
610- # Only possible RuntimeError source here is a mutation of the pipeline during iteration.
611- # If that happens, we ought to consider the whole pipeline corrupted.
608+ processing_queue . append (task )
609+ # Only possible RuntimeError source here is a mutation of the processing_queue during iteration.
610+ # If that happens, we ought to consider the whole processing_queue corrupted.
612611 except RuntimeError as e :
613612 logger .error (f"RuntimeError: { e } " )
614613 traceback .print_exc ()
615614
616-
617- def process ( self , pipeline , done , message , lock , results ):
615+ def process ( self , processing_queue : deque , ready_instances : Dict [ str , TestInstance ],
616+ message , lock : Lock , results : ExecutionCounter ):
618617 next_op = None
619618 additionals = {}
620619
@@ -646,7 +645,7 @@ def process(self, pipeline, done, message, lock, results):
646645 self .instance .add_missing_case_status (TwisterStatus .BLOCK , reason )
647646 next_op = 'report'
648647 finally :
649- self ._add_to_pipeline ( pipeline , next_op )
648+ self ._add_to_processing_queue ( processing_queue , next_op )
650649
651650 # The build process, call cmake and build with configured generator
652651 elif op == "cmake" :
@@ -677,7 +676,7 @@ def process(self, pipeline, done, message, lock, results):
677676 self .instance .add_missing_case_status (TwisterStatus .BLOCK , reason )
678677 next_op = 'report'
679678 finally :
680- self ._add_to_pipeline ( pipeline , next_op )
679+ self ._add_to_processing_queue ( processing_queue , next_op )
681680
682681 elif op == "build" :
683682 try :
@@ -718,7 +717,7 @@ def process(self, pipeline, done, message, lock, results):
718717 self .instance .add_missing_case_status (TwisterStatus .BLOCK , reason )
719718 next_op = 'report'
720719 finally :
721- self ._add_to_pipeline ( pipeline , next_op )
720+ self ._add_to_processing_queue ( processing_queue , next_op )
722721
723722 elif op == "gather_metrics" :
724723 try :
@@ -739,7 +738,7 @@ def process(self, pipeline, done, message, lock, results):
739738 self .instance .add_missing_case_status (TwisterStatus .BLOCK , reason )
740739 next_op = 'report'
741740 finally :
742- self ._add_to_pipeline ( pipeline , next_op )
741+ self ._add_to_processing_queue ( processing_queue , next_op )
743742
744743 # Run the generated binary using one of the supported handlers
745744 elif op == "run" :
@@ -766,13 +765,13 @@ def process(self, pipeline, done, message, lock, results):
766765 next_op = 'report'
767766 additionals = {}
768767 finally :
769- self ._add_to_pipeline ( pipeline , next_op , additionals )
768+ self ._add_to_processing_queue ( processing_queue , next_op , additionals )
770769
771770 # Report results and output progress to screen
772771 elif op == "report" :
773772 try :
774773 with lock :
775- done . put ( self .instance )
774+ ready_instances . update ({ self .instance . name : self . instance } )
776775 self .report_out (results )
777776
778777 if not self .options .coverage :
@@ -794,7 +793,7 @@ def process(self, pipeline, done, message, lock, results):
794793 next_op = None
795794 additionals = {}
796795 finally :
797- self ._add_to_pipeline ( pipeline , next_op , additionals )
796+ self ._add_to_processing_queue ( processing_queue , next_op , additionals )
798797
799798 elif op == "cleanup" :
800799 try :
@@ -1279,11 +1278,10 @@ def calc_size(instance: TestInstance, from_buildlog: bool):
12791278class TwisterRunner :
12801279
12811280 def __init__ (self , instances , suites , env = None ) -> None :
1282- self .pipeline = None
12831281 self .options = env .options
12841282 self .env = env
1285- self .instances = instances
1286- self .suites = suites
1283+ self .instances : Dict [ str , TestInstance ] = instances
1284+ self .suites : Dict [ str , TestSuite ] = suites
12871285 self .duts = None
12881286 self .jobs = 1
12891287 self .results = None
@@ -1293,14 +1291,15 @@ def run(self):
12931291
12941292 retries = self .options .retry_failed + 1
12951293
1296- BaseManager .register ('LifoQueue' , queue .LifoQueue )
1294+ BaseManager .register ('deque' , deque , exposed = ['append' , 'appendleft' , 'pop' ])
1295+ BaseManager .register ('get_dict' , dict )
12971296 manager = BaseManager ()
12981297 manager .start ()
12991298
13001299 self .results = ExecutionCounter (total = len (self .instances ))
13011300 self .iteration = 0
1302- pipeline = manager .LifoQueue ()
1303- done_queue = manager .LifoQueue ()
1301+ processing_queue : deque = manager .deque ()
1302+ ready_instances : Dict [ str , TestInstance ] = manager .get_dict ()
13041303
13051304 # Set number of jobs
13061305 if self .options .jobs :
@@ -1338,18 +1337,13 @@ def run(self):
13381337 else :
13391338 self .results .done = self .results .skipped_filter
13401339
1341- self .execute (pipeline , done_queue )
1340+ self .execute (processing_queue , ready_instances )
13421341
1343- while True :
1344- try :
1345- inst = done_queue .get_nowait ()
1346- except queue .Empty :
1347- break
1348- else :
1349- inst .metrics .update (self .instances [inst .name ].metrics )
1350- inst .metrics ["handler_time" ] = inst .execution_time
1351- inst .metrics ["unrecognized" ] = []
1352- self .instances [inst .name ] = inst
1342+ for inst in ready_instances .values ():
1343+ inst .metrics .update (self .instances [inst .name ].metrics )
1344+ inst .metrics ["handler_time" ] = inst .execution_time
1345+ inst .metrics ["unrecognized" ] = []
1346+ self .instances [inst .name ] = inst
13531347
13541348 print ("" )
13551349
@@ -1386,7 +1380,8 @@ def show_brief(self):
13861380 self .results .skipped_filter ,
13871381 self .results .skipped_configs - self .results .skipped_filter ))
13881382
1389- def add_tasks_to_queue (self , pipeline , build_only = False , test_only = False , retry_build_errors = False ):
1383+ def add_tasks_to_queue (self , processing_queue : deque , build_only = False ,
1384+ test_only = False , retry_build_errors = False ):
13901385 for instance in self .instances .values ():
13911386 if build_only :
13921387 instance .run = False
@@ -1406,61 +1401,115 @@ def add_tasks_to_queue(self, pipeline, build_only=False, test_only=False, retry_
14061401 if instance .testsuite .filter :
14071402 instance .filter_stages = self .get_cmake_filter_stages (instance .testsuite .filter , expr_parser .reserved .keys ())
14081403
1409- if test_only and instance .run :
1410- pipeline . put ({"op" : "run" , "test" : instance })
1404+ if ( test_only and instance .run ) or instance . no_own_image :
1405+ processing_queue . append ({"op" : "run" , "test" : instance })
14111406 elif instance .filter_stages and "full" not in instance .filter_stages :
1412- pipeline . put ({"op" : "filter" , "test" : instance })
1407+ processing_queue . append ({"op" : "filter" , "test" : instance })
14131408 else :
14141409 cache_file = os .path .join (instance .build_dir , "CMakeCache.txt" )
14151410 if os .path .exists (cache_file ) and self .env .options .aggressive_no_clean :
1416- pipeline . put ({"op" : "build" , "test" : instance })
1411+ processing_queue . append ({"op" : "build" , "test" : instance })
14171412 else :
1418- pipeline .put ({"op" : "cmake" , "test" : instance })
1413+ processing_queue .append ({"op" : "cmake" , "test" : instance })
1414+
1415+ def _required_images_are_ready (self , instance : TestInstance , ready_instances : Dict [str , TestInstance ]):
1416+ return all ([required_image in ready_instances .keys () for required_image in instance .required_images ])
1417+
1418+ def _required_images_failed (self , instance : TestInstance , ready_instances : Dict [str , TestInstance ]):
1419+ # Verify that all required images were successfully built
1420+ found_failed_image = False
1421+ for required_image in instance .required_images :
1422+ inst = ready_instances .get (required_image )
1423+ if inst .status != TwisterStatus .PASS :
1424+ logger .debug (f"{ required_image } : Required image failed: { inst .reason } " )
1425+ found_failed_image = True
1426+ return found_failed_image
1427+
1428+ def required_images_processed (self , instance : TestInstance , processing_queue : deque ,
1429+ ready_instances : Dict [str , TestInstance ], task ):
1430+ if not instance .required_images :
1431+ return True
1432+
1433+ if not self ._required_images_are_ready (instance , ready_instances ):
1434+ # required image not ready yet,
1435+ # add the task back to the pipeline to process it later
1436+ if self .jobs > 1 :
1437+ # to avoid busy waiting
1438+ time .sleep (1 )
1439+ processing_queue .appendleft (task )
1440+ return False
1441+
1442+ if self ._required_images_failed (instance , ready_instances ):
1443+ instance .status = TwisterStatus .SKIP
1444+ for tc in instance .testcases :
1445+ tc .status = TwisterStatus .SKIP
1446+ instance .reason = "Required image failed"
1447+ instance .required_images = []
1448+ processing_queue .append ({"op" : "report" , "test" : instance })
1449+ return False
1450+
1451+ if instance .no_own_image :
1452+ # copy build_dir from the first required image to the current instance
1453+ origin_build_dir = self .instances [instance .required_images [0 ]].build_dir
1454+ shutil .copytree (origin_build_dir , instance .build_dir , dirs_exist_ok = True )
1455+ logger .debug (f"Copied build_dir from { origin_build_dir } " )
1456+ if not instance .run or self .options .build_only :
1457+ instance .status = TwisterStatus .SKIP
1458+ instance .reason = "not run"
1459+ instance .required_images = []
1460+ processing_queue .append ({"op" : "report" , "test" : instance })
1461+ return False
1462+
1463+ # keep paths to required build dirs for further processing
1464+ for required_image in instance .required_images :
1465+ instance .required_build_dirs .append (self .instances [required_image ].build_dir )
1466+
1467+ # required images are ready, clear to not process them later
1468+ instance .required_images = []
1469+ return True
1470+
1471+ def _pipeline_mgr (self , processing_queue : deque , ready_instances : Dict [str , TestInstance ],
1472+ lock : Lock , results : ExecutionCounter ):
1473+ while True :
1474+ try :
1475+ task = processing_queue .pop ()
1476+ except IndexError :
1477+ break
1478+ else :
1479+ instance : TestInstance = task ['test' ]
1480+
1481+ if not self .required_images_processed (instance , processing_queue , ready_instances , task ):
1482+ # postpone processing task if required images are not ready
1483+ continue
14191484
1485+ pb = ProjectBuilder (instance , self .env , self .jobserver )
1486+ pb .duts = self .duts
1487+ pb .process (processing_queue , ready_instances , task , lock , results )
1488+ return True
14201489
1421- def pipeline_mgr (self , pipeline , done_queue , lock , results ):
1490+ def pipeline_mgr (self , processing_queue : deque , ready_instances : Dict [str , TestInstance ],
1491+ lock : Lock , results : ExecutionCounter ):
14221492 try :
14231493 if sys .platform == 'linux' :
14241494 with self .jobserver .get_job ():
1425- while True :
1426- try :
1427- task = pipeline .get_nowait ()
1428- except queue .Empty :
1429- break
1430- else :
1431- instance = task ['test' ]
1432- pb = ProjectBuilder (instance , self .env , self .jobserver )
1433- pb .duts = self .duts
1434- pb .process (pipeline , done_queue , task , lock , results )
1435-
1436- return True
1495+ return self ._pipeline_mgr (processing_queue , ready_instances , lock , results )
14371496 else :
1438- while True :
1439- try :
1440- task = pipeline .get_nowait ()
1441- except queue .Empty :
1442- break
1443- else :
1444- instance = task ['test' ]
1445- pb = ProjectBuilder (instance , self .env , self .jobserver )
1446- pb .duts = self .duts
1447- pb .process (pipeline , done_queue , task , lock , results )
1448- return True
1497+ return self ._pipeline_mgr (processing_queue , ready_instances , lock , results )
14491498 except Exception as e :
1450- logger .error (f"General exception: { e } " )
1499+ logger .exception (f"General exception: { e } " )
14511500 sys .exit (1 )
14521501
1453- def execute (self , pipeline , done ):
1502+ def execute (self , processing_queue : deque , ready_instances : Dict [ str , TestInstance ] ):
14541503 lock = Lock ()
14551504 logger .info ("Adding tasks to the queue..." )
1456- self .add_tasks_to_queue (pipeline , self .options .build_only , self .options .test_only ,
1505+ self .add_tasks_to_queue (processing_queue , self .options .build_only , self .options .test_only ,
14571506 retry_build_errors = self .options .retry_build_errors )
14581507 logger .info ("Added initial list of jobs to queue" )
14591508
14601509 processes = []
14611510
14621511 for _ in range (self .jobs ):
1463- p = Process (target = self .pipeline_mgr , args = (pipeline , done , lock , self .results , ))
1512+ p = Process (target = self .pipeline_mgr , args = (processing_queue , ready_instances , lock , self .results , ))
14641513 processes .append (p )
14651514 p .start ()
14661515 logger .debug (f"Launched { self .jobs } jobs" )
0 commit comments