"""
**datautils** module provide some utility functions for data manipulation.
important
This is (and should remain) the only non-framework module with dependencies to libraries like pandas or sklearn
until replacement by simpler/lightweight versions to avoid potential version conflicts with libraries imported by benchmark frameworks.
Also, this module is intended to be imported by frameworks integration modules,
therefore, it should have no dependency to any other **amlb** module outside **utils**.
"""
import logging
import os
import arff
import numpy as np
import pandas as pd
from sklearn.base import TransformerMixin
from sklearn.metrics import accuracy_score, auc, average_precision_score, balanced_accuracy_score, confusion_matrix, fbeta_score, \
    log_loss, mean_absolute_error, mean_squared_error, mean_squared_log_error, precision_recall_curve, \
    r2_score, roc_auc_score  # just aliasing
from sklearn.preprocessing import LabelEncoder, LabelBinarizer, OneHotEncoder
from .utils import profile, path_from_split, repr_def, split_path, touch

try:
    from sklearn.preprocessing import OrdinalEncoder  # from sklearn 0.20
except ImportError:
    class OrdinalEncoder(LabelEncoder):

        def _reshape(self, y):
            return np.asarray(y, dtype=object).reshape(len(y))

        def fit(self, y):
            super().fit(self._reshape(y))
            return self

        def fit_transform(self, y):
            return super().fit_transform(self._reshape(y)).astype(float, copy=False)

        def transform(self, y):
            return super().transform(self._reshape(y)).astype(float, copy=False)

try:
    from sklearn.impute import SimpleImputer as Imputer  # from sklearn 0.20
except ImportError:
    from sklearn.preprocessing import Imputer as Imp

    class Imputer(Imp):

        def __init__(self, missing_values=np.nan, **kwargs):
            super(Imputer, self).__init__(missing_values='NaN' if missing_values is np.nan else missing_values, **kwargs)

log = logging.getLogger(__name__)

def read_csv(path, nrows=None, header=True, index=False, as_data_frame=True, dtype=None):
    """Read a CSV file into a DataFrame.

    For now, this delegates to pandas; it merely simplifies the signature in case we want to get rid of the pandas dependency later
    (numpy should be enough for our needs).

    :param path: the path to a csv file, or a file-like/readable (with read() method) object.
    :param nrows: the number of rows to read; if not specified, all rows are read.
    :param header: whether the first row should be read as the column header.
    :param index: whether the first column should be read as the row index.
    :param as_data_frame: whether the result should be returned as a data frame (default) or a numpy array.
    :param dtype: data type for columns.
    :return: a DataFrame, or a numpy array if as_data_frame is False.
    """
    df = pd.read_csv(path,
                     nrows=nrows,
                     header=0 if header else None,
                     index_col=0 if index else None,
                     dtype=dtype)
    return df if as_data_frame else df.values

def write_csv(data, path, header=True, columns=None, index=False, append=False):
    if is_data_frame(data):
        data_frame = data
    else:
        data_frame = to_data_frame(data, columns=columns)
        header = header and columns is not None
    touch(path)
    data_frame.to_csv(path,
                      header=header,
                      index=index,
                      mode='a' if append else 'w')
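
# A minimal usage sketch (the paths below are hypothetical, not part of the original module):
#   df = read_csv('/tmp/data.csv', nrows=10)    # first 10 rows, header row as column names
#   write_csv(df, '/tmp/data_head.csv')         # touch()es the path, then writes without index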

@profile(logger=log)
def reorder_dataset(path, target_src=0, target_dest=-1, save=True):
    if target_src == target_dest and save:  # no reordering needed, no data to load, returning original path
        return path

    p = split_path(path)
    p.basename += ("_target_" + ("first" if target_dest == 0 else "last" if target_dest == -1 else str(target_dest)))
    reordered_path = path_from_split(p)

    if os.path.isfile(reordered_path):
        if save:  # reordered file already exists, returning it as there's no data to load here
            return reordered_path
        else:  # reordered file already exists, use it to load reordered data
            path = reordered_path

    with open(path) as file:
        df = arff.load(file)
    columns = np.asarray(df['attributes'], dtype=object)
    data = np.asarray(df['data'], dtype=object)

    if target_src == target_dest or path == reordered_path:  # no reordering needed, returning loaded data
        return data

    ori = list(range(len(columns)))
    # normalize negative indices: src must index an existing column, whereas dest is an
    # insertion position (one slot past the end is allowed), hence the extra +1 for dest only
    src = len(columns)+target_src if target_src < 0 else target_src
    dest = len(columns)+1+target_dest if target_dest < 0 else target_dest
    if src < dest:
        new = ori[:src]+ori[src+1:dest]+[src]+ori[dest:]
    elif src > dest:
        new = ori[:dest]+[src]+ori[dest:src]+ori[src+1:]
    else:  # no reordering needed, returning loaded data or original path
        return data if not save else path

    reordered_attr = columns[new]
    reordered_data = data[:, new]
    if not save:
        return reordered_data

    with open(reordered_path, 'w') as file:
        arff.dump({
            'description': df['description'],
            'relation': df['relation'],
            'attributes': reordered_attr.tolist(),
            'data': reordered_data.tolist()
        }, file)
    # TODO: provide the possibility to return data even if save is set to true,
    #  as the client code may not want to have to load the data again,
    #  and may want to benefit from the caching of reordered data for future runs.
    return reordered_path
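
# Worked example of the reordering arithmetic above, assuming a hypothetical 4-column dataset:
# with target_src=0 and target_dest=-1, dest = 4+1-1 = 4, so src < dest and
# new = ori[:0] + ori[1:4] + [0] + ori[4:] = [1, 2, 3, 0], i.e. the target column moves last.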

def is_data_frame(df):
    return isinstance(df, pd.DataFrame)

def to_data_frame(obj, columns=None):
    if obj is None:
        return pd.DataFrame()
    elif isinstance(obj, dict):
        return pd.DataFrame.from_dict(obj, columns=columns, orient='columns' if columns is None else 'index')
    elif isinstance(obj, (list, np.ndarray)):
        return pd.DataFrame.from_records(obj, columns=columns)
    else:
        raise ValueError("Object should be a dictionary {col1: values, col2: values, ...} "
                         "or an array of dictionary-like objects [{col1: val, col2: val}, {col1: val, col2: val}, ...].")

class Encoder(TransformerMixin):
    """
    Overly complex "generic" encoder that can handle missing values, auto-encoded format (e.g. int for target, float for predictors)...
    Should never have written this, but it does the job currently. However, we should think about a simpler single-purpose approach.
    """

    def __init__(self, type='label', target=True, encoded_type=int,
                 missing_policy='ignore', missing_values=None, missing_replaced_by='',
                 normalize_fn=None):
        """
        :param type: one of ['label', 'one-hot', 'no-op'].
        :param target: whether the encoded vector is the target variable.
        :param missing_policy: one of ['ignore', 'mask', 'encode'].
            ignore: use only if the data to be transformed is guaranteed to have no missing values, otherwise transform() may raise an error.
            mask: replace missing values only internally.
            encode: encode all missing values as the encoded value of missing_replaced_by.
        :param missing_values: the values to be treated as missing.
        :param missing_replaced_by: the value substituted for missing values before encoding.
        :param normalize_fn: if provided, function applied to all elements during fit and transform (for example, trimming spaces, lowercase...).
        """
        super().__init__()
        assert missing_policy in ['ignore', 'mask', 'encode']
        self.for_target = target
        self.missing_policy = missing_policy
        self.missing_values = set(missing_values).union([None]) if missing_values else {None}
        self.missing_replaced_by = missing_replaced_by
        self.missing_encoded_value = None
        self.normalize_fn = normalize_fn
        self.classes = None
        # self.encoded_type = int if target else encoded_type
        self.encoded_type = encoded_type
        if type == 'label':
            self.delegate = LabelEncoder() if target else OrdinalEncoder()
        elif type == 'one-hot':
            self.delegate = LabelBinarizer() if target else OneHotEncoder(sparse=False, handle_unknown='ignore')
        elif type == 'no-op':
            self.delegate = None
            # self.encoded_type = encoded_type
        else:
            raise ValueError("Encoder `type` should be one of {}.".format(['label', 'one-hot', 'no-op']))

    @property
    def _ignore_missing(self):
        return self.for_target or self.missing_policy == 'ignore'

    @property
    def _mask_missing(self):
        return not self.for_target and self.missing_policy == 'mask'

    @property
    def _encode_missing(self):
        return not self.for_target and self.missing_policy == 'encode'

    def _reshape(self, vec):
        return vec if self.for_target else vec.reshape(-1, 1)

    def fit(self, vec):
        """
        :param vec: must be a line vector (array).
        :return: self.
        """
        if not self.delegate:
            return self

        vec = np.asarray(vec, dtype=object)
        if self.normalize_fn:
            vec = self.normalize_fn(vec)
        self.classes = np.unique(vec) if self._ignore_missing else np.unique(np.insert(vec, 0, self.missing_replaced_by))

        if self._mask_missing:
            self.missing_encoded_value = self.delegate.fit_transform(self._reshape(self.classes))[0]
        else:
            self.delegate.fit(self._reshape(self.classes))
        return self

    def transform(self, vec, **params):
        """
        :param vec: must be a single value (str) or a line vector (array).
        :param params: additional params passed to the delegate encoder.
        :return: the encoded value(s).
        """
        if log.isEnabledFor(5):  # logging.TRACE
            log.debug("Transforming %s using %s", vec, repr_def(self))

        return_value = lambda v: v
        if isinstance(vec, str):
            vec = [vec]
            return_value = lambda v: v[0]

        vec = np.asarray(vec, dtype=object)

        if not self.delegate:
            return return_value(vec.astype(self.encoded_type, copy=False))

        if self._mask_missing or self._encode_missing:
            mask = [v in self.missing_values for v in vec]
            if any(mask):
                # if self._mask_missing:
                #     missing = vec[mask]
                vec[mask] = self.missing_replaced_by
                if self.normalize_fn:
                    vec = self.normalize_fn(vec)

                res = self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False)
                if self._mask_missing:
                    res[mask] = np.nan if self.encoded_type == float else None
                return return_value(res)

        if self.normalize_fn:
            vec = self.normalize_fn(vec)
        return return_value(self.delegate.transform(self._reshape(vec), **params).astype(self.encoded_type, copy=False))

    def inverse_transform(self, vec, **params):
        """
        :param vec: must be a single value or a line vector (array).
        :param params: additional params passed to the delegate encoder.
        :return: the decoded value(s).
        """
        if not self.delegate:
            return vec

        # TODO: handle mask
        vec = np.asarray(vec).astype(self.encoded_type, copy=False)
        return self.delegate.inverse_transform(vec, **params)
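
# A minimal usage sketch of Encoder for a predictor column with masked missing values
# (toy data; with sklearn's 2-D OrdinalEncoder the result is column-shaped):
#   enc = Encoder('label', target=False, encoded_type=float,
#                 missing_policy='mask', missing_values=['?'])
#   enc.fit(['a', 'b', 'a']).transform(['b', '?'])   # -> e.g. array([[2.], [nan]])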

def impute(X_fit, *X_s, missing_values=np.nan, strategy='mean'):
    """
    :param X_fit: the data used to fit the imputer.
    :param X_s: additional datasets to impute using the imputer fitted on X_fit.
    :param missing_values: the value to be treated as missing.
    :param strategy: one of 'mean', 'median', 'most_frequent', 'constant' ('constant' from sklearn 0.20 only, requires the fill_value arg).
    :return: the imputed X_fit, or a list [imputed X_fit, imputed X_s...] if X_s is provided.
    """
    # TODO: impute only if np.isnan(X_fit).any() ?
    imputer = Imputer(missing_values=missing_values, strategy=strategy)
    imputed = imputer.fit_transform(X_fit)
    if len(X_s) > 0:
        result = [imputed]
        for X in X_s:
            result.append(imputer.transform(X))
        return result
    else:
        return imputed
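

# Minimal self-test sketch (an addition for illustration, not part of the original module):
# exercises Encoder round-tripping and impute on toy data when the module is run directly.
if __name__ == '__main__':
    enc = Encoder('label', target=True).fit(['cat', 'dog', 'cat'])
    encoded = enc.transform(['dog', 'cat'])
    print(encoded)                         # [1 0]
    print(enc.inverse_transform(encoded))  # ['dog' 'cat']

    X_train = np.array([[1.0, np.nan], [3.0, 4.0]])
    X_test = np.array([[np.nan, 2.0]])
    X_train_i, X_test_i = impute(X_train, X_test)
    print(X_train_i)  # [[1. 4.] [3. 4.]] -- column means fill the NaNs
    print(X_test_i)   # [[2. 2.]] -- filled with means learned from X_train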