 import re
 import zipfile
 from typing import Any, Callable, Dict, Iterator, List, Optional, Sequence, Tuple, Union
-from .helper import string_sig
+import numpy as np
 import pandas
 from pandas.api.types import is_numeric_dtype
+from .helper import string_sig


 def enumerate_csv_files(
@@ -197,6 +198,27 @@ def load(self, verbose: int = 0):
             if verbose:
                 print(f"[CubeLogs.load] load from list of dicts, n={len(self._data)}")
             self.data = pandas.DataFrame(self._data)
+        elif isinstance(self._data, list) and all(
+            isinstance(r, pandas.DataFrame) for r in self._data
+        ):
+            if verbose:
+                print(f"[CubeLogs.load] load from list of DataFrame, n={len(self._data)}")
+            self.data = pandas.concat(self._data, axis=0)
+        elif isinstance(self._data, list):
+            cubes = []
+            for item in enumerate_csv_files(self._data, verbose=verbose):
+                df = open_dataframe(item)
+                cube = CubeLogs(
+                    df,
+                    time=self._time,
+                    keys=self._keys,
+                    values=self._values,
+                    ignored=self._ignored,
+                    recent=self.recent,
+                )
+                cube.load()
+                cubes.append(cube.data)
+            self.data = pandas.concat(cubes, axis=0)
         else:
             raise NotImplementedError(
                 f"Not implemented with the provided data (type={type(self._data)})"
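
Note on the hunk above: `load` now accepts three list shapes. A list of dicts becomes a DataFrame, a list of DataFrames is concatenated, and any other list is treated as CSV sources expanded through `enumerate_csv_files` and loaded recursively. A minimal usage sketch for the DataFrame case, assuming the constructor takes the same keyword arguments as the recursive call in the hunk ("date", "model", "speedup" are made-up column names):

import pandas

df1 = pandas.DataFrame({"date": ["2024-01-01"], "model": ["m1"], "speedup": [1.1]})
df2 = pandas.DataFrame({"date": ["2024-01-02"], "model": ["m1"], "speedup": [1.2]})

# The new branch concatenates the list before the usual preprocessing.
cube = CubeLogs([df1, df2], time="date", keys=["model"], values=["speedup"])
cube.load()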
@@ -281,16 +303,25 @@ def _preprocess(self):
         last = self.values[0]
         gr = self.data[[self.time, *self.keys, last]].groupby([self.time, *self.keys]).count()
         gr = gr[gr[last] > 1]
-        assert gr.shape[0] == 0, f"There are duplicated rows:\n{gr}"
         if self.recent:
-            gr = self.data[[*self.keys, self.time]].groupby(self.keys, as_index=False).max()
-            filtered = pandas.merge(self.data, gr, on=[self.time, *self.keys])
+            cp = self.data.copy()
+            assert (
+                "__index__" not in cp.columns
+            ), f"'__index__' should not be a column in {cp.columns}"
+            cp["__index__"] = np.arange(cp.shape[0])
+            gr = (
+                cp[[*self.keys, self.time, "__index__"]]
+                .groupby(self.keys, as_index=False)
+                .max()
+            )
+            filtered = pandas.merge(cp, gr, on=[self.time, "__index__", *self.keys])
             assert filtered.shape[0] <= self.data.shape[0], (
                 f"Keeping the latest row brings more row {filtered.shape} "
                 f"(initial is {self.data.shape})."
             )
-            self.data = filtered
+            self.data = filtered.drop("__index__", axis=1)
         else:
+            assert gr.shape[0] == 0, f"There are duplicated rows:\n{gr}"
             gr = self.data[[*self.keys, self.time]].groupby(self.keys).count()
             gr = gr[gr[self.time] > 1]
             assert (
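
Note on the hunk above: the duplicate-row assert now only guards the non-`recent` path, and `__index__` acts as a tiebreaker when several rows share the same keys and timestamp. Grouping by the keys takes the max of both the timestamp and the insertion index, and merging back on all three columns keeps exactly one row per key. A standalone sketch of the same pattern with made-up columns; it assumes the last inserted row of each group also carries the latest timestamp, which holds for appended logs:

import numpy as np
import pandas

data = pandas.DataFrame(
    {
        "model": ["m1", "m1", "m2"],
        "date": ["2024-01-02", "2024-01-02", "2024-01-01"],
        "speedup": [1.0, 1.1, 2.0],
    }
)
cp = data.copy()
cp["__index__"] = np.arange(cp.shape[0])  # insertion order as a tiebreaker
gr = cp[["model", "date", "__index__"]].groupby("model", as_index=False).max()
filtered = pandas.merge(cp, gr, on=["date", "__index__", "model"])
print(filtered.drop("__index__", axis=1))  # one row per model: (m1, 1.1), (m2, 2.0)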