22from typing import Any , Callable , List , Tuple
33import itertools
44import logging
5+ import math
56
67import numpy as np
78
@@ -28,7 +29,7 @@ def remove_blobs(item: Any) -> Any:
2829 return item
2930
3031
31- def gen_execute_batch_sets (base_executor , per_batch_response_handler : Callable = None ):
32+ def gen_execute_batch_sets (base_executor ):
3233
3334 #
3435 # execute_batch_sets - executes multiple sets of queries with optional constraints on follow on sets
@@ -47,7 +48,7 @@ def gen_execute_batch_sets(base_executor, per_batch_response_handler: Callable =
4748 # execution
4849 #
4950 def execute_batch_sets (query_set , blob_set , db , success_statuses : list [int ] = [0 ],
50- response_handler : Callable = None , commands_per_query : list [int ] = - 1 ,
51+ response_handler : Optional [ Callable ] = None , commands_per_query : list [int ] = - 1 ,
5152 blobs_per_query : list [int ] = - 1 , strict_response_validation : bool = False ):
5253
5354 logger .info ("Execute Batch Sets = Batch Size {0} Comands Per Query {1} Blobs Per Query {2}" .format (
@@ -69,13 +70,21 @@ def execute_batch_sets(query_set, blob_set, db, success_statuses: list[int] = [0
6970 # verify layout if a complex set
7071 if per_set_blobs :
7172 first_element_blobs = blob_set [0 ]
73+
74+ if len (first_element_blobs ) == 0 or len (first_element_blobs ) != set_total :
75+ # user has confused blob format for sure.
76+ logger .error ("Malformed blobs for first element. Blob return from your loader "
77+ "should be [query_blobs] where query_blobs = [ first_cmd_list, second_cmd_list, ... ] " )
78+ raise Exception (
79+ "Malformed blobs input. Expected First element to have a list of blobs for each set." )
80+
7281 first_query_blobs = first_element_blobs [0 ]
7382 # If someone is looking for info logging from PQS, it is likely that blobs are not being set properly.
7483 # The wrapping of blobs in general can be confusing. Best suggestion is looking at a loader.
7584 logger .info ("Blobs for first set = " +
76- str (remove_blobs (blob_set [ 0 ] )))
85+ str (remove_blobs (first_element_blobs )))
7786 logger .info ("First Blob for first set = " +
78- str (remove_blobs (blob_set [ 0 ][ 0 ] )))
87+ str (remove_blobs (first_query_blobs )))
7988 if not isinstance (first_query_blobs , list ):
8089 logger .error (
8190 "Expected a list of lists for the first element's blob sets" )
@@ -111,7 +120,7 @@ def set_blob_filter(all_blobs, strike_list, set_nm):
111120 # the list comprehension pulls out the blob set for the requested set
112121 # the blob set is then flattened as the query expects a flat array using blobs_per_query as the iterator
113122 # the flat list is them zipped with the strike list, which determines which blobs are unused
114- # the filter checks if the blob is to be struc
123+ # the filter checks if the blob is to be struck
115124 # the map pulls the remaining blobs out
116125
117126 return list (map (lambda pair : pair [0 ],
@@ -155,9 +164,9 @@ def first_only_blobs(all_blobs, strike_list, set_nm):
155164
156165 # allowed layouts for commands other than the seed command
157166 # { "cmd" : {} } -> standard single command
158- # [{ "cmd1": {}, "cmd2} : {}] -> standard multiple command
159- # [{ "constraint" : { } , { "cmd" : {} }] -> constraint with a single command
160- # [{ " constraints: { } , [{"cmd1" : {} }, {"cmd2": {} }]] -> constraint with multiple command
167+ # [{ "cmd1": {} },{ "cmd2" : {} }] -> standard multiple command
168+ # [{ constraints } , { "cmd" : {} }] -> constraint with a single command
169+ # [{ constraints } , [{"cmd1" : {} }, {"cmd2": {} }]] -> constraint with multiple command
161170
162171 known_constraint_keys = ["results" , "apply" ]
163172 constraints = None
@@ -202,6 +211,10 @@ def constraint_filter(single_line, single_results):
202211 passed_all_constraints = True
203212 for result_number in result_constraints :
204213
214+ if not isinstance (result_number , int ):
215+ raise Exception ("Keys for result constraints must be numbers: "
216+ f"{ result_number } is { type (result_number )} " )
217+
205218 if len (single_results ) < result_number or single_results [result_number ] is None :
206219 # in theory here we have two possibilities: a user can have a correctly formed constraint which didn't execute by design
207220 # ( which is what process here )
@@ -278,17 +291,24 @@ def constraint_filter(single_line, single_results):
278291 blob_strike_list = list (map (lambda q : q is None , queries ))
279292
280293 # filter out struck blobs
281- used_blobs = filter (lambda b : b is not None ,
282- blob_filter (blob_set , blob_strike_list , i ))
294+ used_blobs = list ( filter (lambda b : b is not None ,
295+ blob_filter (blob_set , blob_strike_list , i ) ))
283296
284- # TODO: add wrapped response_handler.
285- if response_handler != None :
286- logger .warning (
287- "ParallelQuerySet does not yet support a response_handler which will identify which set is being worked on" )
288297 if len (executable_queries ) > 0 :
289298 result_code , db_results , db_blobs = base_executor (executable_queries , used_blobs ,
290299 db , local_success_statuses ,
291300 None , commands_per_query [i ], blobs_per_query [i ], strict_response_validation = strict_response_validation )
301+ if response_handler != None and db .last_query_ok ():
302+ def map_to_set (query , query_blobs , resp , resp_blobs ):
303+ response_handler (
304+ i , query , query_blobs , resp , resp_blobs )
305+ try :
306+ ParallelQuery .map_response_to_handler (map_to_set ,
307+ executable_queries , used_blobs , db_results , db_blobs , commands_per_query [i ], blobs_per_query [i ])
308+ except BaseException as e :
309+ logger .exception (e )
310+ if strict_response_validation :
311+ raise e
292312 else :
293313 logger .info (
294314 f"Skipped executing set { i } , no executable queries" )
@@ -364,10 +384,8 @@ def do_batch(self, db: Connector, data: List[Tuple[Commands, Blobs]]) -> None:
364384 self .commands_per_query = self .generator .commands_per_query
365385 self .blobs_per_query = self .generator .blobs_per_query
366386 set_response_handler = None
367- if hasattr (self .generator , "set_response_handler" ) and callable (self .generator .set_response_handler ):
368- set_response_handler = self .generator .set_response_handler
369387 self .batch_command = gen_execute_batch_sets (
370- self .base_batch_command , set_response_handler )
388+ self .base_batch_command )
371389
372390 ParallelQuery .do_batch (self , db , data )
373391
@@ -388,7 +406,7 @@ def print_stats(self) -> None:
388406 else :
389407 mean = np .mean (times )
390408 std = np .std (times )
391- tp = 1 / mean * self .numthreads
409+ tp = 0 if mean == 0 else 1 / mean * self .numthreads
392410
393411 print (f"Avg Query time (s): { mean } " )
394412 print (f"Query time std: { std } " )
0 commit comments