66from  distutils .version  import  LooseVersion 
77from  functools  import  partial 
88
9- import  dask 
10- import  dask .dataframe  as  dd 
119import  numpy  as  np 
1210import  tensorflow  as  tf 
1311from  tensorflow .keras .utils  import  to_categorical  as  tf_to_categorical 
1412
1513from  deeptables .utils  import  consts , dt_logging 
16- 
14+ from   hypernets . tabular   import   get_tool_box ,  is_dask_installed 
1715logger  =  dt_logging .get_logger (__name__ )
1816
1917TFDG_DASK_CHUNK  =  100 
@@ -105,6 +103,7 @@ def __call__(self, X, y=None, *, batch_size, shuffle, drop_remainder):
105103        return  ds 
106104
107105    def  _to_ds20 (self , X , y = None , * , batch_size , shuffle , drop_remainder ):
106+         import  dask 
108107        ds_types  =  {}
109108        ds_shapes  =  {}
110109        meta  =  self ._get_meta (X )
@@ -118,6 +117,7 @@ def _to_ds20(self, X, y=None, *, batch_size, shuffle, drop_remainder):
118117                ds_types [k ] =  'int32' 
119118
120119        if  y  is  not   None :
120+             import  dask .dataframe  as  dd 
121121            if  isinstance (y , dd .Series ):
122122                y  =  y .to_dask_array (lengths = True )
123123            if  self .task  ==  consts .TASK_MULTICLASS :
@@ -149,6 +149,7 @@ def to_spec(name, dtype, idx):
149149        sig  =  {k : to_spec (k , dtype , idx ) for  k , (dtype , idx ) in  meta .items ()}
150150
151151        if  y  is  not   None :
152+             import  dask .dataframe  as  dd 
152153            if  isinstance (y , dd .Series ):
153154                y  =  y .to_dask_array (lengths = True )
154155            if  self .task  ==  consts .TASK_MULTICLASS :
@@ -167,6 +168,7 @@ def to_spec(name, dtype, idx):
167168
168169    @staticmethod  
169170    def  _generate (meta , X , y , * , batch_size , shuffle , drop_remainder ):
171+         import  dask 
170172        total_size  =  dask .compute (X .shape )[0 ][0 ]
171173        chunk_size  =  min (total_size , batch_size  *  TFDG_DASK_CHUNK )
172174        fn  =  partial (_TFDGForDask ._compute_chunk , X , y , chunk_size )
@@ -205,6 +207,7 @@ def _generate(meta, X, y, *, batch_size, shuffle, drop_remainder):
205207
206208    @staticmethod  
207209    def  _to_categorical (y , * , num_classes ):
210+         import  dask 
208211        if  len (y .shape ) ==  1 :
209212            y  =  y .reshape (dask .compute (y .shape [0 ])[0 ], 1 )
210213        fn  =  partial (tf_to_categorical , num_classes = num_classes , dtype = 'float32' )
@@ -213,6 +216,7 @@ def _to_categorical(y, *, num_classes):
213216
214217    @staticmethod  
215218    def  _compute_chunk (X , y , chunk_size , i ):
219+         import  dask 
216220        try :
217221            Xc  =  X [i :i  +  chunk_size ]
218222            yc  =  y [i :i  +  chunk_size ] if  y  is  not   None  else  None 
@@ -236,7 +240,12 @@ def _range(start, stop, step, shuffle):
236240def  to_dataset (config , task , num_classes , X , y = None , * ,
237241               batch_size , shuffle , drop_remainder ,
238242               categorical_columns , continuous_columns , var_len_categorical_columns ):
239-     cls  =  _TFDGForDask  if  isinstance (X , dd .DataFrame ) else  _TFDGForPandas 
243+ 
244+     if  is_dask_installed :
245+         import  dask .dataframe  as  dd 
246+         cls  =  _TFDGForDask  if  isinstance (X , dd .DataFrame ) else  _TFDGForPandas 
247+     else :
248+         cls  =  _TFDGForPandas 
240249    logger .info (f'create dataset generator with { cls .__name__ }  , ' 
241250                f'batch_size={ batch_size }  , shuffle={ shuffle }  , drop_remainder={ drop_remainder }  ' )
242251
0 commit comments