@@ -79,6 +79,7 @@ def where_function(cond, var1, var2):
7979 "kurt" : lambda x , skipna = True , bias = False : x .kurt (skipna = skipna , bias = bias ),
8080 "kurtosis" : lambda x , skipna = True , bias = False : x .kurtosis (skipna = skipna , bias = bias ),
8181 "nunique" : lambda x : x .nunique (),
82+ "mode" : lambda x , skipna = True : x .mode (skipna = skipna ),
8283}
8384
8485
@@ -126,53 +127,76 @@ def _calc_result_shape(self, df):
126127
127128 if isinstance (result_df , pd .DataFrame ):
128129 self .output_types = [OutputType .dataframe ]
129- return result_df .dtypes , result_df .index
130+ return result_df .dtypes , result_df .index , 2
130131 elif isinstance (result_df , pd .Series ):
131132 self .output_types = [OutputType .series ]
132- return pd .Series ([result_df .dtype ], index = [result_df .name ]), result_df .index
133+ return (
134+ pd .Series ([result_df .dtype ], index = [result_df .name ]),
135+ result_df .index ,
136+ 1 ,
137+ )
133138 else :
134139 self .output_types = [OutputType .scalar ]
135- return np .array (result_df ).dtype , None
140+ return np .array (result_df ).dtype , None , 0
136141
137142 def __call__ (self , df , output_type = None , dtypes = None , index = None ):
138143 self ._output_types = df .op .output_types
139144 normalize_reduction_funcs (self , ndim = df .ndim )
140145 if output_type is None or dtypes is None :
141146 with enter_mode (kernel = False , build = False ):
142- dtypes , index = self ._calc_result_shape (df )
147+ dtypes , index , out_ndim = self ._calc_result_shape (df )
143148 else :
144149 self .output_types = [output_type ]
150+ if output_type == OutputType .dataframe :
151+ out_ndim = 2
152+ elif output_type == OutputType .series :
153+ out_ndim = 1
154+ else :
155+ out_ndim = 0
145156
157+ reduced_len = (
158+ 1 if df .ndim != out_ndim or isinstance (self .raw_func , list ) else np .nan
159+ )
146160 if self .output_types [0 ] == OutputType .dataframe :
147161 if self .axis == 0 :
148- new_shape = (len (index ), len (dtypes ))
149- new_index = parse_index (index , store_data = True )
162+ new_shape = (len (index ) * reduced_len , len (dtypes ))
163+ new_index_value = parse_index (index , store_data = True )
164+ new_dtypes = dtypes
165+ new_col_name = parse_index (dtypes .index , store_data = True )
150166 else :
151- new_shape = (df .shape [0 ], len (dtypes ))
152- new_index = df .index_value
167+ new_shape = (df .shape [0 ], len (dtypes ) * reduced_len )
168+ new_index_value = df .index_value
169+ new_dtypes = None if np .isnan (reduced_len ) else dtypes
170+ new_col_name = parse_index (
171+ dtypes .index , store_data = not np .isnan (reduced_len )
172+ )
153173 return self .new_dataframe (
154174 [df ],
155175 shape = new_shape ,
156- dtypes = dtypes ,
157- index_value = new_index ,
158- columns_value = parse_index ( dtypes . index , store_data = True ) ,
176+ dtypes = new_dtypes ,
177+ index_value = new_index_value ,
178+ columns_value = new_col_name ,
159179 )
160180 elif self .output_types [0 ] == OutputType .series :
161181 if df .ndim == 1 :
162- new_shape = (len (index ),)
163- new_index = parse_index (index , store_data = True )
182+ new_shape = (len (index ) * reduced_len ,)
183+ new_index_value = parse_index (
184+ index , store_data = not np .isnan (reduced_len )
185+ )
164186 elif self .axis == 0 :
165- new_shape = (len (index ),)
166- new_index = parse_index (index , store_data = True )
187+ new_shape = (len (index ) * reduced_len ,)
188+ new_index_value = parse_index (
189+ index , store_data = not np .isnan (reduced_len )
190+ )
167191 else :
168192 new_shape = (df .shape [0 ],)
169- new_index = df .index_value
193+ new_index_value = df .index_value
170194 return self .new_series (
171195 [df ],
172196 shape = new_shape ,
173197 dtype = dtypes [0 ],
174198 name = dtypes .index [0 ],
175- index_value = new_index ,
199+ index_value = new_index_value ,
176200 )
177201 elif self .output_types [0 ] == OutputType .tensor :
178202 return self .new_tileable ([df ], dtype = dtypes , shape = (np .nan ,))
@@ -208,6 +232,9 @@ def _gen_map_chunks(
208232
209233 agg_chunks = np .empty (agg_chunks_shape , dtype = object )
210234 dtypes_cache = dict ()
235+ reduced_len = (
236+ 1 if in_df .ndim != out_df .ndim or isinstance (op .raw_func , list ) else np .nan
237+ )
211238 for chunk in in_df .chunks :
212239 input_index = chunk .index [1 - axis ] if len (chunk .index ) > 1 else 0
213240 if input_index not in input_index_to_output :
@@ -234,9 +261,9 @@ def _gen_map_chunks(
234261
235262 if map_op .output_types [0 ] == OutputType .dataframe :
236263 if axis == 0 :
237- shape = (1 , out_df .shape [- 1 ])
264+ shape = (reduced_len , chunk .shape [- 1 ])
238265 if out_df .ndim == 2 :
239- columns_value = out_df .columns_value
266+ columns_value = chunk .columns_value
240267 index_value = out_df .index_value
241268 else :
242269 columns_value = out_df .index_value
@@ -259,11 +286,11 @@ def _gen_map_chunks(
259286 index_value = index_value ,
260287 )
261288 else :
262- shape = (out_df .shape [0 ], 1 )
289+ shape = (chunk .shape [0 ], reduced_len )
263290 columns_value = parse_index (
264291 pd .Index ([0 ]), out_df .key , store_data = True
265292 )
266- index_value = out_df .index_value
293+ index_value = chunk .index_value
267294
268295 agg_chunk = map_op .new_chunk (
269296 [chunk ],
@@ -273,7 +300,9 @@ def _gen_map_chunks(
273300 index_value = index_value ,
274301 )
275302 else :
276- agg_chunk = map_op .new_chunk ([chunk ], shape = (1 ,), index = new_index )
303+ agg_chunk = map_op .new_chunk (
304+ [chunk ], shape = (reduced_len ,), index = new_index
305+ )
277306 agg_chunks [agg_chunk .index ] = agg_chunk
278307 return agg_chunks
279308
@@ -409,6 +438,9 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
409438 chunks = cls ._gen_map_chunks (
410439 op , in_df , out_df , axis_func_infos , input_index_to_output
411440 )
441+ reduced_len = (
442+ 1 if in_df .ndim != out_df .ndim or isinstance (op .raw_func , list ) else np .nan
443+ )
412444 while chunks .shape [axis ] > combine_size :
413445 if axis == 0 :
414446 new_chunks_shape = (
@@ -429,16 +461,16 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
429461 chks = chunks [i : i + combine_size , idx1 ]
430462 chunk_index = (idx0 , idx1 )
431463 if chks [0 ].ndim == 1 :
432- concat_shape = (len (chks ),)
433- agg_shape = (1 ,)
464+ concat_shape = (len (chks ) * reduced_len ,)
465+ agg_shape = (reduced_len ,)
434466 else :
435- concat_shape = (len (chks ), chks [0 ].shape [1 ])
436- agg_shape = (chks [0 ].shape [1 ], 1 )
467+ concat_shape = (len (chks ) * reduced_len , chks [0 ].shape [1 ])
468+ agg_shape = (chks [0 ].shape [1 ], reduced_len )
437469 else :
438470 chks = chunks [idx1 , i : i + combine_size ]
439471 chunk_index = (idx1 , idx0 )
440- concat_shape = (chks [0 ].shape [0 ], len (chks ))
441- agg_shape = (chks [0 ].shape [0 ], 1 )
472+ concat_shape = (chks [0 ].shape [0 ], len (chks ) * reduced_len )
473+ agg_shape = (chks [0 ].shape [0 ], reduced_len )
442474
443475 chks = chks .reshape ((chks .shape [0 ],)).tolist ()
444476 if len (chks ) == 1 :
@@ -485,12 +517,12 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
485517 if axis == 0 :
486518 chks = chunks [:, idx ]
487519 if chks [0 ].ndim == 1 :
488- concat_shape = (len (chks ),)
520+ concat_shape = (len (chks ) * reduced_len ,)
489521 else :
490- concat_shape = (len (chks ), chks [0 ].shape [1 ])
522+ concat_shape = (len (chks ) * reduced_len , chks [0 ].shape [1 ])
491523 else :
492524 chks = chunks [idx , :]
493- concat_shape = (chks [0 ].shape [0 ], len (chks ))
525+ concat_shape = (chks [0 ].shape [0 ], len (chks ) * reduced_len )
494526 chks = chks .reshape ((chks .shape [0 ],)).tolist ()
495527 chk = concat_op .new_chunk (
496528 chks ,
@@ -519,7 +551,7 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
519551 shape_len = len (col_index )
520552 kw .update (
521553 dict (
522- shape = (out_df .shape [0 ], shape_len ),
554+ shape = (out_df .shape [0 ] * reduced_len , shape_len ),
523555 columns_value = columns_value ,
524556 index = (0 , idx ),
525557 dtypes = out_df .dtypes [columns_value .to_pandas ()],
@@ -531,7 +563,10 @@ def _tile_tree(cls, op: "DataFrameAggregate"):
531563 dict (
532564 index = (idx , 0 ),
533565 index_value = src_col_chunk .index_value ,
534- shape = (src_col_chunk .shape [0 ], out_df .shape [1 ]),
566+ shape = (
567+ src_col_chunk .shape [0 ],
568+ out_df .shape [1 ] * reduced_len ,
569+ ),
535570 dtypes = out_df .dtypes ,
536571 )
537572 )
@@ -843,25 +878,26 @@ def _execute_agg(cls, ctx, op: "DataFrameAggregate"):
843878 concat_df = concat_df .iloc [:, 0 ]
844879 else :
845880 concat_df = concat_df .iloc [:, 0 ]
846- concat_df .name = op . outputs [ 0 ] .name
881+ concat_df .name = out .name
847882
848- concat_df = concat_df .astype (op . outputs [ 0 ] .dtype , copy = False )
883+ concat_df = concat_df .astype (out .dtype , copy = False )
849884 elif op .output_types [0 ] == OutputType .scalar :
850885 concat_df = concat_df .iloc [0 ]
851886 try :
852- concat_df = concat_df .astype (op . outputs [ 0 ] .dtype )
887+ concat_df = concat_df .astype (out .dtype )
853888 except AttributeError :
854889 # concat_df may be a string and has no `astype` method
855890 pass
856891 elif op .output_types [0 ] == OutputType .tensor :
857892 concat_df = xp .array (concat_df ).astype (dtype = out .dtype )
858893 else :
859- if axis == 0 :
860- concat_df = concat_df .reindex (op .outputs [0 ].index_value .to_pandas ())
861- else :
862- concat_df = concat_df [op .outputs [0 ].columns_value .to_pandas ()]
894+ if not np .isnan (out .shape [op .axis ]):
895+ if axis == 0 :
896+ concat_df = concat_df .reindex (out .index_value .to_pandas ())
897+ else :
898+ concat_df = concat_df [out .columns_value .to_pandas ()]
863899
864- concat_df = concat_df .astype (op . outputs [ 0 ] .dtypes , copy = False )
900+ concat_df = concat_df .astype (out .dtypes , copy = False )
865901 ctx [op .outputs [0 ].key ] = concat_df
866902
867903 @classmethod
0 commit comments