-
Notifications
You must be signed in to change notification settings - Fork 820
Expand file tree
/
Copy path_timeseries.py
More file actions
2688 lines (2360 loc) · 105 KB
/
_timeseries.py
File metadata and controls
2688 lines (2360 loc) · 105 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Timeseries datasets.
Timeseries data is special and has to be processed and passed in a special way.
This module defines TimeSeriesDataSet,
a class that is able to handle a wide variety of timeseries data problems.
"""
from collections.abc import Callable
from copy import copy as _copy, deepcopy
from functools import cached_property
import inspect
from typing import Any, Optional, TypeVar, Union
import warnings
import numpy as np
import pandas as pd
from sklearn.exceptions import NotFittedError
from sklearn.preprocessing import RobustScaler, StandardScaler
from sklearn.utils.validation import check_is_fitted
import torch
from torch.distributions import Beta
from torch.nn.utils import rnn
from torch.utils.data import DataLoader, Dataset
from torch.utils.data.sampler import Sampler, SequentialSampler
from pytorch_forecasting.data.encoders import (
EncoderNormalizer,
GroupNormalizer,
MultiNormalizer,
NaNLabelEncoder,
TorchNormalizer,
)
from pytorch_forecasting.data.samplers import TimeSynchronizedBatchSampler
from pytorch_forecasting.utils import repr_class
from pytorch_forecasting.utils._coerce import _coerce_to_dict, _coerce_to_list
from pytorch_forecasting.utils._dependencies import _check_matplotlib
def _find_end_indices(
    diffs: np.ndarray, max_lengths: np.ndarray, min_length: int
) -> tuple[np.ndarray, np.ndarray]:
    """
    Identify end indices in series even if some values are missing.

    Parameters
    ----------
    diffs : np.ndarray
        array of differences to next time step. nans should be filled up with ones
    max_lengths : np.ndarray
        maximum length of sequence by position.
    min_length : int
        minimum length of sequence.

    Returns
    -------
    tuple[np.ndarray, np.ndarray]
        tuple of arrays where first is end indices and second is list of start
        and end indices that are currently missing.
    """
    # NOTE: keep this body nopython-compatible -- it is optionally JIT-compiled
    # with numba right below the definition.
    missing_start_ends = []
    end_indices = []
    length = 1  # running window length (in time steps) starting at start_idx
    start_idx = 0
    max_idx = len(diffs) - 1
    max_length = max_lengths[start_idx]
    for idx, diff in enumerate(diffs):
        if length >= max_length:
            # window reached its maximum permitted length: emit end indices and
            # shrink from the left until it is below max_length again
            while length >= max_length:
                if length == max_length:
                    end_indices.append(idx)
                else:
                    # overshoot (diff > 1 due to gaps): end at the previous index
                    end_indices.append(idx - 1)
                length -= diffs[start_idx]
                if start_idx < max_idx:
                    start_idx += 1
                    max_length = max_lengths[start_idx]
        elif length >= min_length:
            # window is long enough but not yet emitted: remember as missing range
            missing_start_ends.append([start_idx, idx])
        length += diff
    if len(missing_start_ends) > 0:  # required for numba compliance
        return np.asarray(end_indices), np.asarray(missing_start_ends)
    else:
        return np.asarray(end_indices), np.empty((0, 2), dtype=np.int64)
# If numba is installed, JIT-compile _find_end_indices for speed; otherwise the
# pure-Python implementation above is used unchanged.
try:
    import numba

    _find_end_indices = numba.jit(nopython=True)(_find_end_indices)
except ImportError:
    pass
def check_for_nonfinite(tensor: torch.Tensor, names: str | list[str]) -> torch.Tensor:
"""Check if tensor contains NAs or infinite values and has correct dimension.
Checks:
* whether tensor is finite, otherwise raises ValueError
* checks whether dimension of tensor is correct. If tensor is a str,
tensor.ndim has to be 1, and if tensor is a list, tensor.ndim has to be 2.
Otherwise raises AssertionError.
Parameters
----------
names : str or list of str
name(s) of column(s) to check
tensor : torch.Tensor
tensor to check
Returns
-------
torch.Tensor
returns tensor unchanged, if checks yield no issues
Raises
------
ValueError
if tensor contains NAs or infinite values
AssertionError
if tensor has incorrect dimension, see above
"""
if isinstance(names, str):
names = [names]
assert tensor.ndim == 1, names
nans = (~torch.isfinite(tensor).unsqueeze(-1)).sum(0)
else:
assert tensor.ndim == 2, names
nans = (~torch.isfinite(tensor)).sum(0)
for name, na in zip(names, nans):
if na > 0:
raise ValueError(
f"{na} ({na / tensor.size(0):.2%}) of {name} "
"values were found to be NA or infinite (even after encoding). "
"NA values are not allowed "
"`allow_missing_timesteps` refers to missing rows, not to missing "
"values. Possible strategies to "
f"fix the issue are (a) dropping the variable {name}, "
"(b) using `NaNLabelEncoder(add_nan=True)` for categorical variables, "
"(c) filling missing values and/or (d) optionally adding a variable "
"indicating filled values"
)
return tensor
# Union of normalizer types accepted wherever a normalizer can be passed.
NORMALIZER = TorchNormalizer | EncoderNormalizer | NaNLabelEncoder

# Aliases describing the dict returned by TimeSeriesDataSet._data_properties
# (see its docstring). These were previously declared as ``list[str, X]``,
# which is both the wrong container (the values are dicts keyed by target
# name) and an invalid generic arity for ``list``.
Columns = list[str]  # list of column names in the data
TargetType = dict[str, str]  # target name -> "categorical" or "real"
TargetPositive = dict[str, bool]  # target name -> whether all values are > 0
TargetSkew = dict[str, float]  # target name -> skew of the target variable
DataProperties = dict[str, Columns | TargetType | TargetPositive | TargetSkew]

# PEP 484 requires the TypeVar's string name to match the variable it is
# assigned to (was "TimeSeriesType" before).
TimeSeriesDataType = TypeVar("TimeSeriesDataType", bound="TimeSeriesDataSet")
class TimeSeriesDataSet(Dataset):
"""PyTorch Dataset for fitting timeseries models.
The dataset automates common tasks such as
* scaling and encoding of variables
* normalizing the target variable
* efficiently converting timeseries in pandas dataframes to torch tensors
* holding information about static and time-varying variables known and unknown in
the future
* holding information about related categories (such as holidays)
* downsampling for data augmentation
* generating inference, validation and test datasets
The :ref:`tutorial on passing data to models <passing-data>` is helpful to
understand the output of the dataset
and how it is coupled to models.
Each sample is a subsequence of a full time series. The subsequence consists of
encoder and decoder/prediction
timepoints for a given time series. This class constructs an index which defines
which subsequences exist and
can be sampled from (``index`` attribute). The samples in the index are defined
by the various parameters
to the class (encoder and prediction lengths, minimum prediction length, randomize
length and predict keywords).
How samples are
sampled into batches for training, is determined by the DataLoader.
The class provides the
:py:meth:`~TimeSeriesDataSet.to_dataloader` method
to convert the dataset into a dataloader.
Large datasets:
Currently the class is limited to in-memory operations (that can be sped up by an
existing installation of `numba <https://pypi.org/project/numba/>`_).
If you have extremely large data,
however, you can pass prefitted encoders and scalers to it and a subset of
sequences to the class to
construct a valid dataset (plus, likely the EncoderNormalizer should be used to
normalize targets).
When fitting a network, you would then need to create a custom DataLoader that rotates
through the datasets.
There are currently no in-built methods to do this.
Parameters
----------
data : pd.DataFrame
dataframe with sequence data - each row can be identified with
``time_idx`` and the ``group_ids``
time_idx : str
integer typed column denoting the time index within ``data``.
This columns is used to determine the sequence of samples.
If there are no missing observations,
the time index should increase by ``+1`` for each subsequent sample.
The first time_idx for each series does not necessarily
have to be ``0`` but any value is allowed.
target : Union[str, list[str]]
column(s) in ``data`` denoting the forecasting target.
Can be categorical or continuous dtype.
group_ids : list[str]
list of column names identifying a time series instance within ``data``
This means that the ``group_ids``
identify a sample together with the ``time_idx``.
If you have only one timeseries, set this to the
name of column that is constant.
weight : str, optional, default=None
column name for weights. Defaults to None.
max_encoder_length : int, optional, default=30
maximum length to encode.
This is the maximum history length used by the time series dataset.
min_encoder_length : int, optional, default=max_encoder_length
minimum allowed length to encode. Defaults to max_encoder_length.
min_prediction_idx : int, optional, default = first time_idx in data
minimum ``time_idx`` from where to start predictions.
This parameter can be useful to create a validation or test set.
max_prediction_length : int, optional, default=1
maximum prediction/decoder length
(choose this not too short as it can help convergence)
min_prediction_length : int, optional, default=max_prediction_length
minimum prediction/decoder length
static_categoricals : list of str, optional, default=None
list of categorical variables that do not change over time, in ``data``,
entries can be also lists which are then encoded together
(e.g. useful for product categories)
static_reals : list of str, optional, default=None
list of continuous variables that do not change over time
time_varying_known_categoricals : list of str, optional, default=None
list of categorical variables that change over time and are known in the future,
entries can be also lists which are then encoded together
(e.g. useful for special days or promotion categories)
time_varying_known_reals : list of str, optional, default=None
list of continuous variables that change over time and are known in the future
(e.g. price of a product, but not demand of a product)
time_varying_unknown_categoricals : list of str, optional, default=None
list of categorical variables that are not known in the future
and change over time.
entries can be also lists which are then encoded together
(e.g. useful for weather categories).
Target variables should be included here, if categorical.
time_varying_unknown_reals : list of str, optional, default=None
list of continuous variables that are not known in the future
and change over time.
Target variables should be included here, if real.
variable_groups : Dict[str, list[str]], optional, default=None
dictionary mapping a name to a list of columns in the data.
The name should be present
in a categorical or real class argument, to be able to encode or scale the
columns by group.
This will effectively combine categorical variables; this is particularly useful
if a categorical variable can have multiple values at the same time.
An example are holidays which can be overlapping.
constant_fill_strategy : dict, optional, default=None
Keys must be str, values can be str, float, int or bool.
Dictionary of column names with constants to fill in missing values if there
are gaps in the sequence (by default forward fill strategy is used).
The values will be only used if ``allow_missing_timesteps=True``.
A common use case is to denote that demand was 0 if the sample is not in the
dataset.
allow_missing_timesteps : bool, optional, default=False
whether to allow missing timesteps that are automatically filled up.
Missing values refer to gaps in the ``time_idx``, e.g. if a specific
timeseries has only samples for 1, 2, 4, 5, the sample for 3 will be
generated on-the-fly.
Allow missing does not deal with ``NA`` values. You should fill NA values
before passing the dataframe to the TimeSeriesDataSet.
lags : dict[str, list[int]], optional, default=None
dictionary of variable names mapped to list of time steps by which the
variable should be lagged.
Lags can be useful to indicate seasonality to the models.
Useful to add if seasonality(ies) of the data are known.
In this case, it is recommended to add the target variables
with the corresponding lags to improve performance.
Lags must not be larger than the shortest time series as all time series
will be cut by the largest lag value to prevent NA values.
A lagged variable has to appear in the time-varying variables.
If you only want the lagged but not the current value, lag it manually in
your input data using
``data[lagged_varname] = ``
``data.sort_values(time_idx).groupby(group_ids, observed=True).shift(lag)``.
add_relative_time_idx : bool, optional, default=False
whether to add a relative time index as feature, i.e.,
for each sampled sequence, the index will range from -encoder_length to
prediction_length.
add_target_scales : bool, optional, default=False
whether to add scales for target to static real features, i.e., add the
center and scale of the unnormalized timeseries as features.
add_encoder_length : Union[bool, str], optional, default="auto"
whether to add encoder length to list of static real variables.
Defaults to "auto", which is the same as
``True`` iff ``min_encoder_length != max_encoder_length``.
target_normalizer : torch transformer, str, list, tuple, optional, default="auto"
Transformer that takes group_ids, target and time_idx to normalize targets.
You can choose from
:py:class:`~pytorch_forecasting.data.encoders.TorchNormalizer`,
:py:class:`~pytorch_forecasting.data.encoders.GroupNormalizer`,
:py:class:`~pytorch_forecasting.data.encoders.NaNLabelEncoder`,
:py:class:`~pytorch_forecasting.data.encoders.EncoderNormalizer`
(on which overfitting tests will fail)
or ``None`` for using no normalizer. For multiple targets, use a
:py:class`~pytorch_forecasting.data.encoders.MultiNormalizer`.
By default an appropriate normalizer is chosen automatically.
categorical_encoders : dict[str, BaseEstimator]
dictionary of scikit learn label transformers.
If you have unobserved categories in
the future / a cold-start problem, you can use the
:py:class:`~pytorch_forecasting.data.encoders.NaNLabelEncoder` with
``add_nan=True``.
Defaults effectively to sklearn's ``LabelEncoder()``.
Prefitted encoders will not be fit again.
scalers : optional, dict with str keys and torch or sklearn scalers as values
dictionary of scikit-learn or torch scalers.
Defaults to sklearn's ``StandardScaler()``.
Other options
are :py:class:`~pytorch_forecasting.data.encoders.EncoderNormalizer`,
:py:class:`~pytorch_forecasting.data.encoders.GroupNormalizer`
or scikit-learn's ``StandardScaler()``,
``RobustScaler()`` or ``None`` for using no normalizer / normalizer
with ``center=0`` and ``scale=1``
(``method="identity"``).
Prefitted encoders will not be fit again (with the exception of the
:py:class:`~pytorch_forecasting.data.encoders.EncoderNormalizer` that is
fit on every encoder sequence).
randomize_length : optional, None, bool, or tuple of float.
None or False if not to randomize lengths.
Tuple of beta distribution concentrations from which
probabilities are sampled that are used to sample new sequence lengths
with a binomial distribution.
If True, defaults to (0.2, 0.05), i.e. ~1/4 of samples
around minimum encoder length.
Defaults to False otherwise.
predict_mode : bool
If True, the TimeSeriesDataSet will only create one sequence
per time series (i.e. only from the latest provided samples).
Effectively, this will select each time series identified by ``group_ids``
the last ``max_prediction_length`` samples of each time series as
prediction samples and everything previous up to ``max_encoder_length``
samples as encoder samples.
If False, the TimeSeriesDataSet will create subsequences by sliding a
window over the data samples.
For training use cases, it's preferable to set predict_mode=False
to get all subseries.
On the other hand, predict_mode = True is ideal for validation cases.
"""
# todo: refactor:
# - creating base class with minimal functionality
# - "outsource" transformations -> use pytorch transformations as default
# todo: integrate graphs
# - add option to pass networkx graph to the dataset -> clearly defined
# - create method to create networkx graph for hierarchies -> clearly defined
# - convert networkx graph to pytorch geometric graph
# - create sampler to sample from the graph
# - create option in `to_dataloader` method to use a graph sampler
# -> automatically changing collate function which returns graphs
# -> should incorporate entire dataset but be compatible with current approach
# - integrate hierarchical loss somehow into loss metrics
# how to get there:
# - add networkx and pytorch_geometric to requirements BUT as extras
# -> do we also need torch_sparse, etc.? -> can we avoid this? probably not
# - networkx graph: define what makes sense from user perspective
# - define conversion into pytorch geometric graph? is this a two-step process of
# - encoding networkx graph and converting it into "unfilled" pytorch geometric
# graph
# - then creating full graph in collate function on the fly?
# - or is data already stored in pytorch geometric graph, only cut through it?
# - dataformat would change? Is is all timeseries data? + mask when valid?
# - then making cuts through the graph in sampling?
# - would it be best in this case to re-think the timeseries class and design it
# as series of transformations?
# - what is the new master data? very off current state or very similar?
# - current approach is storing data in long format which is memory efficient
# and using the index object to
# make sense of it when accessing. graphs would require wide format?
# - do NOT overengineer, i.e. support only usecase of single static graph,
# but only subset might be relevant
# -> however, should think what happens if we want a dynamic graph. would this
# completely change the
# data format?
# decisions:
# - stay with long format and create graph on the fly even if hampering
# efficiency and performance
# - go with pytorch_geometric approach for future proofing
# - directly convert networkx into pytorch_geometric graph
# - sampling: support only time-synchronized.
# - sample randomly an instance from index as now.
# - then get additional samples as per graph (that has been created) and
# available data
# - then collate into graph object
def __init__(
    self,
    data: pd.DataFrame,
    time_idx: str,
    target: str | list[str],
    group_ids: list[str],
    weight: str | None = None,
    max_encoder_length: int = 30,
    min_encoder_length: int = None,
    min_prediction_idx: int = None,
    min_prediction_length: int = None,
    max_prediction_length: int = 1,
    static_categoricals: list[str] | None = None,
    static_reals: list[str] | None = None,
    time_varying_known_categoricals: list[str] | None = None,
    time_varying_known_reals: list[str] | None = None,
    time_varying_unknown_categoricals: list[str] | None = None,
    time_varying_unknown_reals: list[str] | None = None,
    variable_groups: dict[str, list[int]] | None = None,
    constant_fill_strategy: dict[str, str | float | int | bool] | None = None,
    allow_missing_timesteps: bool = False,
    lags: dict[str, list[int]] | None = None,
    add_relative_time_idx: bool = False,
    add_target_scales: bool = False,
    add_encoder_length: bool | str = "auto",
    target_normalizer: NORMALIZER
    | str
    | list[NORMALIZER]
    | tuple[NORMALIZER]
    | None = "auto",
    categorical_encoders: dict[str, NaNLabelEncoder] | None = None,
    scalers: dict[
        str, StandardScaler | RobustScaler | TorchNormalizer | EncoderNormalizer
    ]
    | None = None,
    randomize_length: None | tuple[float, float] | bool = False,
    predict_mode: bool = False,
):
    """Timeseries dataset holding data for models.

    See the class docstring of :class:`TimeSeriesDataSet` for a description
    of all parameters.
    """
    super().__init__()

    # write variables to self and handle defaults
    # -------------------------------------------
    self.max_encoder_length = max_encoder_length
    if min_encoder_length is None:
        min_encoder_length = max_encoder_length
    self.min_encoder_length = min_encoder_length
    self.max_prediction_length = max_prediction_length
    if min_prediction_length is None:
        min_prediction_length = max_prediction_length
    self.min_prediction_length = min_prediction_length
    self.target = target
    self.weight = weight
    self.time_idx = time_idx
    self.group_ids = _coerce_to_list(group_ids)
    # the public attributes keep the arguments exactly as passed; the
    # underscore-prefixed twins hold the coerced list/dict versions that are
    # used internally (and may be mutated below, e.g. when adding lags)
    self.static_categoricals = static_categoricals
    self._static_categoricals = _coerce_to_list(static_categoricals)
    self.static_reals = static_reals
    self._static_reals = _coerce_to_list(static_reals)
    self.time_varying_known_categoricals = time_varying_known_categoricals
    self._time_varying_known_categoricals = _coerce_to_list(
        time_varying_known_categoricals
    )
    self.time_varying_known_reals = time_varying_known_reals
    self._time_varying_known_reals = _coerce_to_list(time_varying_known_reals)
    self.time_varying_unknown_categoricals = time_varying_unknown_categoricals
    self._time_varying_unknown_categoricals = _coerce_to_list(
        time_varying_unknown_categoricals
    )
    self.time_varying_unknown_reals = time_varying_unknown_reals
    self._time_varying_unknown_reals = _coerce_to_list(time_varying_unknown_reals)
    self.add_relative_time_idx = add_relative_time_idx

    # set automatic defaults
    if isinstance(randomize_length, bool):
        if not randomize_length:
            randomize_length = None
        else:
            # default beta-distribution concentrations for length sampling
            randomize_length = (0.2, 0.05)
    self.randomize_length = randomize_length
    if min_prediction_idx is None:
        min_prediction_idx = data[self.time_idx].min()
    self.min_prediction_idx = min_prediction_idx
    self.constant_fill_strategy = constant_fill_strategy
    self._constant_fill_strategy = _coerce_to_dict(constant_fill_strategy)
    self.predict_mode = predict_mode
    self.allow_missing_timesteps = allow_missing_timesteps
    self.target_normalizer = target_normalizer
    self.categorical_encoders = categorical_encoders
    self._categorical_encoders = _coerce_to_dict(categorical_encoders)
    self.scalers = scalers
    self._scalers = _coerce_to_dict(scalers)
    self.add_target_scales = add_target_scales
    self.variable_groups = variable_groups
    self._variable_groups = _coerce_to_dict(variable_groups)
    self.lags = lags
    self._lags = _coerce_to_dict(lags)

    # add_encoder_length: resolve "auto" to a concrete bool
    if isinstance(add_encoder_length, str):
        msg = (
            f"Only 'auto' allowed for add_encoder_length "
            f"but found {add_encoder_length}"
        )
        assert add_encoder_length == "auto", msg
        add_encoder_length = self.min_encoder_length != self.max_encoder_length
    self.add_encoder_length = add_encoder_length

    # overwrite values
    self.reset_overwrite_values()

    # check parameters
    self._check_params()

    # data preprocessing in pandas
    # ----------------------------
    # get metadata from data
    # NOTE(review): this assignment shadows the *method* ``_data_properties``
    # on the instance with the dict it returns
    self._data_properties = self._data_properties(data)

    # target normalizer
    self.target_normalizer = self._set_target_normalizer(
        self._data_properties, self.target_normalizer
    )

    # add time index relative to prediction position
    if self.add_relative_time_idx:
        assert (
            "relative_time_idx" not in data.columns
        ), "relative_time_idx is a protected column and must not be present in data"
        if (
            "relative_time_idx" not in self._time_varying_known_reals
            and "relative_time_idx" not in self.reals
        ):
            self._time_varying_known_reals.append("relative_time_idx")

    # add decoder length to static real variables
    # NOTE(review): membership is checked against known reals and self.reals
    # (presumably including static reals), then appended to static reals
    if self.add_encoder_length:
        assert (
            "encoder_length" not in data.columns
        ), "encoder_length is a protected column and must not be present in data"
        if (
            "encoder_length" not in self._time_varying_known_reals
            and "encoder_length" not in self.reals
        ):
            self._static_reals.append("encoder_length")

    # add columns for additional features
    if self.add_relative_time_idx or self.add_encoder_length:
        data = data.copy()  # only copies indices (underlying data is NOT copied)
    if self.add_relative_time_idx:
        data.loc[:, "relative_time_idx"] = (
            0.0  # dummy - real value will be set dynamically in __getitem__()
        )
    if self.add_encoder_length:
        data.loc[:, "encoder_length"] = (
            0  # dummy - real value will be set dynamically in __getitem__()
        )

    # validate
    self._validate_data(data)

    # add lags
    if len(self._lags) > 0:
        self._set_lagged_variables()

    # filter data
    # NOTE(review): min_prediction_idx can no longer be None here since it is
    # defaulted above, so this guard always holds
    if min_prediction_idx is not None:
        # filtering for min_prediction_idx will be done on subsequence level,
        # ensuring that minimal decoder index is always >= min_prediction_idx
        data = data[
            lambda x: x[self.time_idx]
            >= self.min_prediction_idx - self.max_encoder_length - self.max_lag
        ]
    data = data.sort_values(self.group_ids + [self.time_idx])

    # preprocess data
    data = self._preprocess_data(data)
    msg = "Target normalizer is separate and not in scalers."
    for target in self.target_names:
        assert target not in self._scalers, msg

    # index for getitem based resampling
    # ----------------------------------
    # NOTE: this should be refactored and probably in a DataLoader
    # create index
    self.index = self._construct_index(data, predict_mode=self.predict_mode)

    # data conversion to torch tensors
    # --------------------------------
    # convert to torch tensor for high performance data loading later
    self.data = self._data_to_tensors(data)

    # check that all tensors are finite
    self._check_tensors(self.data)
def _check_params(self):
"""Check parameters of self against assumptions."""
assert isinstance(
self.max_encoder_length, int
), "max encoder length must be integer"
assert (
self.min_encoder_length <= self.max_encoder_length
), "max encoder length has to be larger equals min encoder length"
assert isinstance(
self.min_encoder_length, int
), "min encoder length must be integer"
assert isinstance(
self.max_prediction_length, int
), "max prediction length must be integer"
assert (
self.min_prediction_length <= self.max_prediction_length
), "max prediction length has to be larger equals min prediction length"
assert (
self.min_prediction_length > 0
), "min prediction length must be larger than 0"
assert isinstance(
self.min_prediction_length, int
), "min prediction length must be integer"
msg = (
f"add_encoder_length should be boolean or 'auto' "
f"but found {self.add_encoder_length}"
)
assert isinstance(self.add_encoder_length, bool), msg
for target in self.target_names:
assert (
target not in self._time_varying_known_reals
), f"target {target} should be an unknown continuous variable in the future"
assert self.min_lag > 0, "lags should be positive"
def _data_properties(self, data: pd.DataFrame) -> DataProperties:
"""Returns a dict with properties of the data used later.
Parameters
----------
data : pd.DataFrame
Returns
-------
dict
dictionary with properties of the data.
The following fields are returned:
* columns : list[str]
list of column names in the data
* target_type : dict[str, str]
type of target variable, categorial or real.
Keys are target variable names in self.target_names.
Value is either "categorical" or "real".
* target_positive : dict[str, bool]
whether target variable is positive.
Keys are target variable names in self.target_names that are real.
Value is True if all values of the target variable are positive.
Computed and returned only if target_normalizer is "auto".
* target_skew : dict[str, float]
skew of target variable.
Keys are target variable names in self.target_names that are
real and positive. Value is the skew of the target variable.
Computed and returned only if target_normalizer is "auto".
"""
target_norm = self.target_normalizer
details_required = isinstance(target_norm, str) and target_norm == "auto"
props = {"target_type": {}, "target_skew": {}, "target_positive": {}}
props["columns"] = data.columns.tolist()
for target in self.target_names:
if data[target].dtype.kind != "f": # category
props["target_type"][target] = "categorical"
else:
props["target_type"][target] = "real"
if details_required:
props["target_positive"][target] = (data[target] > 0).all()
if props["target_positive"][target]:
props["target_skew"][target] = data[target].skew()
return props
def _set_lagged_variables(self) -> None:
"""Add lagged variables to lists of variables.
* generates lagged variable names and adds them to the appropriate lists
of time-varying variables, typed by known/unknown and categorical/real
* checks that all lagged variables passed by user adhere to the
naming convention of lags
"""
var_name_dict = {
("real", "known"): "_time_varying_known_reals",
("real", "unknown"): "_time_varying_unknown_reals",
("cat", "known"): "_time_varying_known_categoricals",
("cat", "unknown"): "_time_varying_unknown_categoricals",
}
def _attr(realcat, known):
return getattr(self, var_name_dict[(realcat, known)])
def _append_if_new(lst, x):
if x not in lst:
lst.append(x)
# check that all names passed in self._lags appear as variables
all_time_varying_var_names = [x for kw in var_name_dict for x in _attr(*kw)]
for name in self._lags:
if name not in all_time_varying_var_names:
raise KeyError(
f"lagged variable {name} is not a known "
"nor unknown time-varying variable"
)
# add lagged variables to type indicators
for name in self._lags:
lagged_names = self._get_lagged_names(name)
# add lags
for realcat, known in var_name_dict:
var_names = _attr(realcat, known)
if name in var_names:
for lagged_name, lag in lagged_names.items():
# if lag is longer than horizon, lagged var becomes future-known
if known == "known" or lag >= self.max_prediction_length:
_append_if_new(_attr(realcat, "known"), lagged_name)
else:
_append_if_new(_attr(realcat, "unknown"), lagged_name)
@property
def dropout_categoricals(self) -> list[str]:
    """Categorical variables unknown when forecasting without history.

    Returns
    -------
    list[str]
        names of categorical variables whose encoder adds a NaN category
        (``encoder.add_nan`` is True), i.e. variables that can be absent
        from observed history
    """
    names = []
    for name, encoder in self._categorical_encoders.items():
        if encoder.add_nan:
            names.append(name)
    return names
def _get_lagged_names(self, name: str) -> dict[str, int]:
"""
Generate names for lagged variables
Parameters
----------
name : str
name of variable to lag
Returns
-------
dict[str, int]
dictionary mapping new variable names to lags
"""
return {f"{name}_lagged_by_{lag}": lag for lag in self._lags.get(name, [])}
@cached_property
def lagged_variables(self) -> dict[str, str]:
    """Mapping of lagged variable names to the variables they lag.

    Returns
    -------
    dict[str, str]
        keys are generated lagged variable names, values are the names
        of the (unlagged) source variables
    """
    mapping: dict[str, str] = {}
    for source in self._lags:
        for lagged_name in self._get_lagged_names(source):
            mapping[lagged_name] = source
    return mapping
@cached_property
def lagged_targets(self) -> dict[str, str]:
    """Subset of ``lagged_variables`` restricted to target variables.

    Returns
    -------
    dict[str, str]
        keys are generated lagged variable names, values are the names
        of the (unlagged) target variables
    """
    mapping: dict[str, str] = {}
    for source in self._lags:
        # the target-membership test does not depend on the lag,
        # so it is checked once per source variable
        if source not in self.target_names:
            continue
        for lagged_name in self._get_lagged_names(source):
            mapping[lagged_name] = source
    return mapping
@cached_property
def min_lag(self) -> int:
    """Minimum number of time steps variables are lagged.

    Returns
    -------
    int
        minimum lag over all lagged variables; a large int sentinel
        (``10**9``) if no lags are configured, so that comparisons
        against real lags behave as if lags were unbounded
    """
    if not self._lags:
        # bug fix: previously returned the float 1e9 despite the
        # declared ``int`` return type; keep the same numeric value
        # but as an int
        return int(1e9)
    return min(min(lags) for lags in self._lags.values())
@cached_property
def max_lag(self) -> int:
    """Maximum number of time steps variables are lagged.

    Returns
    -------
    int
        maximum lag over all lagged variables, 0 if none are configured
    """
    if not self._lags:
        return 0
    return max(max(lags) for lags in self._lags.values())
def _set_target_normalizer(
    self,
    data_properties: DataProperties,
    target_normalizer: NORMALIZER | str | list | tuple,
) -> TorchNormalizer:
    """Determine and validate the target normalizer.

    Coerces ``target_normalizer`` to a torch normalizer and resolves the
    ``"auto"`` setting. In the auto case, the normalizer for a target x is
    determined as follows:

    * if x is categorical, a NaNLabelEncoder is used
    * if x is real and ``max_encoder_length > 20`` and
      ``min_encoder_length > 1``, an EncoderNormalizer is used, otherwise
      a GroupNormalizer is used. The transformation used in it is:

      * if x is real and positive, a log transformation if the skew of x
        is larger than 2.5, otherwise a ReLU transformation
      * if x is real and not positive, no transformation

    Only the ``"auto"`` case uses the metadata in ``data_properties``;
    otherwise ``data_properties`` is not used.

    Parameters
    ----------
    data_properties : dict
        Dictionary of data properties as returned by
        ``self._data_properties(data)``.
    target_normalizer : Union[NORMALIZER, str, list, tuple, None]
        Normalizer for target variable. If "auto", determined as above.
        A list or tuple of normalizers is wrapped in a ``MultiNormalizer``;
        ``None`` results in an identity ``TorchNormalizer``.

    Returns
    -------
    TorchNormalizer
        Normalizer for target variable, determined as above.

    Raises
    ------
    AssertionError
        If the resulting normalizer is incompatible with the dataset
        configuration (encoder length too short for EncoderNormalizer,
        wrong normalizer class, or multiple targets without a
        MultiNormalizer).
    """
    if isinstance(target_normalizer, str) and target_normalizer == "auto":
        target_normalizer = self._get_auto_normalizer(data_properties)
    elif isinstance(target_normalizer, tuple | list):
        # bug fix: wrap the normalizers passed to this method, not
        # ``self.target_normalizer`` — the attribute may not have been
        # assigned yet when this helper runs, and every other branch
        # operates on the local parameter
        target_normalizer = MultiNormalizer(target_normalizer)
    elif target_normalizer is None:
        target_normalizer = TorchNormalizer(method="identity")
    # validation
    # EncoderNormalizer fits on the encoder window, so it needs history
    assert (
        not isinstance(target_normalizer, EncoderNormalizer)
        or self.min_encoder_length >= target_normalizer.min_length
    ), "EncoderNormalizer is only allowed if min_encoder_length > 1"
    assert isinstance(target_normalizer, TorchNormalizer | NaNLabelEncoder), (
        f"target_normalizer has to be either None or of "
        f"class TorchNormalizer but found {target_normalizer}"
    )
    # multiple targets require one normalizer per target
    assert not self.multi_target or isinstance(
        target_normalizer, MultiNormalizer
    ), (
        "multiple targets / list of targets requires MultiNormalizer as "
        f"target_normalizer but found {target_normalizer}"
    )
    return target_normalizer
def _get_auto_normalizer(self, data_properties: DataProperties) -> TorchNormalizer:
    """Build the normalizer for the "auto" setting from data properties.

    See the docstring of ``_set_target_normalizer`` for the selection
    rules.

    Parameters
    ----------
    data_properties : dict
        Dictionary of data properties as returned by
        ``self._data_properties(data)``.

    Returns
    -------
    TorchNormalizer
        Normalizer for target variable; a ``MultiNormalizer`` over all
        targets if ``self.multi_target`` is set.
    """
    per_target = []
    for target in self.target_names:
        if data_properties["target_type"][target] == "categorical":
            per_target.append(NaNLabelEncoder())
            if self.add_target_scales:
                warnings.warn(
                    "Target scales will be only added for continuous targets",
                    UserWarning,
                )
            continue
        # real-valued target: choose transformation from positivity/skew
        transformation = None
        if data_properties["target_positive"][target]:
            skew = data_properties["target_skew"][target]
            transformation = "log" if skew > 2.5 else "relu"
        # long-enough encoder windows allow normalizing per encoder window
        if self.max_encoder_length > 20 and self.min_encoder_length > 1:
            per_target.append(EncoderNormalizer(transformation=transformation))
        else:
            per_target.append(GroupNormalizer(transformation=transformation))
    if self.multi_target:
        return MultiNormalizer(per_target)
    return per_target[0]
@cached_property
def _group_ids_mapping(self) -> dict[str, str]:
"""
Mapping of group id names to group ids used to identify series in dataset -
group ids can also be used for target normalizer.
The former can change from training to validation and test dataset
while the later must not.
"""
return {name: f"__group_id__{name}" for name in self.group_ids}
@cached_property
def _group_ids(self) -> list[str]:
"""
Group ids used to identify series in dataset.
See :py:meth:`~TimeSeriesDataSet._group_ids_mapping` for details.
"""
return list(self._group_ids_mapping.values())
def _validate_data(self, data: pd.DataFrame) -> None:
"""Validate assumptions on data.."""