-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathdata.py
More file actions
2259 lines (2005 loc) · 86.2 KB
/
data.py
File metadata and controls
2259 lines (2005 loc) · 86.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
################################################################################
# Imports of standard libraries
################################################################################
from __future__ import annotations
import fnmatch
import io
import itertools
import logging
import math
import os
import re
import sys
import typing
from dataclasses import dataclass
import numpy as np
import pandas as pd
import torch
try:
import matplotlib
from matplotlib import pyplot as plt
except ImportError:
matplotlib = None # type:ignore[assignment]
plt = None # type:ignore[assignment]
from qfeval_functions import functions
from . import plot
from . import util
logger = logging.getLogger(__name__)
def _define_binary_operator(
f: typing.Callable[..., typing.Any],
) -> typing.Callable[[Data, typing.Any], Data]:
def op(self: Data, other: typing.Any) -> Data:
def g(x: typing.Any, y: typing.Any) -> torch.Tensor:
return typing.cast(torch.Tensor, f(x, y))
return self.apply(g, other)
return op
def _define_unary_operator(
f: typing.Callable[..., typing.Any],
) -> typing.Callable[[Data], Data]:
def op(self: Data) -> Data:
def g(x: typing.Any) -> torch.Tensor:
return typing.cast(torch.Tensor, f(x))
return self.apply(g)
return op
# Axis selector used throughout Data: either a positional dimension index or
# one of the named dimensions "timestamp", "symbol", or "column".
Axis = typing.Union[
    int,
    typing.Literal["timestamp"],
    typing.Literal["symbol"],
    typing.Literal["column"],
]
# TODO(imos): Re-enable @typechecked once some workaround is found.
# @typechecked
class Data(object):
"""Manages numerical tensors, each of which is indexed by a timestamp
(np.datetime64) and a symbol (str). Tensors should represent numerical
information like OHLC (open, high, low, close). It can slice tensors in a
low computational cost via the [] operator (i.e., Tensors can also be
accessible without transferring from the main memory). Tensors are managed
as a map from a string name (column name) to a multi-dimensional tensor.
The tensor should have two or more dimensions (i.e., a feature would have
other dimensions in addition to a timestamp and a symbol). The tensor's
leading two dimensions correspond to timestamps and symbols, respectively.
"""
############################################################################
# Initialization and builder methods
############################################################################
    def __init__(self, data: Data):
        """Initializes a new Data object as a shallow view of another Data
        (shares its timestamp/symbol arrays and tensor dict without copying).
        Use from_tensors when creating a new Data object from tensors.

        Args:
            data (Data): The source object whose internals are shared.
        """
        super().__init__()
        assert isinstance(data, Data)
        self.__timestamps: np.ndarray = data.__timestamps
        self.__symbols: np.ndarray = data.__symbols
        self.__tensors: typing.Dict[str, torch.Tensor] = data.__tensors
@classmethod
def from_preset(
cls,
name: str = "pfn-topix500",
dtype: typing.Any = None,
device: typing.Any = None,
paths: typing.List[str] = [],
) -> Data:
# TODO(masanori): check compatibility with actual data path
paths += sys.path
for path in paths:
f = os.path.join(path, "data", f"{name}.csv")
if os.path.exists(f):
return cls.from_csv(f, dtype=dtype, device=device)
f = os.path.join(path, "data", f"{name}.csv.xz")
if os.path.exists(f):
return cls.from_csv(f, dtype=dtype, device=device)
raise FileNotFoundError(f"No such preset: {name}")
    @classmethod
    def from_csv(
        cls,
        input: typing.Union[str, io.IOBase],
        dtype: typing.Any = None,
        device: typing.Any = None,
    ) -> Data:
        """Builds Data from a CSV file path or file-like object.

        The CSV must contain `timestamp` and `symbol` columns; every other
        column is cast to the NumPy dtype equivalent of `dtype` before the
        frame is handed to `from_dataframe`.
        """
        logger.debug(f"Reading CSV data from: {input}")
        # `torch.zeros((), dtype=dtype)` only serves to translate the torch
        # dtype (or torch's default when dtype is None) into a NumPy dtype.
        return cls.from_dataframe(
            pd.read_csv(input)
            .set_index(["timestamp", "symbol"])
            .astype(torch.zeros((), dtype=dtype).numpy().dtype)
            .reset_index(),
            dtype=dtype,
            device=device,
        )
    @classmethod
    def from_dataframe(
        cls,
        df: pd.DataFrame,
        dtype: typing.Any = None,
        device: typing.Any = None,
    ) -> Data:
        """Builds and returns Data based on the given DataFrame object. The
        DataFrame object must have timestamp and symbol columns.

        Columns named like `name[i,j]` are gathered into a single tensor
        `name` with extra trailing dimensions; missing index combinations are
        filled with NaN.
        """
        # 1. Preprocess the given data frame. This removes duplicates if any
        # because df.pivot fails when it has duplicates.
        orig_shape = df.shape
        df = df.drop_duplicates(("timestamp", "symbol"), keep="last")
        if df.shape != orig_shape:
            logger.warning(
                "Removed duplicates in the given DataFrame: "
                f"{orig_shape} => {df.shape}"
            )
        # 2. Build NumPy arrays, each of which represents exactly one column of
        # the source DataFrame.
        arrays: typing.Dict[
            str,
            typing.Union[
                np.ndarray, typing.Dict[typing.Tuple[int, ...], np.ndarray]
            ],
        ] = {}
        for column in df.columns:
            if column in ("timestamp", "symbol"):
                continue
            # Split "name[1,2]" into ("name", "1,2"); plain names get None.
            m = re.match(r"^([^\[\]]*)(?:\[(.*)\])?$", column)
            if m is None:
                raise ValueError(f"Invalid column: {column}")
            name, index = typing.cast(
                typing.Tuple[str, typing.Optional[str]], m.groups()
            )
            table = df.pivot(index="timestamp", columns="symbol", values=column)
            value = table.values
            if index is None:
                arrays[name] = value
            else:
                arrays.setdefault(name, {})
                arrays[name][tuple(map(int, index.split(",")))] = value
        # 3. Unite multi-dimensional columns.
        for k, v in arrays.items():
            if not isinstance(v, dict):
                continue
            items = list(v.items())
            # Derive the trailing shape as the max index + 1 per dimension.
            shape = list(items[0][0])
            for item in items:
                for dim, size in enumerate(item[0]):
                    shape[dim] = max(shape[dim], size + 1)
            value = np.full(
                items[0][1].shape + tuple(shape),
                math.nan,
                dtype=items[0][1].dtype,
            )
            for item in items:
                value[(slice(None), slice(None)) + item[0]] = item[1]
            arrays[k] = value
        # 4. Build Data.
        torch_device = util.torch_device(device)
        tensors = {
            k: torch.tensor(v, dtype=dtype, device=torch_device)
            for k, v in arrays.items()
        }
        # NOTE(review): `table` is the pivot of the last value column; a
        # DataFrame without any value column would leave it unbound — assumed
        # not to happen in practice. TODO confirm.
        timestamps: np.ndarray = np.array(table.index, dtype=np.datetime64)
        symbols: np.ndarray = np.array(table.columns, dtype=np.str_)
        return cls.from_tensors(tensors, timestamps, symbols)
@classmethod
def from_tensors(
cls,
tensors: typing.Dict[str, torch.Tensor],
timestamps: np.ndarray,
symbols: np.ndarray,
) -> Data:
"""Returns Data with the given tensors. This is the most primitive
builder of Data class, and the other builders should not access internal
properties without calling Data.from_tensors.
"""
assert len(timestamps.shape) == 1
assert len(symbols.shape) == 1 # type:ignore[unreachable]
assert isinstance(tensors, dict)
result: Data = object.__new__(cls)
timestamps_index = (
slice(None)
if np.array_equal(timestamps, np.sort(timestamps))
else np.argsort(timestamps)
)
timestamps = timestamps[timestamps_index] # type: ignore
result.__timestamps = timestamps.astype("datetime64", copy=False)
symbols_index = (
slice(None)
if np.array_equal(symbols, np.sort(symbols))
else np.argsort(symbols)
)
symbols = symbols[symbols_index] # type: ignore
result.__symbols = symbols.astype(np.str_, copy=False)
device: typing.Optional[torch.device] = None
for k, v in tensors.items():
assert isinstance(k, str)
assert isinstance(v, torch.Tensor)
assert len(v.shape) >= 2
assert v.shape[0] == timestamps.shape[0]
assert v.shape[1] == symbols.shape[0]
if device is None:
device = v.device
else:
assert v.device == device
tensors[k] = tensors[k][timestamps_index, :][:, symbols_index] # type: ignore # TODO(masanori): fix type error
result.__tensors = tensors
return result
############################################################################
# Python special methods
############################################################################
def __repr__(self) -> str:
"""Returns the Data's summary."""
return (
repr(self.to_dataframe())
+ f"\n\n[{self.__timestamps.shape[0]} timestamps "
+ f"x {self.__symbols.shape[0]} symbols]"
)
    def __getitem__(self, key: typing.Any) -> Data:
        """Returns the Data's slice specified by the given index(es).

        - A boolean Data mask (aligned via `like`) keeps values where the
          mask is true and replaces the rest with NaN.
        - A 2-tuple is interpreted as (timestamp index, symbol index); any
          other key indexes timestamps only.
        - Indexes that `_to_index` reports as collapsing reduce the
          corresponding dimension via `sum` over the single element.

        Raises:
            KeyError: If an index is None.
        """
        # Get items by a boolean mask.
        if isinstance(key, Data):
            assert key.dtype == torch.bool
            assert self.dtype.is_floating_point
            mask = key.like(self).raw_tensor
            return self.from_tensors(
                {
                    k: torch.where(
                        mask,
                        v,
                        torch.as_tensor(
                            math.nan, dtype=v.dtype, device=v.device
                        ),
                    )
                    for k, v in self.__tensors.items()
                },
                self.__timestamps,
                self.__symbols,
            )
        if isinstance(key, tuple):
            assert len(key) == 2
            timestamp_index, symbol_index = key
        else:
            timestamp_index, symbol_index = key, slice(None)
        if timestamp_index is None or symbol_index is None:
            raise KeyError("Data cannot be indexed by None")
        # `_to_index` (defined elsewhere in this file) converts the key into
        # an integer/slice index and reports whether the dimension collapses.
        timestamp_index, collapse_timestamp = _to_index(
            timestamp_index, self.timestamp_index
        )
        symbol_index, collapse_symbol = _to_index(
            symbol_index, self.symbol_index
        )
        result = self.from_tensors(
            {
                k: v[timestamp_index][:, symbol_index]
                for k, v in self.__tensors.items()
            },
            self.__timestamps[timestamp_index],
            self.__symbols[symbol_index],
        )
        if collapse_timestamp:
            result = result.sum(axis="timestamp")
        if collapse_symbol:
            result = result.sum(axis="symbol")
        return result
    def __getattr__(self, key: str) -> Data:
        """Exposes columns as attributes (e.g. `data.close`) by delegating
        to `get`; unknown names raise AttributeError as usual."""
        try:
            return self.get(key)
        except KeyError:
            raise AttributeError(key)
    def __float__(self) -> float:
        # Converts a single-column scalar Data to float; `tensor` requires
        # exactly one stored tensor (see raw_tensor).
        return float(self.tensor)
def __getstate__(self) -> typing.Dict[str, typing.Any]:
# We explicitly define __getstate__ and __setstate__ to avoid a maximum
# recursion error caused by self.__getattr__ and self.get.
# If we do not define __getstate__ and __setstate__, self.__getattr__
# is called when unpickling a Data object because its attributes such
# as self.__tensors are not set. This causes a maximum recursion error
# because self.__getattr__ calls self.get, which tries to access
# attributes, which again calls self.__getattr__.
return {
"timestamps": self.__timestamps,
"symbols": self.__symbols,
"tensors": self.__tensors,
}
def __setstate__(self, state: typing.Dict[str, typing.Any]) -> None:
self.__timestamps = state["timestamps"]
self.__symbols = state["symbols"]
self.__tensors = state["tensors"]
############################################################################
# Python operators
############################################################################
eq = _define_binary_operator(lambda x, y: x == y)
ne = _define_binary_operator(lambda x, y: x != y)
__add__ = _define_binary_operator(lambda x, y: x + y)
__radd__ = _define_binary_operator(lambda x, y: y + x)
__sub__ = _define_binary_operator(lambda x, y: x - y)
__rsub__ = _define_binary_operator(lambda x, y: y - x)
__mul__ = _define_binary_operator(lambda x, y: x * y)
__rmul__ = _define_binary_operator(lambda x, y: y * x)
__matmul__ = _define_binary_operator(lambda x, y: x @ y)
__rmatmul__ = _define_binary_operator(lambda x, y: y @ x)
__truediv__ = _define_binary_operator(lambda x, y: x / y)
__rtruediv__ = _define_binary_operator(lambda x, y: y / x)
__floordiv__ = _define_binary_operator(lambda x, y: x // y)
__rfloordiv__ = _define_binary_operator(lambda x, y: y // x)
__mod__ = _define_binary_operator(lambda x, y: x % y)
__rmod__ = _define_binary_operator(lambda x, y: y % x)
__pow__ = _define_binary_operator(lambda x, y: x**y)
__rpow__ = _define_binary_operator(lambda x, y: y**x)
__lshift__ = _define_binary_operator(lambda x, y: x << y)
__rlshift__ = _define_binary_operator(lambda x, y: y << x)
__rshift__ = _define_binary_operator(lambda x, y: x >> y)
__rrshift__ = _define_binary_operator(lambda x, y: y >> x)
__eq__ = eq # type: ignore
__ne__ = ne # type: ignore
__gt__ = _define_binary_operator(lambda x, y: x > y)
__lt__ = _define_binary_operator(lambda x, y: x < y)
__ge__ = _define_binary_operator(lambda x, y: x >= y)
__le__ = _define_binary_operator(lambda x, y: x <= y)
__and__ = _define_binary_operator(lambda x, y: x & y)
__or__ = _define_binary_operator(lambda x, y: x | y)
__xor__ = _define_binary_operator(lambda x, y: x ^ y)
__invert__ = _define_unary_operator(lambda x: ~x)
__neg__ = _define_unary_operator(lambda x: -x)
__pos__ = _define_unary_operator(lambda x: +x)
__abs__ = _define_unary_operator(abs)
############################################################################
# Properties
############################################################################
    @property
    def raw_tensors(self) -> typing.Dict[str, torch.Tensor]:
        """Returns the underlying column-name-to-tensor dict (no copy)."""
        return self.__tensors
    @property
    def raw_tensor(self) -> torch.Tensor:
        """Returns the single underlying tensor.

        Raises:
            RuntimeError: If the Data does not have exactly one tensor.
        """
        if len(self.raw_tensors) != 1:
            raise RuntimeError(
                "Data.raw_tensor can be used only when Data has exactly one "
                + f"tensor, but it has {len(self.raw_tensors)} tensors."
            )
        return next(iter(self.raw_tensors.values()))
    @property
    def tensors(self) -> typing.Dict[str, torch.Tensor]:
        """Returns the tensors indexed by __index_slices (defined elsewhere
        in this file; presumably it drops aggregated dimensions — see
        raw_tensors for the unindexed originals)."""
        return {
            k: v[self.__index_slices()] for k, v in self.raw_tensors.items()
        }
    @property
    def tensor(self) -> torch.Tensor:
        """Returns the single tensor, indexed like `tensors`; raises
        RuntimeError via raw_tensor unless exactly one tensor is stored."""
        return self.raw_tensor[self.__index_slices()]
    @property
    def arrays(self) -> typing.Dict[str, np.ndarray]:
        """Returns `tensors` converted to NumPy arrays (detached and copied
        to CPU)."""
        return {k: v.detach().cpu().numpy() for k, v in self.tensors.items()}
    @property
    def array(self) -> np.ndarray:
        """Returns `tensor` converted to a NumPy array (detached and copied
        to CPU)."""
        return typing.cast(np.ndarray, self.tensor.detach().cpu().numpy())
    @property
    def timestamps(self) -> np.ndarray:
        """Returns the timestamp index.

        Raises:
            ValueError: If timestamps are invalid (see has_timestamps).
        """
        if self.has_timestamps():
            return self.__timestamps
        raise ValueError("Data does not have valid timestamps")
@timestamps.setter
def timestamps(self, timestamps: np.ndarray) -> None:
if self.__timestamps.shape != timestamps.shape:
raise ValueError(
f"Inconsistent shape: expected={self.__timestamps.shape}, "
+ f"actual={self.timestamps.shape}"
)
assert np.array_equal(timestamps, np.sort(timestamps))
self.__timestamps = timestamps.astype("datetime64", copy=False)
    @property
    def symbols(self) -> np.ndarray:
        """Returns the symbol index.

        Raises:
            ValueError: If symbols are invalid (see has_symbols).
        """
        if self.has_symbols():
            return self.__symbols
        raise ValueError("Data does not have valid symbols")
@symbols.setter
def symbols(self, symbols: np.ndarray) -> None:
if self.__symbols.shape != symbols.shape:
raise ValueError(
f"Inconsistent shape: expected={self.__symbols.shape}, "
+ f"actual={self.symbols.shape}"
)
assert np.array_equal(symbols, np.sort(symbols)), symbols
self.__symbols = symbols.astype(np.str_, copy=False)
@property
def columns(self) -> typing.List[str]:
return list(self.__tensors.keys())
    @property
    def shape(self) -> typing.Tuple[int, int]:
        r"""Returns a 2-dimensional tuple representing the shape:
        (timestamp, symbol). Aggregated dimensions will be 1.
        """
        # Delegates to size(), which handles the aggregated-dimension rule.
        return self.size()
@property
def device(self) -> torch.device:
for t in self.__tensors.values():
return t.device
raise RuntimeError("No tensors are stored")
@property
def dtype(self) -> torch.dtype:
for t in self.__tensors.values():
return t.dtype
raise RuntimeError("No tensors are stored")
############################################################################
# Public methods
############################################################################
    @typing.overload
    def get(self, *columns: str) -> Data:
        pass

    @typing.overload
    def get(self, arg: typing.Iterable[str]) -> Data:
        pass

    @typing.overload
    def get(self, arg: typing.Callable[[str], bool]) -> Data:
        pass

    @typing.overload
    def get(self, *, pattern: typing.Optional[str] = None) -> Data:
        pass

    def get(
        self,
        arg: typing.Any = None,
        *args: typing.Any,
        pattern: typing.Optional[str] = None,
    ) -> Data:
        """Returns a subset of columns as a new Data object.

        Accepts one of: column names as positional strings, a single
        iterable of names, a single predicate called with each column name,
        or a glob-style `pattern` keyword (matched with fnmatch).

        Raises:
            KeyError: If a requested column does not exist.
        """
        if arg is not None:
            args = (arg,) + args
        if pattern is not None:
            assert len(args) == 0
            # NOTE: This ensure the type of a variable to be captured for mypy.
            # mypy does not deduce the narrow type of a captured variable.
            pattern_str = pattern
            return self.get(lambda x: fnmatch.fnmatch(x, pattern_str))
        columns = []
        for arg in args:
            if isinstance(arg, str):
                columns.append(arg)
            elif callable(arg):
                # A predicate selects every existing column it accepts.
                assert len(args) == 1
                for column in self.columns:
                    if arg(column):
                        columns.append(column)
            else:
                # An iterable of column names.
                assert len(args) == 1
                for x in arg:
                    columns.append(x)
        tensors = {}
        for column in columns:
            # Dict lookup raises KeyError for unknown columns.
            tensors[column] = self.__tensors[column]
        return self.from_tensors(tensors, self.__timestamps, self.__symbols)
    def set(self, key: str, value: typing.Union[torch.Tensor, Data]) -> None:
        """Sets a column with the given name and value. If the name already
        exists, this replaces the column. Otherwise, this appends the value as
        a new column.

        Args:
            key: Column name.
            value: A tensor whose leading dimensions are (timestamp, symbol),
                or a Data with identical timestamps/symbols and exactly one
                column (via `.tensor`).

        Raises:
            TypeError: If `value` is neither a Tensor nor a Data.
        """
        if isinstance(value, torch.Tensor):
            tensor = value
        elif isinstance(value, Data):
            assert np.array_equal(self.__timestamps, value.__timestamps)
            assert np.array_equal(self.__symbols, value.__symbols)
            # `.tensor` requires `value` to hold exactly one tensor.
            tensor = value.tensor
        else:
            raise TypeError(f"Unsupported type: {value.__class__.__name__}")
        assert len(tensor.shape) >= 2
        assert tensor.shape[:2] == self.shape
        if len(self.__tensors) > 0:
            # Keep dtype/device homogeneous across all stored columns.
            assert tensor.dtype == self.dtype
            assert tensor.device == self.device
        self.__tensors[key] = tensor
def copy(self, deep: bool = False) -> Data:
"""Returns a copy of itself.
By default, this is not a deep copy, so it prevents from changing a set
of columns but does not prevent from updating their tensors.
Set deep=True if you need a deep copy.
Args:
deep (bool): Make a deep copy if set to True.
Returns:
Data: A copy.
"""
if deep:
return self.from_tensors(
{k: v.clone() for k, v in self.__tensors.items()},
self.__timestamps.copy(),
self.__symbols.copy(),
)
else:
return self.from_tensors(
self.__tensors.copy(), self.__timestamps, self.__symbols
)
    @typing.overload
    def size(self, dim: None = None) -> typing.Tuple[int, int]:
        pass

    @typing.overload
    def size(self, dim: Axis) -> int:
        pass

    def size(
        self, dim: typing.Optional[Axis] = None
    ) -> typing.Union[typing.Tuple[int, int], int]:
        r"""Returns the size of the corresponding dimension if `dim` is given.

        If no dimension is specified, this returns a 2-dimensional tuple
        representing the shape: (timestamp, symbol).
        It returns 1 for an aggregated dimension (i.e.,
        `Data.sum("symbol").size("symbol")` should always return 1).

        Raises:
            ValueError: If `dim` resolves to neither 0 nor 1.
        """
        if dim is None:
            return self.size(0), self.size(1)
        # `_parse_axis` (defined elsewhere in this file) maps named axes to
        # integer dimensions.
        dim_int = _parse_axis(dim)
        if dim_int == 0:
            return self.__timestamps.size
        elif dim_int == 1:
            return self.__symbols.size
        raise ValueError(f"Data.size got an unexpected dimension: {dim}")
    @typing.overload
    def to(self, dtype: torch.dtype) -> Data:
        pass

    @typing.overload
    def to(
        self, device: torch.device, dtype: typing.Optional[torch.dtype] = None
    ) -> Data:
        pass

    @typing.overload
    def to(self, tensor: torch.Tensor) -> Data:
        pass

    @typing.overload
    def to(self, data: Data) -> Data:
        pass

    def to(self, *args: typing.Any, **kwargs: typing.Any) -> Data:
        r"""Converts dtype and/or device of the tensors.

        Mirrors `torch.Tensor.to`: accepts a dtype, a device (with optional
        dtype), a reference tensor, or a reference Data (in which case the
        first of its raw tensors serves as the reference tensor).
        """
        for arg in args:
            if isinstance(arg, Data):
                # Delegate to the tensor overload using any tensor of `arg`.
                return self.to(next(iter(arg.raw_tensors.values())))
        return self.from_tensors(
            {k: v.to(*args, **kwargs) for k, v in self.raw_tensors.items()},
            self.__timestamps,
            self.__symbols,
        )
    def has_timestamps(self) -> bool:
        """Returns true iff the Data has timestamps.

        Timestamps are considered absent only when the index is a single
        sentinel value produced by __invalid_timestamp (defined elsewhere in
        this file), which this class uses for aggregated dimensions.
        """
        return self.__timestamps.shape != (
            1,
        ) or not np.array_equal(  # type:ignore[comparison-overlap]
            self.__timestamps, self.__invalid_timestamp()[None]
        )
    def has_symbols(self) -> bool:
        """Returns true iff the Data has symbols.

        Symbols are considered absent only when the index is a single
        sentinel value produced by __invalid_symbol (defined elsewhere in
        this file), which this class uses for aggregated dimensions.
        """
        return self.__symbols.shape != (
            1,
        ) or not np.array_equal(  # type:ignore[comparison-overlap]
            self.__symbols, self.__invalid_symbol()[None]
        )
def equals(self, other: Data) -> bool:
r"""Returns true if the Data equals to the given Data exactly."""
if not np.array_equal(self.__timestamps, other.__timestamps):
return False
if not np.array_equal(self.__symbols, other.__symbols):
return False
if self.columns != other.columns:
return False
for k, v in self.__tensors.items():
if not torch.allclose(
v, other.__tensors[k], rtol=0, atol=0, equal_nan=True
):
return False
return True
    def allclose(
        self, other: Data, rtol: float = 1e-05, atol: float = 1e-08
    ) -> bool:
        r"""Returns true if the Data approximately equals the given Data
        within the given relative/absolute tolerances (NaNs compare as
        equal). Timestamps, symbols, and column names must match exactly.
        """
        if not np.array_equal(self.__timestamps, other.__timestamps):
            return False
        if not np.array_equal(self.__symbols, other.__symbols):
            return False
        if self.columns != other.columns:
            return False
        for k, v in self.__tensors.items():
            if not torch.allclose(
                v, other.__tensors[k], rtol=rtol, atol=atol, equal_nan=True
            ):
                return False
        return True
    def to_dataframe(self) -> pd.DataFrame:
        """Converts the Data into a DataFrame. The returned DataFrame has
        secondary columns iff one or more tensors have extra dimensions (i.e.,
        3 or more dimensions).

        Extra dimensions are flattened into columns named `name[i,j]`
        (the format `from_dataframe` parses back). All-NaN rows are dropped,
        and the timestamp/symbol columns are omitted for aggregated
        dimensions.
        """
        tensors = []
        columns = []
        for name, tensor in self.__tensors.items():
            # NOTE: Since the 1st and 2nd dimensions could be 0, so the other
            # dimensions cannot use -1 in reshape.
            tensors.append(tensor.reshape(-1, int(np.prod(tensor.shape[2:]))))
            for index in itertools.product(*map(range, tensor.shape[2:])):
                if index == ():
                    columns.append(name)
                else:
                    index_str = ",".join(map(str, index))
                    columns.append(f"{name}[{index_str}]")
        df = pd.DataFrame(
            torch.cat(tensors, dim=1).detach().cpu().numpy(),
            columns=pd.Index(columns),
        )
        # Insert timestamp and symbol columns.
        timestamps = np.broadcast_to(self.__timestamps[:, None], self.shape)
        df.insert(0, "timestamp", timestamps.reshape(-1))
        symbols = np.broadcast_to(self.__symbols[None], self.shape)
        df.insert(1, "symbol", symbols.reshape(-1))
        # Drop rows whose all values other than timestamp and symbol are NaN.
        # (thresh=3 keeps rows with timestamp, symbol, and >=1 data value.)
        df = df.dropna(thresh=3).reset_index(drop=True)
        # Drop unnecessary columns from timestamp/symbol columns.
        if not self.has_timestamps():
            # Drop timestamp column.
            df = df.drop(columns="timestamp")
        if not self.has_symbols():
            # Drop symbol column.
            df = df.drop(columns="symbol")
        return df
    def to_table(self) -> pd.DataFrame:
        """Converts the Data into a table as a DataFrame object.

        This tries to convert the data into a two-dimensional DataFrame. If it
        is impossible, this raises a RuntimeError. The latter dimension should
        be used for columns if the data has multiple columns because they are
        rarely homogeneous. If both of timestamp/symbol have exactly one
        element, this uses timestamp as the former dimension because it is
        often used for indexing.

        Raises:
            RuntimeError: If the data cannot be reduced to two dimensions.
        """
        # If it has exactly one column, to_table should output a table indexed
        # by (timestamp, symbol).
        if len(self.columns) == 1:
            if self.has_timestamps():
                index = pd.Index(self.__timestamps, name="timestamp")
            else:
                index = None
            if self.has_symbols():
                columns = pd.Index(self.__symbols, name="symbol")
            else:
                # Inherit the column name if no symbols exist.
                columns = self.columns
            return pd.DataFrame(
                util.to_numpy(next(iter(self.__tensors.values()))),
                index=index,
                columns=columns,
            )
        # If it has more than one columns, to_table should use columns for the
        # latter dimension. There are many options for the former dimension.
        # The possible patterns of dimensions are the followings:
        #     (timestamp: 1, symbol: 1) => (timestamp, column),
        #     (timestamp: N, symbol: 1) => (timestamp, column),
        #     (timestamp: 1, symbol: X) => (timestamp, column),
        #     (timestamp: N, symbol: X) => (timestamp, column),
        #     (timestamp: X, symbol: 1) => (symbol, column),
        #     (timestamp: 1, symbol: N) => (symbol, column),
        #     (timestamp: X, symbol: N) => (symbol, column),
        #     (timestamp: X, symbol: X) => (None, column),
        #     (timestamp: N, symbol: N) => Invalid,
        # where
        # - X means the index has no elements,
        # - 1 means the index has exactly one element,
        # - N means the index has multiple elements.
        if len(self.__symbols) == 1 and self.has_timestamps():
            index = pd.Index(self.__timestamps, name="timestamp")
        elif not self.has_symbols() and not self.has_timestamps():
            index = None
        elif len(self.__timestamps) == 1:
            index = pd.Index(self.__symbols, name="symbol")
        else:
            raise RuntimeError(
                "Data.to_table requires 2D data, but it has "
                + f"{len(self.columns)} columns, {len(self.__symbols)} symbols, "
                + f"{len(self.__timestamps)} timestamps"
            )
        columns = pd.Index(self.columns, name="column")
        # Concatenate columns along the symbol axis (each has one symbol).
        return pd.DataFrame(
            np.concatenate(
                [util.to_numpy(v) for v in self.__tensors.values()], axis=1
            ),
            index=index,
            columns=columns,
        )
    def to_series(self) -> pd.Series:
        """Converts the Data to a Series. The Data must be a single time
        series. Otherwise, this raises a ValueError.

        Raises:
            ValueError: If the Data has more than one symbol or more than one
                remaining column.
        """
        if self.__symbols.shape[0] != 1:
            raise ValueError(
                "Data.to_series can be applied to Data with exactly one "
                + f"symbol, but its shape is {self.__symbols.shape}."
            )
        df = self.to_dataframe()
        # Use timestamp as an index if exists (if not aggregated).
        if "timestamp" in df.columns:
            df = df.set_index("timestamp")
        # Use symbol as a name if exists (if not aggregated).
        name = ""
        if "symbol" in df.columns:
            name = str(self.__symbols[0])
            # Drop symbol column.
            # NOTE: DataFrame.drop does not work due to MultiIndex.
            # NOTE(review): `get_loc` usually returns an int position, so
            # `~i` relies on negative indexing; with duplicate/MultiIndex
            # columns it may return a boolean mask — verify both paths.
            df = pd.DataFrame(df.iloc[:, ~df.columns.get_loc("symbol")])
        if len(df.columns) != 1:
            raise ValueError(
                "Data.to_series can be applied to Data with exactly single "
                + f"column, but it has {len(df.columns)} columns."
            )
        return pd.Series(df.iloc[:, 0], name=name)
@typing.overload
def to_csv(self, path: None = None) -> str:
pass
@typing.overload
def to_csv(self, path: str) -> None:
pass
def to_csv(self, path: typing.Optional[str] = None) -> typing.Optional[str]:
result = self.to_dataframe().to_csv(path, index=False)
return typing.cast(typing.Optional[str], result)
def to_matrix_csv(
self, path: typing.Optional[str] = None
) -> typing.Optional[str]:
result = self.to_matrix().to_csv(path)
return typing.cast(typing.Optional[str], result)
def to_matrix(self) -> pd.DataFrame:
return pd.DataFrame(
self.array, index=self.__timestamps, columns=self.__symbols
)
    def timestamp_index(
        self, v: typing.Any, side: str = "equal"
    ) -> typing.Union[None, int, np.ndarray]:
        """Converts an index representing timestamp(s) into an integer
        index.

        `v` is normalized to np.datetime64 (nanoseconds) before lookup in
        the sorted timestamp array; `side` is forwarded to _to_integer_index
        (defined elsewhere in this file).
        """
        return _to_integer_index(
            self.__timestamps, lambda x: np.datetime64(x, "ns"), v, side
        )
    def symbol_index(
        self, v: typing.Any, side: str = "equal"
    ) -> typing.Union[None, int, np.ndarray]:
        """Converts an index representing symbol(s) into an integer index.

        `v` is normalized with str before lookup in the sorted symbol array;
        `side` is forwarded to _to_integer_index (defined elsewhere in this
        file).
        """
        return _to_integer_index(self.__symbols, str, v, side)
    def like(self, other: "Data") -> "Data":
        """Reshape itself to the other's shape. This should respect timestamps
        and symbols of each argument (i.e., they can have different sets of
        timestamps and symbols). Values whose combination of a timestamp
        and a symbol does not exist in `other` will be discarded. On the other
        hand, values whose combination does not exist in `self` are treated as
        invalid values (defined by qfeval.core.util.invalid_values_like).
        """
        # make_array_mapping returns, for each element of other's index, the
        # position in self's index plus a mask of unmatched positions.
        timestamp_indexes, timestamp_mask = util.make_array_mapping(
            self.__timestamps, other.__timestamps
        )
        symbol_indexes, symbol_mask = util.make_array_mapping(
            self.__symbols, other.__symbols
        )
        tensors = {}
        for k, v in self.__tensors.items():
            # Gather values into other's (timestamp, symbol) layout.
            v = v[
                torch.tensor(timestamp_indexes[:, None]),
                torch.tensor(symbol_indexes[None, :]),
            ]
            # A cell is invalid if either its timestamp or symbol is
            # unmatched; broadcast the mask over extra trailing dimensions.
            mask = timestamp_mask[:, None] | symbol_mask[None, :]
            mask = mask.reshape(
                mask.shape + (1,) * (len(v.shape) - len(mask.shape))
            )
            tensors[k] = torch.where(
                torch.tensor(mask, device=v.device),
                util.nans(like=v),
                v,
            )
        return self.from_tensors(tensors, other.__timestamps, other.__symbols)
    def rename(
        self,
        columns: typing.Union[
            str,
            typing.Iterable[str],
            typing.Dict[str, str],
        ],
    ) -> Data:
        r"""Rename column names.

        Args:
            columns: A single new name (for a single-column Data), a full
                list of new names (in column order), or an old-to-new
                mapping (unmapped columns keep their names).

        Raises:
            KeyError: If a mapping key is not an existing column.
            ValueError: If a name list's length mismatches the columns.
        """
        # 1. Force `columns` to be a list or dictionary.
        if isinstance(columns, str):
            columns = [columns]
        # 2. Build a mapper to map an old name to a new name.
        mapper: typing.Dict[str, str] = {}
        if isinstance(columns, dict):
            mapper = {c: c for c in self.columns}
            for src, dest in columns.items():
                mapper[src] = dest
                if src not in self.columns:
                    raise KeyError(src)
        else:
            columns = list(columns)
            if len(columns) != len(self.columns):
                raise ValueError(
                    "Inconsistent number of columns: "
                    + f"actual={len(columns)}, expected={len(self.columns)}"
                )
            mapper = dict(zip(self.columns, columns))
        # 3. Build a new Data.
        tensors = {mapper[k]: v for k, v in self.raw_tensors.items()}
        return self.from_tensors(tensors, self.__timestamps, self.__symbols)
    # TODO(imos): Deprecate this.
    def with_column_name(self, name: str) -> Data:
        r"""Deprecated: use Data.rename instead. Renames the single column
        to `name` by delegating to rename."""
        return self.rename(name)
    def merge_columns(self, other: Data) -> Data:
        """Returns a new Data containing this object's columns plus
        `other`'s columns (on a name collision, `other`'s tensor wins).
        Both objects must share identical timestamps and symbols."""
        assert np.array_equal(other.__timestamps, self.__timestamps)
        assert np.array_equal(other.__symbols, self.__symbols)
        return self.from_tensors(
            {**self.__tensors, **other.__tensors},
            self.__timestamps,
            self.__symbols,
        )
def merge(self, *others: Data) -> Data:
r"""Merges the Data object and the given Data object(s) and returns the
merged Data object.
The returned Data object should be the union of the Data objects. If
some Data objects have values for the same combination of a
timestamp/symbol and a column, the last non-NaN value should be
selected.
Parameters:
- *others (Data): Data objects should be merged with `self`.
"""
xs: typing.Tuple[Data, ...] = (self,) + others
# Build timestamps/symbols for the result.
timestamps = _merge_arrays(
[x.__timestamps for x in xs if x.has_timestamps()],
self.__timestamps,
)
symbols = _merge_arrays(
[x.__symbols for x in xs if x.has_symbols()], self.__symbols
)
# Build parameters for columns.
shapes = {}
# A mapping from a column name to parameters. A parameter should
# consist of the followings:
# - values (Tensor): a flattened tensor of a column in the Data to be
# merged.
# - indexes (Tensor): indexes in the flattened result tensor for the
# flattened tensor to be merged (i.e., the shape should correspond to
# the shape of `values`).
column_to_parameters: typing.Dict[
str, typing.List[typing.Tuple[torch.Tensor, torch.Tensor]]
] = {}
for x in xs:
# Calculate base indexes. `base_index[i, j]` represents `x[i, j]`
# should be merged into `base_indexes[i, j]`-th batch of `result`.
base_indexes = torch.tensor(
np.searchsorted(timestamps, x.__timestamps)[:, None],
device=self.device,
) * symbols.size + torch.tensor(
np.searchsorted(symbols, x.__symbols)[None, :],
device=self.device,
)
for column, tensor in x.__tensors.items():
if column not in shapes:
shapes[column] = (
timestamps.size,
symbols.size,
) + tensor.shape[2:]
column_to_parameters[column] = []
if tensor.shape[2:] != shapes[column][2:]:
raise ValueError(
f"Inconsistent shape in column `{column}': "
+ f"actual={tensor.shape[2:]}, "
+ f"expected={shapes[column][2:]}"
)
size = int(np.prod(shapes[column][2:]))
tensor = tensor.flatten()
# Determine which indexes of the result should be filled.