Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 142 additions & 0 deletions qfeval_data/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1864,6 +1864,32 @@ def sum(self, axis: typing.Optional[Axis] = None) -> Data:
tensors[k] = functions.nansum(v, dim=ag.dim, keepdim=ag.keepdim)
return self.from_tensors(tensors, ag.timestamps, ag.symbols)

def min(self, axis: typing.Optional[Axis] = None) -> Data:
    """Returns the minimum value along the given axis, skipping NaNs."""
    ag = self.__aggregate(axis, "min")

    def _reduce(t: torch.Tensor) -> torch.Tensor:
        # Collapse each target dimension in turn, keeping it as size 1 so
        # the remaining dimension indices stay valid during the loop.
        for d in ag.dim:
            t = functions.nanmin(t, dim=d, keepdim=True).values
        if not ag.keepdim:
            # Drop the singleton dimensions, highest index first, so that
            # earlier squeezes do not shift the positions of later ones.
            for d in sorted(ag.dim, reverse=True):
                t = t.squeeze(d)
        return t

    tensors = {k: _reduce(v) for k, v in ag.items}
    return self.from_tensors(tensors, ag.timestamps, ag.symbols)

def max(self, axis: typing.Optional[Axis] = None) -> Data:
    """Returns the maximum value along the given axis, skipping NaNs."""
    ag = self.__aggregate(axis, "max")

    def _reduce(t: torch.Tensor) -> torch.Tensor:
        # Collapse each target dimension in turn, keeping it as size 1 so
        # the remaining dimension indices stay valid during the loop.
        for d in ag.dim:
            t = functions.nanmax(t, dim=d, keepdim=True).values
        if not ag.keepdim:
            # Drop the singleton dimensions, highest index first, so that
            # earlier squeezes do not shift the positions of later ones.
            for d in sorted(ag.dim, reverse=True):
                t = t.squeeze(d)
        return t

    tensors = {k: _reduce(v) for k, v in ag.items}
    return self.from_tensors(tensors, ag.timestamps, ag.symbols)

def mean(self, axis: typing.Optional[Axis] = None) -> Data:
ag = self.__aggregate(axis, "mean")
tensors = {}
Expand Down Expand Up @@ -1918,6 +1944,122 @@ def count(self, axis: typing.Optional[Axis] = None) -> Data:
)
return self.from_tensors(tensors, ag.timestamps, ag.symbols)

def first(
    self, axis: typing.Optional[Axis] = "timestamp", skipna: bool = True
) -> Data:
    """Returns the first value along the given axis.

    If `skipna=True`, returns the first non-NaN; otherwise, simply selects
    the first element (which may be NaN).

    Args:
        axis: The axis to reduce along; defaults to "timestamp".
        skipna: Whether to skip leading NaNs when selecting the value.
    """
    ag = self.__aggregate(axis, "first")
    tensors: typing.Dict[str, torch.Tensor] = {}
    for k, v in ag.items:
        for d in ag.dim:
            # bfill propagates later values backward, so index 0 holds the
            # first non-NaN value; narrow then keeps only that leading slot.
            v = (functions.bfill(v, d) if skipna else v).narrow(d, 0, 1)
        # All reduced dims now have size 1, so nansum merely removes (or
        # keeps, per `keepdim`) the singleton dimensions; it never mixes
        # multiple values together.
        # NOTE(review): for an all-NaN slice the result depends on
        # functions.nansum's all-NaN behavior — confirm it stays NaN.
        tensors[k] = functions.nansum(v, dim=ag.dim, keepdim=ag.keepdim)
    return self.from_tensors(tensors, ag.timestamps, ag.symbols)

def last(
    self, axis: typing.Optional[Axis] = "timestamp", skipna: bool = True
) -> Data:
    """Returns the last value along the given axis.

    If `skipna=True`, returns the last non-NaN; otherwise, simply selects
    the last element (which may be NaN).
    """
    ag = self.__aggregate(axis, "last")
    tensors: typing.Dict[str, torch.Tensor] = {}
    for name, tensor in ag.items:
        selected = tensor
        for d in ag.dim:
            if skipna:
                # Forward-fill so the trailing slot holds the last non-NaN.
                selected = functions.ffill(selected, d)
            selected = selected.narrow(d, -1, 1)
        # The reduced dims are all size 1 here, so nansum only drops (or
        # keeps, per `keepdim`) those singleton dimensions.
        tensors[name] = functions.nansum(
            selected, dim=ag.dim, keepdim=ag.keepdim
        )
    return self.from_tensors(tensors, ag.timestamps, ag.symbols)

############################################################################
# Metrics
############################################################################

def annualized_return(self) -> Data:
    """Returns the annualized total return per series.

    Formula reference: https://www.investopedia.com/terms/a/annualized-total-return.asp

    Uses first/last non-NaN values along the timestamp axis and computes
    (last / first) ** (1 / years) - 1, where years is the elapsed time
    between the first and last timestamps, measured in Gregorian years
    (365.2425 days per year).

    Raises:
        ValueError: If the data has no valid timestamps, or spans zero
            time (fewer than two distinct timestamps), in which case no
            annualization period can be defined.
    """
    if not self.has_timestamps():
        raise ValueError("annualized_return requires valid timestamps")

    ts = self.__timestamps.astype("datetime64[us]").astype("int64")
    # Guard against a zero-length period, which would otherwise surface
    # as an obscure ZeroDivisionError in the exponent below.
    if len(ts) < 2 or ts[-1] == ts[0]:
        raise ValueError(
            "annualized_return requires at least two distinct timestamps"
        )
    start = self.first()
    end = self.last()
    # Elapsed microseconds converted to Gregorian years.
    ts_year = (ts[-1] - ts[0]) / (365.2425 * 24 * 60 * 60 * 1e6)
    return (end / start).apply(lambda r: torch.pow(r, 1.0 / ts_year) - 1)

ar = annualized_return

def annualized_volatility(self) -> Data:
    """Returns the annualized volatility per series.

    Computes the population standard deviation (ddof=0) of period returns
    along the timestamp axis and scales it by sqrt(periods_per_year),
    where periods_per_year is the number of periods in the dataset divided
    by the elapsed Gregorian years (365.2425 days per year) between the
    first and last timestamps.

    Raises:
        ValueError: If the data has no valid timestamps, or spans zero
            time (fewer than two distinct timestamps), in which case no
            annualization period can be defined.
    """
    if not self.has_timestamps():
        raise ValueError("annualized_volatility requires valid timestamps")

    ts = self.__timestamps.astype("datetime64[us]").astype("int64")
    # Guard against a zero-length period, which would otherwise surface
    # as an obscure ZeroDivisionError when computing the scale below.
    if len(ts) < 2 or ts[-1] == ts[0]:
        raise ValueError(
            "annualized_volatility requires at least two distinct"
            " timestamps"
        )
    # Elapsed microseconds converted to Gregorian years.
    ts_year = (ts[-1] - ts[0]) / (365.2425 * 24 * 60 * 60 * 1e6)
    scale = np.sqrt((len(self.timestamps) - 1) / ts_year)
    return (
        self.pct_change(skipna=True).std(axis="timestamp", ddof=0) * scale
    )

avol = annualized_volatility

def annualized_sharpe_ratio(self) -> Data:
    """Returns annualized Sharpe ratio per series.

    Defined as annualized_return / annualized_volatility (no risk-free
    rate is subtracted).
    """
    ret = self.annualized_return()
    vol = self.annualized_volatility()
    return ret / vol

asr = annualized_sharpe_ratio

def maximum_drawdown(self) -> Data:
    """Returns the maximum drawdown per series, as a positive fraction.

    Computes the running peak (ffill + cummax) along timestamps and the
    ratio current/peak, then returns `1 - min(current / peak)`: the
    largest peak-to-trough decline expressed as a positive number (e.g.
    0.25 means a 25% drop; 0 means the series never fell below a prior
    peak).
    """
    # NOTE: This applies ffill and bfill because cummax cannot skip NaNs.
    filled = self.fillna(method="ffill").fillna(method="bfill")
    peaks = filled.apply(lambda x: torch.cummax(x, dim=0).values)
    return 1 - (filled / peaks).min(axis="timestamp")

mdd = maximum_drawdown

def metrics(self) -> Data:
    """Returns a Data object that contains common metrics per series.

    The returned Data object contains the following columns:
    - annualized_sharpe_ratio
    - annualized_return
    - annualized_volatility
    - maximum_drawdown
    """
    columns = (
        ("annualized_sharpe_ratio", self.annualized_sharpe_ratio),
        ("annualized_return", self.annualized_return),
        ("annualized_volatility", self.annualized_volatility),
        ("maximum_drawdown", self.maximum_drawdown),
    )
    head, *tail = [compute().rename(name) for name, compute in columns]
    return head.merge(*tail)

############################################################################
# Private methods
############################################################################
Expand Down
1 change: 1 addition & 0 deletions tests/data/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

46 changes: 46 additions & 0 deletions tests/data/test_annualized_return.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from math import nan

import numpy as np
import torch

from qfeval_data import Data

from .util import timestamps


def test_annualized_return_basic() -> None:
    """annualized_return is (last/first) ** (1/years) - 1 per series.

    Three timestamps span 2 periods; the implementation measures years
    with the Gregorian average of 365.2425 days, so the expected exponent
    is 365.2425 / 2 (the previous 365.25 only passed via `atol` slack).
    """
    ts = timestamps(3)
    x = torch.tensor(
        [
            [100.0, 200.0],
            [100.5, nan],
            [100.2, 200.1],
        ],
        dtype=torch.float32,
    )
    data = Data.from_tensors({"price": x}, ts, np.array(["A", "B"]))
    actual = data.annualized_return()
    # A NaN in the middle of a series must not affect first/last values.
    # NOTE(review): assumes `timestamps(3)` yields daily spacing — verify
    # against tests/data/util.py.
    days_per_year = 365.2425
    expected = [
        (100.2 / 100.0) ** (days_per_year / 2.0) - 1.0,
        (200.1 / 200.0) ** (days_per_year / 2.0) - 1.0,
    ]
    np.testing.assert_allclose(actual.price.array, expected, atol=1e-4)


def test_annualized_return_with_nans() -> None:
    """Leading/inner NaNs are skipped when picking first/last values.

    The elapsed period is still taken from the full timestamp range
    (2 periods), even though series A's first non-NaN is at index 1.
    The year length matches the implementation's Gregorian 365.2425 days
    (the previous 365.25 only passed via `atol` slack).
    """
    ts = timestamps(3)
    x = torch.tensor(
        [
            [nan, 200.0],
            [100.0, nan],
            [100.1, 200.1],
        ],
        dtype=torch.float32,
    )
    data = Data.from_tensors({"price": x}, ts, np.array(["A", "B"]))
    actual = data.annualized_return()
    days_per_year = 365.2425
    expected = [
        (100.1 / 100.0) ** (days_per_year / 2.0) - 1.0,
        (200.1 / 200.0) ** (days_per_year / 2.0) - 1.0,
    ]
    np.testing.assert_allclose(actual.price.array, expected, atol=1e-4)
28 changes: 28 additions & 0 deletions tests/data/test_annualized_sharpe_ratio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import numpy as np
import torch

from qfeval_data import Data

from .util import timestamps


def test_annualized_sharpe_ratio_basic() -> None:
    """Sharpe ratio equals annualized return divided by volatility."""
    ts = timestamps(4)
    prices = torch.tensor(
        [
            [101.0, 200.0],
            [102.0, 205.0],
            [100.0, 220.0],
            [101.0, 210.0],
        ],
        dtype=torch.float32,
    )
    data = Data.from_tensors({"price": prices}, ts, np.array(["A", "B"]))

    expected = (
        data.annualized_return().price.array
        / data.annualized_volatility().price.array
    )
    actual = data.annualized_sharpe_ratio().price.array

    np.testing.assert_allclose(actual, expected, rtol=5e-4, atol=5e-5)
61 changes: 61 additions & 0 deletions tests/data/test_annualized_volatility.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import numpy as np
import torch

from qfeval_data import Data

from .util import timestamps


def test_annualized_volatility_basic() -> None:
    """Volatility is the population std of returns, annualized.

    With 4 timestamps over 3 periods, the implementation's scale is
    sqrt((4 - 1) / (3 / 365.2425)) = sqrt(365.2425); the previous
    sqrt(365.25) only passed via `atol` slack.
    """
    ts = timestamps(4)
    x = torch.tensor(
        [
            [101.0, 200.0],
            [102.0, 205.0],
            [100.0, 220.0],
            [101.0, 210.0],
        ],
        dtype=torch.float64,
    )
    data = Data.from_tensors({"price": x}, ts, np.array(["A", "B"]))
    actual = data.annualized_volatility()
    # NOTE(review): assumes `timestamps(4)` yields daily spacing — verify
    # against tests/data/util.py.
    expected = np.nanstd(
        np.array(
            [
                [102.0 / 101.0 - 1.0, 205.0 / 200.0 - 1.0],
                [100.0 / 102.0 - 1.0, 220.0 / 205.0 - 1.0],
                [101.0 / 100.0 - 1.0, 210.0 / 220.0 - 1.0],
            ],
            dtype=np.float64,
        ),
        axis=0,
        ddof=0,
    ) * np.sqrt(365.2425)
    np.testing.assert_allclose(actual.price.array, expected, atol=1e-5)


def test_annualized_volatility_with_nans() -> None:
    """NaNs are skipped when computing period returns.

    pct_change(skipna=True) bridges over NaN values, so each series
    contributes two returns.  The scale still uses the full timestamp
    count: sqrt((4 - 1) / (3 / 365.2425)) = sqrt(365.2425); the previous
    sqrt(365.25) only passed via `atol` slack.
    """
    ts = timestamps(4)
    x = torch.tensor(
        [
            [float("nan"), 200.0],
            [100.0, float("nan")],
            [102.0, 210.0],
            [101.0, 205.0],
        ],
        dtype=torch.float64,
    )
    data = Data.from_tensors({"price": x}, ts, np.array(["A", "B"]))
    actual = data.annualized_volatility()
    expected = np.nanstd(
        np.array(
            [
                [102.0 / 100.0 - 1.0, 210.0 / 200.0 - 1.0],
                [101.0 / 102.0 - 1.0, 205.0 / 210.0 - 1.0],
            ],
            dtype=np.float64,
        ),
        axis=0,
        ddof=0,
    ) * np.sqrt(365.2425)
    np.testing.assert_allclose(actual.price.array, expected, atol=1e-5)
Loading
Loading