@@ -862,26 +862,26 @@ def consolidate_partition_udf(
862862 index_array = tiledb .open (index_array_uri , mode = "r" )
863863 ids_array = tiledb .open (ids_array_uri , mode = "w" )
864864 parts_array = tiledb .open (parts_array_uri , mode = "w" )
865+ read_slices = []
865866 for part in range (partition_id_start , partition_id_end ):
866- logger .info (f"Partition: { part } " )
867- i = 0
868- read_slices = partition_slices [part ]
869-
870- logger .debug (f"Read slices: { read_slices } " )
871- ids = partial_write_array_ids_array .multi_index [read_slices ]["values" ]
872- vectors = partial_write_array_parts_array .multi_index [:, read_slices ]["values" ]
873- start_pos = int (index_array [part ]["values" ])
874- end_pos = int (index_array [part + 1 ]["values" ])
875-
876- logger .debug (
877- f"Ids shape { ids .shape } , expected size: { end_pos - start_pos } expected range:({ start_pos } ,{ end_pos } )" )
878- if ids .shape [0 ] != end_pos - start_pos :
879- raise ValueError ("Incorrect partition size." )
880-
881- logger .info (f"Writing data to array: { parts_array_uri } " )
882- parts_array [:, start_pos :end_pos ] = vectors
883- logger .info (f"Writing data to array: { ids_array_uri } " )
884- ids_array [start_pos :end_pos ] = ids
867+ for partition_slice in partition_slices [part ]:
868+ read_slices .append (partition_slice )
869+
870+ logger .debug (f"Read slices: { read_slices } " )
871+ ids = partial_write_array_ids_array .multi_index [read_slices ]["values" ]
872+ vectors = partial_write_array_parts_array .multi_index [:, read_slices ]["values" ]
873+ start_pos = int (index_array [partition_id_start ]["values" ])
874+ end_pos = int (index_array [partition_id_end ]["values" ])
875+
876+ logger .debug (
877+ f"Ids shape { ids .shape } , expected size: { end_pos - start_pos } expected range:({ start_pos } ,{ end_pos } )" )
878+ if ids .shape [0 ] != end_pos - start_pos :
879+ raise ValueError ("Incorrect partition size." )
880+
881+ logger .info (f"Writing data to array: { parts_array_uri } " )
882+ parts_array [:, start_pos :end_pos ] = vectors
883+ logger .info (f"Writing data to array: { ids_array_uri } " )
884+ ids_array [start_pos :end_pos ] = ids
885885
886886 # --------------------------------------------------------------------
887887 # DAG
0 commit comments