1010import logging
1111from dataflows import (
1212 Flow , printer , filter_rows , add_field , load ,
13- concatenate , set_type , add_computed_field
13+ concatenate , set_type , add_computed_field , parallelize
1414)
1515from datapackage_pipelines_migdar .flows .dump_to_es import es_dumper
1616from datapackage_pipelines_migdar .flows .i18n import split_and_translate , fix_urls
@@ -61,6 +61,7 @@ def one(i):
6161
6262def get_sheets ():
6363 def func (rows ):
64+ total = 0
6465 for row in rows :
6566 print ('Attempting with %r' % row )
6667 wb = load_workbook (row ['filename' ])
@@ -83,11 +84,18 @@ def func(rows):
8384 continue
8485 if i > 3 :
8586 break
87+ migdar_id_col = headers .index ('migdar_id' )
8688 row ['headers' ] = i
89+ j = i + 1
90+ while sheet .cell (row = j , column = migdar_id_col ).value :
91+ j += 1
92+ print ('%s // %s: Found %r ROWS' % (row ['filename' ], sheet_name , j - i - 1 ))
93+ total += j - i - 1
8794 break
8895 if row .get ('headers' ) is not None :
8996 yield row
9097 break
98+ print ('TOTAL ROWS' , total )
9199 return func
92100
93101
@@ -132,7 +140,10 @@ def base_flow():
132140 )),
133141 add_field ('filename' , 'string' ,
134142 default = lambda row : 'pubfiles/{modifiedTime}-{id}.xlsx' .format (** row )),
135- download_files (),
143+ parallelize (
144+ download_files (),
145+ num_processors = 16 ,
146+ ),
136147 add_field ('sheet' , 'string' ),
137148 add_field ('headers' , 'integer' , 1 ),
138149 get_sheets (),
0 commit comments