|
import re
from typing import Any, Callable, Dict, Optional, Sequence, Tuple

import pandas
| 4 | + |
| 5 | + |
class CubeLogs:
    """
    Processes logs coming from experiments.

    :param data: raw data, either a :class:`pandas.DataFrame`
        or a list of dictionaries (one dictionary per row)
    :param time: name of the column holding the timestamp
    :param keys: regular expressions selecting the key columns
    :param values: regular expressions selecting the value columns
    :param ignored: regular expressions selecting columns to ignore,
        ignored columns are removed from keys and values
    :param recent: if True, keeps only the most recent row for every
        distinct combination of keys, otherwise duplicated keys
        make the load fail
    :param formulas: maps a new column name to a function computing it
        from the loaded dataframe
    """

    def __init__(
        self,
        data: Any,
        time: str = "date",
        keys: Sequence[str] = ("version_.*", "model_.*"),
        values: Sequence[str] = ("time_.*", "disc_.*"),
        ignored: Sequence[str] = (),
        recent: bool = False,
        formulas: Optional[Dict[str, Callable[[pandas.DataFrame], pandas.Series]]] = None,
    ):
        self._data = data
        self._time = time
        self._keys = keys
        self._values = values
        self._ignored = ignored
        self.recent = recent
        self._formulas = formulas

    def load(self, verbose: int = 0):
        """
        Loads and preprocesses the data.

        Resolves the key/value/ignored column patterns, drops every other
        column, checks for duplicates (see *recent*) and applies *formulas*.

        :param verbose: verbosity level
        """
        if isinstance(self._data, pandas.DataFrame):
            if verbose:
                print(f"[CubeLogs.load] load from dataframe, shape={self._data.shape}")
            self.data = self._data
        elif isinstance(self._data, list) and all(isinstance(r, dict) for r in self._data):
            if verbose:
                # a list has no attribute 'shape', report its length instead
                print(f"[CubeLogs.load] load from list of dicts, n={len(self._data)}")
            self.data = pandas.DataFrame(self._data)
        else:
            raise NotImplementedError(
                f"Not implemented with the provided data (type={type(self._data)})"
            )

        # self.data is guaranteed to be a DataFrame here,
        # self._data may still be a list of dicts
        assert all(isinstance(c, str) for c in self.data.columns), (
            f"The class only supports string as column names "
            f"but found {[c for c in self.data.columns if not isinstance(c, str)]}"
        )
        if verbose:
            print(f"[CubeLogs.load] loaded with shape={self.data.shape}")

        self._initialize_columns()
        if verbose:
            print(f"[CubeLogs.load] time={self.time}")
            print(f"[CubeLogs.load] keys={self.keys}")
            print(f"[CubeLogs.load] values={self.values}")
            print(f"[CubeLogs.load] ignored={self.ignored}")
            print(f"[CubeLogs.load] ignored_values={self.ignored_values}")
            print(f"[CubeLogs.load] ignored_keys={self.ignored_keys}")
        assert not (
            set(self.keys) & set(self.values)
        ), f"Columns {set(self.keys) & set(self.values)} cannot be keys and values"
        assert not (
            set(self.keys) & set(self.ignored)
        ), f"Columns {set(self.keys) & set(self.ignored)} cannot be keys and ignored"
        assert not (
            set(self.values) & set(self.ignored)
        ), f"Columns {set(self.values) & set(self.ignored)} cannot be values and ignored"
        assert (
            self.time not in self.keys
            and self.time not in self.values
            and self.time not in self.ignored
        ), f"Column {self.time!r} is also a key, a value or ignored"
        # use the private list: the property ``columns`` reads
        # self.data.columns and would make ``dropped`` always empty
        self._columns = [self.time, *self.keys, *self.values, *self.ignored]
        self.dropped = [c for c in self.data.columns if c not in set(self._columns)]
        self.data = self.data[self._columns]
        if verbose:
            print(f"[CubeLogs.load] dropped={self.dropped}")
            print(f"[CubeLogs.load] data.shape={self.data.shape}")

        self._preprocess()
        if self.recent and verbose:
            print(f"[CubeLogs.load] keep most recent data.shape={self.data.shape}")

        # Let's apply the formulas
        if self._formulas:
            for k, f in self._formulas.items():
                if verbose:
                    print(f"[CubeLogs.load] apply formula {k!r}")
                self.data[k] = f(self.data)
        self.values_for_key = {k: set(self.data[k]) for k in self.keys}
        nans = [c for c in self.keys if self.data[c].isna().astype(int).sum() > 0]
        assert not nans, f"The following keys {nans} have nan values. This is not allowed."
        if verbose:
            print(f"[CubeLogs.load] done, shape={self.shape}")

    @property
    def shape(self) -> Tuple[int, int]:
        "Returns the shape."
        assert hasattr(self, "data"), "Method load was not called"
        return self.data.shape

    @property
    def columns(self) -> Sequence[str]:
        "Returns the columns."
        assert hasattr(self, "data"), "Method load was not called"
        return self.data.columns

    def _preprocess(self):
        """Checks for duplicates and keeps the most recent rows if *recent*."""
        last = self.values[0]
        # two rows sharing the same time and keys are true duplicates: always forbidden
        gr = self.data[[self.time, *self.keys, last]].groupby([self.time, *self.keys]).count()
        gr = gr[gr[last] > 1]
        assert gr.shape[0] == 0, f"There are duplicated rows:\n{gr}"
        if self.recent:
            # for every combination of keys, keep the row with the latest time
            gr = self.data[[*self.keys, self.time]].groupby(self.keys, as_index=False).max()
            filtered = pandas.merge(self.data, gr, on=[self.time, *self.keys])
            assert filtered.shape[0] <= self.data.shape[0], (
                f"Keeping the latest row brings more row {filtered.shape} "
                f"(initial is {self.data.shape})."
            )
            self.data = filtered
        else:
            # same keys at different times is only allowed with recent=True
            gr = self.data[[*self.keys, self.time]].groupby(self.keys).count()
            gr = gr[gr[self.time] > 1]
            assert (
                gr.shape[0] == 0
            ), f"recent should be true to keep the most recent row:\n{gr}"

    @classmethod
    def _filter_column(cls, filters, columns, can_be_empty=False):
        """
        Returns the sorted subset of *columns* matching at least one
        regular expression in *filters*.

        :param filters: sequence of regular expressions
        :param columns: candidate column names
        :param can_be_empty: if False, an empty result raises
        :return: sorted list of matching column names
        """
        set_cols = set()
        for f in filters:
            reg = re.compile(f)
            set_cols |= {c for c in columns if reg.search(c)}
        assert (
            can_be_empty or set_cols
        ), f"Filters {filters} returns an empty set from {columns}"
        return sorted(set_cols)

    def _initialize_columns(self):
        """Resolves the regex patterns into keys, values, ignored and time columns."""
        self.keys = self._filter_column(self._keys, self.data.columns)
        self.values = self._filter_column(self._values, self.data.columns)
        self.ignored = self._filter_column(self._ignored, self.data.columns, True)
        assert (
            self._time in self.data.columns
        ), f"Column {self._time} not found in {self.data.columns}"
        # ignored wins over keys and values
        ignored_keys = set(self.ignored) & set(self.keys)
        ignored_values = set(self.ignored) & set(self.values)
        self.keys = [c for c in self.keys if c not in ignored_keys]
        self.values = [c for c in self.values if c not in ignored_values]
        self.ignored_keys = sorted(ignored_keys)
        self.ignored_values = sorted(ignored_values)
        self.time = self._time

    def __str__(self) -> str:
        "usual"
        return str(self.data) if hasattr(self, "data") else str(self._data)

    def view(
        self,
        key_index: Sequence[str],
        values: Sequence[str],
        ignore_unique: bool = True,
        order: Optional[Sequence[str]] = None,
    ) -> pandas.DataFrame:
        """
        Returns a dataframe, a pivot view.

        `key_index` determines the index, the other key columns determine
        the columns. If `ignore_unique` is True, every column with a unique
        value is removed.

        :param key_index: keys (or regular expressions) to put in the row index
        :param values: values (or regular expressions) to show
        :param ignore_unique: ignore keys with a unique value
        :param order: to reorder key in columns index
        :return: dataframe
        """
        key_index = self._filter_column(key_index, self.keys)
        values = self._filter_column(values, self.values)
        assert set(key_index) <= set(
            self.keys
        ), f"Non existing columns in key_index {set(key_index) - set(self.keys)}"
        assert set(values) <= set(
            self.values
        ), f"Non existing columns in values {set(values) - set(self.values)}"
        # a list (in self.keys order) instead of a set:
        # pivot needs a deterministic column order
        key_columns = [c for c in self.keys if c not in set(key_index)]
        if ignore_unique:
            key_index = [k for k in key_index if len(self.values_for_key[k]) > 1]
            key_columns = [k for k in key_columns if len(self.values_for_key[k]) > 1]
        if order:
            assert set(order) <= set(key_columns), (
                f"Non existing columns from order in key_columns "
                f"{set(order) - set(key_columns)}"
            )
            key_columns = [*order, *[c for c in key_columns if c not in order]]
        return self.data.pivot(index=key_index[::-1], columns=key_columns, values=values)
0 commit comments