Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 43 additions & 25 deletions python/rateslib/solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,11 @@ def _grad_s_vT_final_iteration_dual(self, algorithm: str | None = None) -> NDArr

def _grad_s_vT_final_iteration_analytical(self) -> NDArray[Nf64]:
"""Uses a pseudoinverse algorithm on floats"""
grad_s_vT: NDArray[Nf64] = np.linalg.pinv(self.J) # type: ignore[assignment]
if self.n == 0:
# then there are no instruments: self is only a Solver container of `pre_solvers`
grad_s_vT: NDArray[Nf64] = np.array([[]], dtype=float)
else:
grad_s_vT = np.linalg.pinv(self.J) # type: ignore[assignment]
return grad_s_vT

def _grad_s_vT_fixed_point_iteration(self) -> NDArray[Nf64]:
Expand Down Expand Up @@ -335,10 +339,12 @@ def J2_pre(self) -> NDArray[Nf64]:
] = pre_slvr.J2_pre
i, j = i + pre_slvr.pre_n, j + pre_slvr.pre_m

rates = np.array([_[0].rate(**_[1]) for _ in self.instruments])
# solver is passed in order to extract curves as string
_ = np.array([gradient(r, self.pre_variables, order=2) for r in rates])
J2[:, :, -self.m :] = np.transpose(_, (1, 2, 0))
if self.m > 0:
# then self is not only a container for `pre_solvers`
rates = np.array([_[0].rate(**_[1]) for _ in self.instruments])
# solver is passed in order to extract curves as string
_ = np.array([gradient(r, self.pre_variables, order=2) for r in rates])
J2[:, :, -self.m :] = np.transpose(_, (1, 2, 0))
self._J2_pre = J2
return self._J2_pre

Expand Down Expand Up @@ -511,16 +517,21 @@ def grad_s_vT_pre(self) -> NDArray[Nf64]:
m, n = pre_solver.pre_m, pre_solver.pre_n
grad_s_vT[i : i + m, j : j + n] = pre_solver.grad_s_vT_pre

# create the right column dependencies
grad_v_r = np.array([gradient(r, pre_solver.pre_variables) for r in self.r]).T
block = np.matmul(grad_v_r, self.grad_s_vT)
block = -1 * np.matmul(pre_solver.grad_s_vT_pre, block)
grad_s_vT[i : i + m, -self.n :] = block
# create the right column dependencies, only if self contains some instruments
# and variable of its own and is not only a container of `pre_solvers`
if self.n > 0:
grad_v_r = np.array([gradient(r, pre_solver.pre_variables) for r in self.r]).T
block = np.matmul(grad_v_r, self.grad_s_vT)
block = -1 * np.matmul(pre_solver.grad_s_vT_pre, block)
grad_s_vT[i : i + m, -self.n :] = block

i, j = i + m, j + n

# create bottom right block
grad_s_vT[-self.m :, -self.n :] = self.grad_s_vT
if self.n > 0:
# create bottom right block, only if self contains some instruments
# and variables of its own and is not only a container of `pre_solvers`
grad_s_vT[-self.m :, -self.n :] = self.grad_s_vT

self._grad_s_vT_pre = grad_s_vT
return self._grad_s_vT_pre

Expand Down Expand Up @@ -1450,8 +1461,11 @@ def r_pre(self) -> NDArray[Nobject]: # type: ignore[override]
r_pre[i : i + m] = pre_solver.r_pre
i = i + m

# create bottom right block
r_pre[-self.m :] = self.r
if self.m > 0:
# create bottom right block if solver contains its own instruments and self
# is not just a container of `pre_solvers`
r_pre[-self.m :] = self.r

self._r_pre = r_pre
return self._r_pre

Expand Down Expand Up @@ -1480,21 +1494,25 @@ def error(self) -> Series[float]:
-------
Series
"""
pre_s: Series[float] | None = None
pre_s: Series[float] = Series()
for pre_solver in self.pre_solvers:
if pre_s is None:
pre_s = pre_solver.error
if not pre_s.empty:
pre_s = concat([ser for ser in [pre_solver.error, pre_s] if not ser.empty])
else:
pre_s = concat([pre_solver.error, pre_s])
pre_s = pre_solver.error

_: Series[float] = Series(
self.x.astype(float) * 100 / self.rate_scalars,
index=MultiIndex.from_tuples([(self.id, inst) for inst in self.instrument_labels]),
)
if pre_s is None:
s: Series[float] = _
if self.m > 0:
_: Series[float] = Series(
self.x.astype(float) * 100 / self.rate_scalars,
index=MultiIndex.from_tuples([(self.id, inst) for inst in self.instrument_labels]),
)
if not pre_s.empty:
s: Series[float] = concat([pre_s, _])
else:
s = _
else:
s = concat([pre_s, _])
s = pre_s

return s

@property
Expand Down
134 changes: 132 additions & 2 deletions python/tests/test_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@
import numpy as np
import pytest
from numpy.testing import assert_allclose
from pandas import DataFrame, MultiIndex
from pandas import DataFrame, MultiIndex, Series
from pandas.errors import PerformanceWarning
from pandas.testing import assert_frame_equal, assert_series_equal
from rateslib import default_context
from rateslib.curves import CompositeCurve, Curve, LineCurve, MultiCsaCurve, index_left
from rateslib.default import NoInput
from rateslib.dual import Dual, Dual2, gradient, ift_1dim, newton_1dim, newton_ndim
from rateslib.dual import Dual, Dual2, Variable, gradient, ift_1dim, newton_1dim, newton_ndim
from rateslib.fx import FXForwards, FXRates
from rateslib.fx_volatility import FXDeltaVolSmile, FXDeltaVolSurface, FXSabrSmile, FXSabrSurface
from rateslib.instruments import (
Expand Down Expand Up @@ -2839,3 +2839,133 @@ def test_curves_without_their_own_params(label):
s=[2.0],
)
assert sv.result["status"] == "SUCCESS"


class TestContainerSolver:
# these tests involve a Solver that has no instruments of its own and is just a
# wrapper of 1 or multiple `pre_solvers`

def test_combine_separate_solvers_for_delta(self):
curve = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="x")
curve2 = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="y")
solver = Solver(
curves=[curve],
instruments=[Value(dt(2000, 9, 12), curves="x", metric="o/n_rate")],
s=[2.0],
instrument_labels=["X"],
id="A1",
)
solver2 = Solver(
curves=[curve2],
instruments=[Value(dt(2000, 9, 12), curves="y", metric="o/n_rate")],
s=[3.0],
instrument_labels=["Y"],
id="A2",
)
solver3 = Solver(pre_solvers=[solver, solver2])

v = IRS(dt(2000, 9, 12), "1d", "M", curves="x")
w = IRS(dt(2000, 9, 12), "1d", "M", curves="y")
result = Portfolio([v, w]).delta(solver=solver3)

m_idx = MultiIndex.from_tuples(
[("instruments", "A1", "X"), ("instruments", "A2", "Y")],
names=["type", "solver", "label"],
)
c_idx = MultiIndex.from_tuples([("usd", "usd")], names=["local_ccy", "display_ccy"])
expected = DataFrame([0.273825, 0.271870], index=m_idx, columns=c_idx)
assert_frame_equal(result, expected)

def test_combine_separate_solvers_for_exo_delta(self):
curve = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="x")
curve2 = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="y")
solver = Solver(
curves=[curve],
instruments=[Value(dt(2000, 9, 12), curves="x", metric="o/n_rate")],
s=[2.0],
instrument_labels=["X"],
id="A1",
)
solver2 = Solver(
curves=[curve2],
instruments=[Value(dt(2000, 9, 12), curves="y", metric="o/n_rate")],
s=[3.0],
instrument_labels=["Y"],
id="A2",
)
solver3 = Solver(pre_solvers=[solver, solver2])

v = IRS(
dt(2000, 9, 12), "1d", "M", curves="x", notional=Variable(1e8, ["exo"]), fixed_rate=5
)
w = IRS(
dt(2000, 9, 12), "1d", "M", curves="y", notional=Variable(1e8, ["exo"]), fixed_rate=4
)
result = (
Portfolio([v, w]).exo_delta(solver=solver3, vars=["exo"], vars_scalar=[1e8]).to_numpy()
)
pv = Portfolio([v, w]).npv(solver=solver3)
assert abs(result[0, 0] - pv) < 1e-7

def test_combine_separate_solvers_for_gamma(self):
curve = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="x")
curve2 = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="y")
solver = Solver(
curves=[curve],
instruments=[Value(dt(2000, 9, 12), curves="x", metric="o/n_rate")],
s=[2.0],
instrument_labels=["X"],
id="A1",
)
solver2 = Solver(
curves=[curve2],
instruments=[Value(dt(2000, 9, 12), curves="y", metric="o/n_rate")],
s=[3.0],
instrument_labels=["Y"],
id="A2",
)
solver3 = Solver(pre_solvers=[solver, solver2])

v = IRS(dt(2000, 9, 12), "1d", "M", curves="x")
w = IRS(dt(2000, 9, 12), "1d", "M", curves="y")
result = Portfolio([v, w]).gamma(solver=solver3).to_numpy()

partial_result1 = v.gamma(solver=solver).to_numpy()
partial_result2 = w.gamma(solver=solver2).to_numpy()

assert np.all(
result
== np.block(
[
[partial_result1, np.zeros(shape=(1, 1))],
[np.zeros(shape=(1, 1)), partial_result2],
]
)
)

def test_combine_separate_solvers_error(self):
curve = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="x")
curve2 = Curve({dt(2000, 1, 1): 1.0, dt(2001, 1, 1): 1.0}, id="y")
solver = Solver(
curves=[curve],
instruments=[Value(dt(2000, 9, 12), curves="x", metric="o/n_rate")],
s=[2.0],
instrument_labels=["X"],
id="A1",
)
solver2 = Solver(
curves=[curve2],
instruments=[Value(dt(2000, 9, 12), curves="y", metric="o/n_rate")],
s=[3.0],
instrument_labels=["Y"],
id="A2",
)
solver3 = Solver(pre_solvers=[solver, solver2])
result = solver3.error
assert isinstance(result, Series)

def test_error_empty(self):
s1 = Solver()
s2 = Solver()
s3 = Solver(pre_solvers=[s1, s2])
assert s3.error.empty
Loading