@@ -8,6 +8,7 @@
 import numpy as np
 import math
 import platform
+from concurrent.futures import ProcessPoolExecutor

 __author__ = 'willmcginnis', 'LiuShulun'

@@ -108,7 +109,6 @@ class HashingEncoder(util.BaseEncoder, util.UnsupervisedTransformerMixin):
     """
     prefit_ordinal = False
     encoding_relation = util.EncodingRelation.ONE_TO_M
-    default_int_np_array = np.array(np.zeros((2,2), dtype='int'))

     def __init__(self, max_process=0, max_sample=0, verbose=0, n_components=8, cols=None, drop_invariant=False,
                  return_df=True, hash_method='md5', process_creation_method='fork'):
@@ -179,62 +179,54 @@ def _transform(self, X, override_return_df=False):
         return X

     @staticmethod
-    def hash_chunk(hash_method, shared_memory_result, np_df, N, shared_memory_offset):
+    def hash_chunk(args):
+        hash_method, np_df, N = args
         # Calling getattr outside the loop saves some time in the loop
         hasher_constructor = getattr(hashlib, hash_method)
         # Same when the call to getattr is implicit
         int_from_bytes = int.from_bytes
+        result = np.zeros((np_df.shape[0], N), dtype='int')
         for i, row in enumerate(np_df):
             for val in row:
                 if val is not None:
                     hasher = hasher_constructor()
-                    hasher.update(bytes(str(val), 'utf-8'))
                     # Computes an integer index from the hasher digest. The byte order is
                     # "big" because the code used to read:
                     #     column_index = int(hasher.hexdigest(), 16) % N
                     # which implicitly treats the hexdigest as big endian,
                     # even if the system is little endian.
                     # Building the index this way is about 30% faster than using the
                     # hexdigest.
+                    hasher.update(bytes(str(val), 'utf-8'))
                     column_index = int_from_bytes(hasher.digest(), byteorder='big') % N
-                    row_index = (shared_memory_offset + i)*N
-                    shared_memory_index = row_index + column_index
-                    shared_memory_result[shared_memory_index] += 1
+                    result[i, column_index] += 1
+        return result

-    def hashing_trick_with_np_parallel(self, df, N):
+    def hashing_trick_with_np_parallel(self, df, N: int):
         np_df = df.to_numpy()
-        shared_memory_result = multiprocessing.RawArray(HashingEncoder.default_int_np_array.dtype.char, len(df)*N)
-
-        process_list = []
-        chunk_size = int(len(np_df)/self.max_process)
         ctx = multiprocessing.get_context(self.process_creation_method)
-        for i in range(0, self.max_process-1):
-            process = ctx.Process(target=self.hash_chunk,
-                                  args=(self.hash_method, shared_memory_result, np_df[i*chunk_size:((i+1)*chunk_size)], N, i*chunk_size))
-            process_list.append(process)

-        # The last process processes all the rest of the dataframe, because the number of rows might not
-        # be divisible by max_process.
-        process = ctx.Process(target=self.hash_chunk,
-                              args=(self.hash_method, shared_memory_result, np_df[(self.max_process-1)*chunk_size:], N, (self.max_process-1)*chunk_size))
-        process_list.append(process)
+        with ProcessPoolExecutor(max_workers=self.max_process, mp_context=ctx) as executor:
+            result = np.concatenate(list(
+                executor.map(
+                    self.hash_chunk,
+                    zip(
+                        [self.hash_method]*self.max_process,
+                        np.array_split(np_df, self.max_process),
+                        [N]*self.max_process
+                    )
+                )
+            ))

-        for process in process_list:
-            process.start()
-        for process in process_list:
-            process.join()
-
-        np_result = np.array(shared_memory_result, 'int')
-
-        return pd.DataFrame(np_result.reshape(len(df), N), index=df.index)
+        return pd.DataFrame(result, index=df.index)

     def hashing_trick_with_np_no_parallel(self, df, N):
         np_df = df.to_numpy()
-        np_result = np.zeros((len(df)*N), dtype='int')

-        HashingEncoder.hash_chunk(self.hash_method, np_result, np_df, N, 0)
+        result = HashingEncoder.hash_chunk((self.hash_method, np_df, N))
+
+        return pd.DataFrame(result, index=df.index)

-        return pd.DataFrame(np_result.reshape(len(df), N), index=df.index)

     def hashing_trick(self, X_in, hashing_method='md5', N=2, cols=None, make_copy=False):
         """A basic hashing implementation with configurable dimensionality/precision
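
A note on the digest arithmetic in `hash_chunk`: reading the raw digest as a big-endian integer gives the same column index as parsing the hexdigest, just without the hex round-trip, which is where the roughly 30% speedup cited in the comment comes from. A minimal standalone sketch (not part of the patch; `N` and `val` are arbitrary example values):

```python
import hashlib

N = 8                 # example number of output components
val = "category_a"    # example cell value

hasher = hashlib.md5()
hasher.update(bytes(str(val), 'utf-8'))

# New form: interpret the 16-byte digest as one big-endian integer.
column_index = int.from_bytes(hasher.digest(), byteorder='big') % N

# Old form: parse the hexdigest, which is implicitly big endian.
legacy_index = int(hasher.hexdigest(), 16) % N

assert column_index == legacy_index
```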
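The `np.array_split` call is what lets the refactor drop the old remainder branch (the last process taking "all the rest of the dataframe"): unlike fixed-size slicing, it accepts a row count that is not divisible by the number of workers and preserves row order on concatenation. A small sketch with made-up sizes:

```python
import numpy as np

rows, workers = 10, 4                        # example sizes; 10 is not divisible by 4
np_df = np.arange(rows * 2).reshape(rows, 2)

chunks = np.array_split(np_df, workers)
print([len(c) for c in chunks])              # [3, 3, 2, 2]: sizes differ by at most 1

# Concatenating per-chunk results restores the original row order, which is
# why hash_chunk can now return a plain per-chunk array instead of writing
# into shared memory at a row offset.
assert (np.concatenate(chunks) == np_df).all()
```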
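The signature change from positional parameters to a single `args` tuple follows from how `ProcessPoolExecutor.map` feeds workers: one item of the iterable per task, so the per-task arguments are packed with `zip` and unpacked inside the worker. A hypothetical standalone example of the same pattern (the names `work` and `chunks` are illustrative only):

```python
from concurrent.futures import ProcessPoolExecutor

def work(args):
    # Unpack the per-task tuple, mirroring hash_chunk's new signature.
    name, chunk, n = args
    return (name, sum(chunk) % n)

if __name__ == "__main__":    # guard required under the 'spawn' start method
    chunks = [[1, 2], [3, 4], [5, 6]]
    with ProcessPoolExecutor(max_workers=3) as executor:
        print(list(executor.map(work, zip(["md5"] * 3, chunks, [8] * 3))))
```

An alternative would have been `executor.map(fn, iterable_a, iterable_b, ...)`, which zips internally; the tuple form keeps `hash_chunk` callable with a single argument from the non-parallel path as well.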