1- from collections import defaultdict
21from logging import Logger
32import re
43import time
54from abc import ABC , abstractmethod
6- from typing import Any , Generator , Iterable , Mapping , TypedDict
5+ from typing import Any , Generator , Iterable , TypedDict
6+ from concurrent .futures import ThreadPoolExecutor , as_completed
77
88from rptest .clients .kubectl import KubectlTool
99
@@ -62,7 +62,7 @@ def elapsed(self) -> float:
6262 return time .time () - self ._start
6363
6464 def elapseds (self ) -> str :
65- return f"{ self .elapsed :.2f } s"
65+ return f"{ self .elapsed :.3f } s"
6666
6767 def elapsedf (self , note : str ) -> str :
6868 return f"{ note } : { self .elapseds ()} "
@@ -150,7 +150,7 @@ def __init__(
150150 )
151151
152152 # Prepare matching terms
153- self .match_terms : list [str ] = self .DEFAULT_MATCH_TERMS
153+ self .match_terms : list [str ] = list ( self .DEFAULT_MATCH_TERMS )
154154 if self ._raise_on_errors :
155155 self .match_terms .append ("^ERROR" )
156156 self .match_expr : str = " " .join (f'-e "{ t } "' for t in self .match_terms )
@@ -203,20 +203,18 @@ def _check_oversized_allocations(self, line: str) -> bool:
203203 return False
204204
205205 def _search (self , versioned_nodes : VersionedNodes ) -> NodeToLines :
206- def make_vl () -> VersionAndLines :
207- return {"version" : None , "lines" : []}
208-
209- bad_lines : defaultdict [ClusterNode | CloudBroker , VersionAndLines ] = (
210- defaultdict (make_vl )
211- )
212206 test_name = self ._context .function_name
213- sw = Stopwatch ()
214- for version , node in versioned_nodes :
215- sw .start ()
207+ overall_sw = Stopwatch ()
208+ overall_sw .start ()
209+
210+ def scan_one (
211+ version : str | None , node : ClusterNode | CloudBroker
212+ ) -> tuple [ClusterNode | CloudBroker , VersionAndLines ]:
213+ node_sw = Stopwatch ()
214+ node_sw .start ()
216215 hostname = self ._get_hostname (node )
217216 self .logger .info (f"Scanning node { hostname } log for errors..." )
218- # Prepare/Build capture func shortcut
219- # Iterate
217+ vl : VersionAndLines = {"version" : version , "lines" : []}
220218 for line in self ._capture_log (node , self .match_expr ):
221219 line = line .strip ()
222220 # Check if this line holds error
@@ -229,15 +227,27 @@ def make_vl() -> VersionAndLines:
229227 allowed = self ._check_oversized_allocations (line )
230228 # If detected bad lines, log it and add to the list
231229 if not allowed :
232- bad_lines [node ]["version" ] = version
233- bad_lines [node ]["lines" ].append (line )
230+ vl ["lines" ].append (line )
234231 self .logger .warning (
235232 f"[{ test_name } ] Unexpected log line on { hostname } : { line } "
236233 )
237234 self .logger .info (
238- sw .elapsedf (f"##### Time spent to scan bad logs on '{ hostname } '" )
235+ node_sw .elapsedf (f"Time spent to scan bad logs on '{ hostname } '" )
239236 )
240- return dict (bad_lines )
237+ return node , vl
238+
239+ bad_lines : NodeToLines = {}
240+
241+ # Run scans in parallel
242+ with ThreadPoolExecutor () as executor :
243+ futures = [executor .submit (scan_one , v , n ) for v , n in versioned_nodes ]
244+ for fut in as_completed (futures ):
245+ node , vl = fut .result ()
246+ if vl ["lines" ]:
247+ bad_lines [node ] = vl
248+
249+ self .logger .info (overall_sw .elapsedf ("Time spent to scan bad logs overall" ))
250+ return bad_lines
241251
242252 def search_logs (self , versioned_nodes : VersionedNodes ) -> None :
243253 """
0 commit comments