1- from collections import defaultdict
21from logging import Logger
32import re
43import time
54from abc import ABC , abstractmethod
6- from typing import Any , Generator , Iterable , Mapping , TypedDict
5+ from typing import Any , Generator , Iterable , TypedDict
6+ from concurrent .futures import ThreadPoolExecutor , as_completed
77
88from rptest .clients .kubectl import KubectlTool
99
@@ -62,7 +62,7 @@ def elapsed(self) -> float:
6262 return time .time () - self ._start
6363
6464 def elapseds (self ) -> str :
65- return f"{ self .elapsed :.2f } s"
65+ return f"{ self .elapsed :.3f } s"
6666
6767 def elapsedf (self , note : str ) -> str :
6868 return f"{ note } : { self .elapseds ()} "
@@ -150,7 +150,7 @@ def __init__(
150150 )
151151
152152 # Prepare matching terms
153- self .match_terms : list [str ] = self .DEFAULT_MATCH_TERMS
153+ self .match_terms : list [str ] = list ( self .DEFAULT_MATCH_TERMS )
154154 if self ._raise_on_errors :
155155 self .match_terms .append ("^ERROR" )
156156 self .match_expr : str = " " .join (f'-e "{ t } "' for t in self .match_terms )
@@ -200,20 +200,18 @@ def _check_oversized_allocations(self, line: str) -> bool:
200200 return False
201201
202202 def _search (self , versioned_nodes : VersionedNodes ) -> NodeToLines :
203- def make_vl () -> VersionAndLines :
204- return {"version" : None , "lines" : []}
205-
206- bad_lines : defaultdict [ClusterNode | CloudBroker , VersionAndLines ] = (
207- defaultdict (make_vl )
208- )
209203 test_name = self ._context .function_name
210- sw = Stopwatch ()
211- for version , node in versioned_nodes :
212- sw .start ()
204+ overall_sw = Stopwatch ()
205+ overall_sw .start ()
206+
207+ def scan_one (
208+ version : str | None , node : ClusterNode | CloudBroker
209+ ) -> tuple [ClusterNode | CloudBroker , VersionAndLines ]:
210+ node_sw = Stopwatch ()
211+ node_sw .start ()
213212 hostname = self ._get_hostname (node )
214213 self .logger .info (f"Scanning node { hostname } log for errors..." )
215- # Prepare/Build capture func shortcut
216- # Iterate
214+ vl : VersionAndLines = {"version" : version , "lines" : []}
217215 for line in self ._capture_log (node , self .match_expr ):
218216 line = line .strip ()
219217 # Check if this line holds error
@@ -226,15 +224,27 @@ def make_vl() -> VersionAndLines:
226224 allowed = self ._check_oversized_allocations (line )
227225 # If detected bad lines, log it and add to the list
228226 if not allowed :
229- bad_lines [node ]["version" ] = version
230- bad_lines [node ]["lines" ].append (line )
227+ vl ["lines" ].append (line )
231228 self .logger .warning (
232229 f"[{ test_name } ] Unexpected log line on { hostname } : { line } "
233230 )
234231 self .logger .info (
235- sw .elapsedf (f"##### Time spent to scan bad logs on '{ hostname } '" )
232+ node_sw .elapsedf (f"Time spent to scan bad logs on '{ hostname } '" )
236233 )
237- return dict (bad_lines )
234+ return node , vl
235+
236+ bad_lines : NodeToLines = {}
237+
238+ # Run scans in parallel
239+ with ThreadPoolExecutor () as executor :
240+ futures = [executor .submit (scan_one , v , n ) for v , n in versioned_nodes ]
241+ for fut in as_completed (futures ):
242+ node , vl = fut .result ()
243+ if vl ["lines" ]:
244+ bad_lines [node ] = vl
245+
246+ self .logger .info (overall_sw .elapsedf ("Time spent to scan bad logs overall" ))
247+ return bad_lines
238248
239249 def search_logs (self , versioned_nodes : VersionedNodes ) -> None :
240250 """
0 commit comments