@@ -138,10 +138,36 @@ def __init__(self, db, dry_run=False):
138138
139139 def generate_batch (self , data ):
140140 """
141- Here we flatten the individual queries to run them as
142- a single query in a batch
141+ Here we flatten the individual queries to run them as
142+ a single query in a batch
143+ We also update the _ref values and connections refs.
143144 """
144- q = [cmd for query in data for cmd in query [0 ]]
145+ def update_refs (batched_commands ):
146+ updates = {}
147+ for i , cmd in enumerate (batched_commands ):
148+ if isinstance (cmd , list ):
149+ # Only pralllel queries will work.
150+ break
151+ values = cmd [list (cmd .keys ())[0 ]]
152+ if "_ref" in values :
153+ updates [values ["_ref" ]] = i + 1
154+ values ["_ref" ] = i + 1
155+ assert values ["_ref" ] < 100000
156+ if "image_ref" in values :
157+ values ["image_ref" ] = updates [values ["image_ref" ]]
158+ if "video_ref" in values :
159+ values ["video_ref" ] = updates [values ["video_ref" ]]
160+ if "is_connected_to" in values and "_ref" in values ["is_connected_to" ]:
161+ values ["is_connected_to" ]["_ref" ] = updates [values ["is_connected_to" ]["_ref" ]]
162+ if "connect" in values and "ref" in values ["connect" ]:
163+ values ["connect" ]["ref" ] = updates [values ["connect" ]["ref" ]]
164+ if "src" in values :
165+ values ["src" ] = updates [values ["src" ]]
166+ if "dst" in values :
167+ values ["dst" ] = updates [values ["dst" ]]
168+ return batched_commands
169+
170+ q = update_refs ([cmd for query in data for cmd in query [0 ]])
145171 blobs = [blob for query in data for blob in query [1 ]]
146172
147173 return q , blobs
@@ -234,6 +260,7 @@ def worker(self, thid, generator, start, end):
234260 if (end - start ) % self .batchsize > 0 :
235261 total_batches += 1
236262
263+ logger .info (f"Worker { thid } executing { total_batches } batches" )
237264 for i in range (total_batches ):
238265
239266 batch_start = start + i * self .batchsize
@@ -306,8 +333,7 @@ def query(self, generator, batchsize=1, numthreads=4, stats=False):
306333 # if len(generator[0]) > 0:
307334 #
308335 # Not applicable to old style loaders.
309- self .commands_per_query = min (
310- len (generator [0 ][0 ]), batchsize )
336+ self .commands_per_query = len (generator [0 ][0 ])
311337 if len (generator [0 ][1 ]):
312338 self .blobs_per_query = len (generator [0 ][1 ])
313339 else :
0 commit comments