3232
3333
3434def format_bytes (size ):
35- for unit in [' Bytes' , 'KB' , 'MB' , 'GB' , 'TB' , 'PB' ]:
35+ for unit in [" Bytes" , "KB" , "MB" , "GB" , "TB" , "PB" ]:
3636 if size < 1024.0 :
3737 return f"{ size :.2f} { unit } "
3838 size /= 1024.0
@@ -123,7 +123,9 @@ def numpy_to_matrix_block(sds, np_arr: np.array):
123123 not sds .multi_pipe_enabled or total_bytes < 2 * min_bytes_per_pipe
124124 )
125125 if use_single_pipe :
126- sds ._log .debug ("Using single FIFO pipe for {}" .format (format_bytes (total_bytes )))
126+ sds ._log .debug (
127+ "Using single FIFO pipe for {}" .format (format_bytes (total_bytes ))
128+ )
127129 pipe_id = 0
128130 pipe = sds .FIFO_PY2JAVA_PIPES [pipe_id ]
129131 fut = sds .executor_pool .submit (
@@ -193,19 +195,18 @@ def matrix_block_to_numpy(sds, mb: JavaObject):
193195
194196 rows = mb .getNumRows ()
195197 cols = mb .getNumColumns ()
198+ try :
199+ if sds .data_transfer_mode == 1 :
200+ dtype = np .float64
196201
197- if sds .data_transfer_mode == 1 :
198- dtype = np .float64
202+ elem_size = np .dtype (dtype ).itemsize
203+ num_elements = rows * cols
204+ total_bytes = num_elements * elem_size
205+ batch_size_bytes = 32 * 1024 # 32 KB
199206
200- elem_size = np .dtype (dtype ).itemsize
201- num_elements = rows * cols
202- total_bytes = num_elements * elem_size
203- batch_size_bytes = 32 * 1024 # 32 KB
207+ arr = np .empty (num_elements , dtype = dtype )
208+ mv = memoryview (arr ).cast ("B" )
204209
205- arr = np .empty (num_elements , dtype = dtype )
206- mv = memoryview (arr ).cast ("B" )
207-
208- try :
209210 pipe_id = 0
210211 pipe = sds .FIFO_JAVA2PY_PIPES [pipe_id ]
211212
@@ -219,15 +220,16 @@ def matrix_block_to_numpy(sds, mb: JavaObject):
219220 fut .result () # wait for Java to finish writing
220221 return arr .reshape ((rows , cols ))
221222
222- except Exception as e :
223- sds .exception_and_close (e )
224- else :
225- buf = jvm .org .apache .sysds .runtime .util .Py4jConverterUtils .convertMBtoPy4JDenseArr (
226- mb
227- )
228- return np .frombuffer (buf , count = rows * cols , dtype = np .float64 ).reshape (
229- (rows , cols )
230- )
223+ else :
224+ buf = jvm .org .apache .sysds .runtime .util .Py4jConverterUtils .convertMBtoPy4JDenseArr (
225+ mb
226+ )
227+ return np .frombuffer (buf , count = rows * cols , dtype = np .float64 ).reshape (
228+ (rows , cols )
229+ )
230+ except Exception as e :
231+ sds .exception_and_close (e )
232+ return None
231233
232234
233235def convert (jvm , fb , idx , num_elements , value_type , pd_series , conversion = "column" ):
0 commit comments