@@ -54,7 +54,7 @@ class HashingEncoder(util.BaseEncoder, util.UnsupervisedTransformerMixin):
5454 6C12T CPU with 100,000 samples makes max_sample=16,666.
5555 It is not recommended to set it larger than the default value.
5656 n_components: int
57- how many bits to use to represent the feature. By default we use 8 bits.
57+ how many bits to use to represent the feature. By default, we use 8 bits.
5858 For high-cardinality features, consider using up to 32 bits.
5959
6060 Example
@@ -132,38 +132,39 @@ def __init__(self, max_process=0, max_sample=0, verbose=0, n_components=8, cols=
132132 def _fit (self , X , y = None , ** kwargs ):
133133 pass
134134
def require_data(self, data_lock, new_start, done_index, hashing_parts, process_index):
    """Worker loop: repeatedly claim the next slice of ``self.X``, hash it,
    and put the result on the shared queue until all rows are consumed.

    Parameters
    ----------
    data_lock : multiprocessing.Lock
        Guards ``new_start`` / ``done_index`` while a slice is being claimed.
    new_start : multiprocessing.Value
        Truthy until the first worker claims the first slice; reset here.
    done_index : multiprocessing.Value
        Index one past the last row already claimed by any worker.
    hashing_parts : queue-like
        Receives one ``{part_index: hashed_dataframe}`` dict per finished slice.
    process_index : int
        1-based worker id, used only for verbose logging.
    """
    is_finished = False
    while not is_finished:
        # Blocking acquire() on a multiprocessing.Lock always returns True;
        # the guard is kept for safety against a non-blocking variant.
        if not data_lock.acquire():
            # BUGFIX: the previous code released a lock it never acquired
            # (ValueError on an unheld lock) and never set is_finished,
            # spinning forever. Simply stop this worker instead.
            is_finished = True
            continue

        if new_start.value:
            end_index = 0
            new_start.value = False
        else:
            end_index = done_index.value

        if all([self.data_lines > 0, end_index < self.data_lines]):
            # Claim [start_index, end_index) and publish the new high-water
            # mark before releasing the lock, so other workers skip it.
            start_index = end_index
            if (self.data_lines - end_index) <= self.max_sample:
                end_index = self.data_lines
            else:
                end_index += self.max_sample
            done_index.value = end_index
            data_lock.release()

            data_part = self.X.iloc[start_index: end_index]
            # Always get df and check it after merge all data parts
            data_part = self.hashing_trick(X_in=data_part, hashing_method=self.hash_method,
                                           N=self.n_components, cols=self.cols)
            # part_index orders the slices so the caller can reassemble them.
            part_index = int(math.ceil(end_index / self.max_sample))
            hashing_parts.put({part_index: data_part})
            is_finished = end_index >= self.data_lines
            if self.verbose == 5:
                print(f"Process - {process_index} done hashing data : {start_index}~{end_index}")
        else:
            # Nothing left to claim: let other workers observe this and stop.
            data_lock.release()
            is_finished = True
167168
168169 def _transform (self , X ):
169170 """
@@ -184,12 +185,12 @@ def _transform(self, X):
184185 if self .max_sample == 0 :
185186 self .max_sample = 1
186187 if self .max_process == 1 :
187- self .require_data (self , data_lock , new_start , done_index , hashing_parts , cols = self . cols , process_index = 1 )
188+ self .require_data (data_lock , new_start , done_index , hashing_parts , process_index = 1 )
188189 else :
189190 n_process = []
190- for thread_index in range (self .max_process ):
191+ for thread_idx in range (self .max_process ):
191192 process = multiprocessing .Process (target = self .require_data ,
192- args = (self , data_lock , new_start , done_index , hashing_parts , self . cols , thread_index + 1 ))
193+ args = (data_lock , new_start , done_index , hashing_parts , thread_idx + 1 ))
193194 process .daemon = True
194195 n_process .append (process )
195196 for process in n_process :
0 commit comments