Skip to content

Commit 9d04ef0

Browse files
committed
feat(env): add wavy projection system and relevant tests
- Introduced the wavy projection system for 2D motion projected on a 3D sinusoidal surface with equations for partial derivatives and rollouts. - Added tests to validate system behavior, including various edge cases and conditions. - Integrated the system into the pipeline and created new configuration files for its simulation and testing scenarios.
1 parent 547fcda commit 9d04ef0

File tree

3 files changed

+158
-0
lines changed

3 files changed

+158
-0
lines changed

src/math_gymnasium/dynamical_systems/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
from .non_linear_system.simple_limit_cycle_system_van_der_pol import (
2121
rollout_simple_limit_cycle_partial_derivative,
2222
)
23+
from .non_linear_system.wavy_projection_system import (
24+
rollout_wavy_projection_partial_derivative,
25+
)
2326
from .linear_debug_system import rollout_linear_debug_system
2427

2528
__all__ = [
@@ -32,5 +35,6 @@
3235
"rollout_aizawa_attractor_partial_derivative",
3336
"rollout_damped_oscillator_partial_derivative",
3437
"rollout_simple_limit_cycle_partial_derivative",
38+
"rollout_wavy_projection_partial_derivative",
3539
"rollout_linear_debug_system",
3640
]
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
# coding=utf-8
2+
3+
import numpy as np
4+
5+
from tools.math_tools.ndarray_tools.custom_msg import nan_infinity_console_warning
6+
from tools.math_tools.space_conversion_tools.time_to_delta_time import (
7+
convert_state_time_to_state_delta_time,
8+
)
9+
10+
11+
def wavy_projection_partial_derivative(
12+
xyz: np.ndarray,
13+
vx: float = 1.0,
14+
vy: float = 1.0,
15+
dtype: np.dtype = np.float64,
16+
debug: bool = False,
17+
):
18+
"""
19+
Computes the partial derivatives for a wavy projection system.
20+
This is a 2D linear motion in (x, y) projected onto a 3D sinusoidal surface.
21+
22+
System equations:
23+
dx/dt = vx
24+
dy/dt = vy
25+
dz/dt = cos(x)*vx - sin(y)*vy => (Integration: z = sin(x) + cos(y) + C)
26+
27+
:param xyz: An array containing the x, y, and z coordinates.
28+
:param vx: Velocity in x direction.
29+
:param vy: Velocity in y direction.
30+
:param dtype: Data type for computations.
31+
:param debug: Warn if nan or infinity values are encountered.
32+
:return: An array containing the partial derivatives [x_dot, y_dot, z_dot].
33+
"""
34+
xyz = np.nan_to_num(xyz)
35+
x, y, z = xyz.astype(dtype)
36+
37+
x_dot = vx
38+
y_dot = vy
39+
# z is the projection on sin(x) + cos(y)
40+
# dz/dt = d(sin(x) + cos(y))/dt = cos(x)*dx/dt - sin(y)*dy/dt
41+
z_dot = np.cos(x) * vx - np.sin(y) * vy
42+
43+
xyz_dot = np.array([x_dot, y_dot, z_dot], dtype=dtype)
44+
45+
if debug and not np.all(np.isfinite(xyz_dot)):
46+
nan_infinity_console_warning("xyz_dot")
47+
48+
xyz_dot = np.nan_to_num(xyz_dot)
49+
return xyz_dot.squeeze()
50+
51+
52+
def rollout_wavy_projection_partial_derivative(
53+
time_space: np.ndarray,
54+
vx: float = 1.0,
55+
vy: float = 1.0,
56+
initiale_coordinates=(0.0, 0.0, 1.0), # z = sin(0) + cos(0) = 1.0
57+
time_space_is_delta_time: bool = False,
58+
dtype: np.dtype = np.float64,
59+
) -> np.ndarray:
60+
"""
61+
Calculates the trajectory of a wavy projection system over a given time space.
62+
63+
Assume `time_space` values are increasing if `time_space_is_delta_time=True`
64+
65+
:param time_space: Array representing time steps in wallclock time or delta time.
66+
:param vx: Velocity in x direction.
67+
:param vy: Velocity in y direction.
68+
:param initiale_coordinates: The state at timestep 0
69+
:param time_space_is_delta_time: Set to True if `time_space` is an array of delta time.
70+
:param dtype: Data type for computations.
71+
:return: Array of computed x, y, z coordinates over the given time space.
72+
"""
73+
assert isinstance(initiale_coordinates, tuple) and len(initiale_coordinates) == 3
74+
assert isinstance(time_space, np.ndarray) and time_space.ndim == 1
75+
76+
xyzs = np.empty((time_space.size, 3), dtype=dtype)
77+
xyzs_partial_derivative = np.empty_like(xyzs)
78+
79+
xyzs[0] = initiale_coordinates
80+
xyzs_partial_derivative[0] = (0.0, 0.0, 0.0)
81+
82+
if not time_space_is_delta_time:
83+
delta_time = convert_state_time_to_state_delta_time(time_space.astype(dtype))
84+
else:
85+
delta_time = time_space.copy().astype(dtype)
86+
87+
for i in np.arange(time_space.size - 1):
88+
xyzs_partial_derivative[i + 1] = wavy_projection_partial_derivative(
89+
xyzs[i], vx, vy, dtype
90+
)
91+
xyzs[i + 1] = xyzs[i] + xyzs_partial_derivative[i + 1] * delta_time[i + 1]
92+
93+
assert np.all(np.isfinite(xyzs_partial_derivative))
94+
assert np.all(np.isfinite(xyzs))
95+
return xyzs
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# coding=utf-8
2+
from typing import Tuple
3+
4+
import numpy as np
5+
import pytest
6+
7+
from math_gymnasium.dynamical_systems.non_linear_system.wavy_projection_system import (
8+
rollout_wavy_projection_partial_derivative,
9+
)
10+
11+
12+
@pytest.fixture
13+
def setup_wavy_projection_sys_spaces() -> Tuple[
14+
np.ndarray,
15+
Tuple[float, float, float],
16+
]:
17+
t_time_space = np.arange(10) * 0.1
18+
t_init_coord = (0.0, 0.0, 1.0)
19+
return t_time_space, t_init_coord
20+
21+
22+
class TestRolloutWavyProjectionPartialDerivative:
23+
def test_default(self, setup_wavy_projection_sys_spaces):
24+
(t_time_space, t_init_coord) = setup_wavy_projection_sys_spaces
25+
t_time_space_freeze = t_time_space.copy()
26+
coords = rollout_wavy_projection_partial_derivative(
27+
t_time_space, vx=1.0, vy=1.0, initiale_coordinates=t_init_coord,
28+
)
29+
assert np.array_equal(
30+
t_time_space_freeze, t_time_space
31+
), "original array was overridden"
32+
assert coords.shape == (t_time_space.shape[0], 3)
33+
34+
# Check if x and y are linear
35+
# x = 0.0, 0.1, 0.2, ...
36+
# y = 0.0, 0.1, 0.2, ...
37+
assert np.allclose(coords[:, 0], t_time_space)
38+
assert np.allclose(coords[:, 1], t_time_space)
39+
40+
# Check if z is roughly sin(x) + cos(y)
41+
# For the first few steps, z should be close to sin(x) + cos(y)
42+
# Note: integration might have some error, so we just check it's not linear
43+
z_expected = np.sin(coords[:, 0]) + np.cos(coords[:, 1])
44+
# Since it's Euler integration, it won't be exact
45+
# We just check it follows the trend
46+
assert not np.allclose(coords[:, 2], coords[0, 2] + (coords[1, 2] - coords[0, 2]) * np.arange(10))
47+
48+
def test_output_coord_using_delta_time(self, setup_wavy_projection_sys_spaces):
49+
(t_time_space, t_init_coord) = setup_wavy_projection_sys_spaces
50+
t_dt_space = np.ones_like(t_time_space) * 0.1
51+
t_dt_space[0] = 0.0
52+
53+
coord = rollout_wavy_projection_partial_derivative(
54+
t_dt_space, vx=1.0, vy=1.0, initiale_coordinates=t_init_coord, time_space_is_delta_time=True
55+
)
56+
assert coord.shape == (t_time_space.shape[0], 3)
57+
# Initial z is 1.0.
58+
# Next z = 1.0 + (cos(0)*1 - sin(0)*1)*0.1 = 1.0 + 0.1 = 1.1
59+
assert np.allclose(coord[1, 2], 1.1)

0 commit comments

Comments
 (0)