55import json
66import logging
77import math
8+ import inspect
89
910
1011from aperturedb .DaskManager import DaskManager
1718def execute_batch (q : Commands , blobs : Blobs , db : Connector ,
1819 success_statuses : list [int ] = [0 ],
1920 response_handler : Optional [Callable ] = None , commands_per_query : int = 1 , blobs_per_query : int = 0 ,
20- strict_response_validation : bool = False ) -> Tuple [int , CommandResponses , Blobs ]:
21+ strict_response_validation : bool = False , cmd_index = None ) -> Tuple [int , CommandResponses , Blobs ]:
2122 """
2223 Execute a batch of queries, doing useful logging around it.
2324 Calls the response handler if provided.
@@ -50,7 +51,8 @@ def execute_batch(q: Commands, blobs: Blobs, db: Connector,
5051 if response_handler is not None :
5152 try :
5253 ParallelQuery .map_response_to_handler (response_handler ,
53- q , blobs , r , b , commands_per_query , blobs_per_query )
54+ q , blobs , r , b , commands_per_query , blobs_per_query ,
55+ cmd_index )
5456 except BaseException as e :
5557 logger .exception (e )
5658 if strict_response_validation :
@@ -112,7 +114,7 @@ def getSuccessStatus(cls):
112114
113115 @classmethod
114116 def map_response_to_handler (cls , handler , query , query_blobs , response , response_blobs ,
115- commands_per_query , blobs_per_query ):
117+ commands_per_query , blobs_per_query , cmd_index_offset ):
116118 # We could potentially always call this handler function
117119 # and let the user deal with the error cases.
118120 blobs_returned = 0
@@ -140,7 +142,8 @@ def map_response_to_handler(cls, handler, query, query_blobs, response, respons
140142 response [start :end ] if issubclass (
141143 type (response ), list ) else response ,
142144 response_blobs [blobs_returned :blobs_returned + b_count ] if
143- len (response_blobs ) >= blobs_returned + b_count else None )
145+ len (response_blobs ) >= blobs_returned + b_count else None ,
146+ None if cmd_index_offset is None else cmd_index_offset + i )
144147 blobs_returned += b_count
145148
146149 def __init__ (self , db : Connector , dry_run : bool = False ):
@@ -218,7 +221,7 @@ def call_response_handler(self, q: Commands, blobs: Blobs, r: CommandResponses,
218221 except BaseException as e :
219222 logger .exception (e )
220223
221- def do_batch (self , db : Connector , data : List [Tuple [Commands , Blobs ]]) -> None :
224+ def do_batch (self , db : Connector , batch_start : int , data : List [Tuple [Commands , Blobs ]]) -> None :
222225 """
223226 Executes batch of queries and blobs in the database.
224227
@@ -257,6 +260,19 @@ def process_responses(requests, input_blobs, responses, output_blobs):
257260 response_handler = self .generator .response_handler
258261 if hasattr (self .generator , "strict_response_validation" ) and isinstance (self .generator .strict_response_validation , bool ):
259262 strict_response_validation = self .generator .strict_response_validation
263+
264+ # if response_handler doesn't support index, just discard the index with a wrapper.
265+ if response_handler is not None :
266+ parameter_count = len (inspect .signature (
267+ response_handler ).parameters )
268+ if parameter_count < 4 or parameter_count > 5 :
269+ raise Exception ("Bad Signature for response_handler :"
270+ f"expected 6 > args > 3, got { parameter_count } " )
271+ if parameter_count == 4 :
272+ indexless_handler = response_handler
273+ def response_handler (query , qblobs , resp , rblobs , qindex ): return indexless_handler (
274+ query , qblobs , resp , rblobs )
275+
260276 result , r , b = self .batch_command (
261277 q ,
262278 blobs ,
@@ -265,7 +281,8 @@ def process_responses(requests, input_blobs, responses, output_blobs):
265281 response_handler ,
266282 self .commands_per_query ,
267283 self .blobs_per_query ,
268- strict_response_validation = strict_response_validation )
284+ strict_response_validation = strict_response_validation ,
285+ cmd_index = batch_start )
269286 if result == 0 :
270287 query_time = db .get_last_query_time ()
271288 worker_stats ["succeeded_commands" ] = len (q )
@@ -316,7 +333,8 @@ def worker(self, thid: int, generator, start: int, end: int):
316333 batch_end = min (batch_start + self .batchsize , end )
317334
318335 try :
319- self .do_batch (db , generator [batch_start :batch_end ])
336+ self .do_batch (db , batch_start ,
337+ generator [batch_start :batch_end ])
320338 except Exception as e :
321339 logger .exception (e )
322340 logger .warning (
0 commit comments