@@ -50,27 +50,30 @@ def worker(self, thid, generator, start, end):
5050
5151 db = self .db .create_new_connection ()
5252
53- data_for_query = []
54-
5553 if thid == 0 and self .stats :
5654 pb = ProgressBar .ProgressBar (self .loader_filename )
5755
58- for i in range ( start , end ):
56+ total_batches = ( end - start ) // self . batchsize
5957
60- data_for_query .append (generator [i ])
58+ if (end - start ) % self .batchsize > 0 :
59+ total_batches += 1
6160
62- if len (data_for_query ) >= self .batchsize :
63- self .do_batch (db , data_for_query )
64- data_for_query = []
65- if thid == 0 and self .stats :
66- pb .update ((i - start ) / (end - start ))
61+ for i in range (total_batches ):
6762
68- if len (data_for_query ) > 0 :
69- self .do_batch (db , data_for_query )
70- data_for_query = []
63+ batch_start = start + i * self .batchsize
64+ batch_end = min (batch_start + self .batchsize , end )
7165
72- if thid == 0 and self .stats :
73- pb .update (1 )
66+ try :
67+ # This can be done with slices instead of list comprehension.
68+ # but required support from generator.
69+ data_for_query = [ generator [idx ]
70+ for idx in range (batch_start , batch_end ) ]
71+ self .do_batch (db , data_for_query )
72+ except :
73+ self .error_counter += 1
74+
75+ if thid == 0 and self .stats :
76+ pb .update ((i + 1 ) / total_batches )
7477
7578 def ingest (self , generator , batchsize = 1 , numthreads = 1 , stats = False ):
7679
0 commit comments