Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
02a54e1
append to trajectory
Nov 25, 2025
db4cfa2
test append to trajectory
Nov 25, 2025
6f78f54
style
Nov 25, 2025
b378c6e
revert small change
Nov 25, 2025
1feff96
maintain step per system
Nov 26, 2025
eed1fc1
format
Nov 26, 2025
031fc56
integrate only for (n_steps - initial_step) steps when continuing
Nov 26, 2025
5b2469f
Merge branch 'main' into append-to-trajectory
danielzuegner Nov 26, 2025
6fd5cc9
change `load_new_trajectories` behavior
Nov 26, 2025
4e820ee
back to `step`
Nov 27, 2025
8b8c449
format
Nov 27, 2025
0a230bf
truncate trajectories
Nov 27, 2025
e138a33
fix tests
Nov 27, 2025
82059d5
fix style
Nov 27, 2025
2e4bb06
style
Nov 27, 2025
65e61d0
fix type hint
Nov 27, 2025
c1065e7
style
Nov 27, 2025
38619af
fix kT indexing in integrate, step counting in optimize
Nov 28, 2025
dde853c
style
Nov 28, 2025
6a1114c
Merge branch 'main' into append-to-trajectory
orionarcher Nov 28, 2025
6cba46f
Merge branch 'main' into append-to-trajectory
orionarcher Dec 9, 2025
787a967
rename variable
Dec 10, 2025
9c637f5
return positions last step
Dec 10, 2025
0031296
truncate to positions last step
Dec 10, 2025
1874dc5
extract methods
Dec 10, 2025
51ca30d
fix tests
Dec 10, 2025
dfd39f0
format
Dec 10, 2025
2aed1bb
Merge branch 'main' into append-to-trajectory
danielzuegner Dec 30, 2025
36d915a
prek
Dec 30, 2025
2470b46
disable auto truncating
Dec 30, 2025
28bb660
style
Dec 30, 2025
6de046a
rename
Dec 30, 2025
e801817
prek
Dec 30, 2025
318f79c
remove unused variable
Jan 7, 2026
6dbd9e3
fix trajectory reopen
Jan 7, 2026
2c10ea3
Merge branch 'main' into append-to-trajectory
orionarcher Jan 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 261 additions & 0 deletions tests/test_trajectory.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import sys
import tempfile
from collections.abc import Callable, Generator
from pathlib import Path

Expand Down Expand Up @@ -834,3 +835,263 @@ def test_write_ase_trajectory_importerror(
with pytest.raises(ImportError, match="ASE is required to convert to ASE trajectory"):
traj.write_ase_trajectory(tmp_path / "dummy.traj")
traj.close()


def test_optimize_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.optimize."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/optimize_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First optimization run
opt_state = ts.optimize(
system=si_double_sim_state,
model=lj_model,
max_steps=5,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter,
steps_between_swaps=100,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
_ = ts.optimize(
system=opt_state,
model=lj_model,
max_steps=7,
optimizer=ts.Optimizer.fire,
trajectory_reporter=trajectory_reporter_2,
steps_between_swaps=100,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 7 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 8))


def test_integrate_append_to_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""Test appending to an existing trajectory when running ts.integrate."""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/integrate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
)

# First integration run
int_state = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

for traj in trajectory_reporter.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file has 5 frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 6))

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# run 7 more steps of integration.
_ = ts.integrate(
system=int_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=7,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
for traj in trajectory_reporter_2.trajectories:
with TorchSimTrajectory(traj.filename, mode="r") as traj:
# Check that the trajectory file now has 12 (5 + 7) frames
np.testing.assert_allclose(traj.get_steps("positions"), range(1, 13))


def test_truncate_trajectory(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test trajectory.truncate_to_step().
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [f"{temp_dir}/truncate_trajectory_{idx}.h5" for idx in range(2)]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)
# Verify that it has 3 frames now.
for array_name in traj.array_registry:
target_length = 3
target_steps = [1, 2, 3]
# Special cases: global arrays
if array_name in ["atomic_numbers", "masses"]:
target_length = 1
target_steps = [0]
if array_name == "pbc":
target_length = 3
target_steps = [0]
assert len(traj.get_array(array_name)) == target_length
np.testing.assert_allclose(traj.get_steps(array_name), target_steps)
with pytest.raises(
ValueError,
match=(
"Cannot truncate to a step greater than the last step. "
"self.last_step=3 < step=10"
),
):
traj.truncate_to_step(10)
with pytest.raises(
ValueError, match="Step must be larger than 0. Got step=0"
):
traj.truncate_to_step(0)


def test_truncate_trajectory_reporter(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test TrajectoryReporter.truncate_to_step().
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [
f"{temp_dir}/truncate_reporter_trajectory_{idx}.h5" for idx in range(2)
]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

trajectory_reporter.truncate_to_step(step=min(trajectory_reporter.last_steps))
assert trajectory_reporter.last_steps == [5, 5]
with pytest.raises(
ValueError,
match=(
"Step 7 is greater than the minimum last step "
r"across trajectories \(5\)\."
),
):
trajectory_reporter.truncate_to_step(7)
# try negative number
with pytest.raises(ValueError, match="Step must be greater than 0. Got step=-2"):
trajectory_reporter.truncate_to_step(-2)
# truncate to step 3
trajectory_reporter.truncate_to_step(3)
assert trajectory_reporter.last_steps == [3, 3]


def test_integrate_uneven_trajectory_append(
si_double_sim_state: SimState, lj_model: LennardJonesModel
) -> None:
"""
Test appending to an existing trajectory with uneven frames running ts.integrate.
Expected behavior: ts.integrate should first truncate all trajectories to the shortest
length, and then append new frames to all trajectories.
"""

# Create a temporary trajectory file
with tempfile.TemporaryDirectory() as temp_dir:
traj_files = [
f"{temp_dir}/uneven_integrate_trajectory_{idx}.h5" for idx in range(2)
]

# Initialize model and state
trajectory_reporter = ts.TrajectoryReporter(
traj_files,
state_frequency=1,
prop_calculators={1: {"velocities": lambda state: state.velocities}},
)

# First integration run for 5 steps.
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
n_steps=5,
temperature=300.0,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter,
)

# Manually remove last two frames from second trajectory to create unevenness
with TorchSimTrajectory(traj_files[1], mode="a") as traj:
traj.truncate_to_step(3)

trajectory_reporter_2 = ts.TrajectoryReporter(
traj_files, state_frequency=1, trajectory_kwargs=dict(mode="a")
)
# Should raise a ValueError:
with pytest.raises(
ValueError, match="Cannot resume integration from inconsistent states"
):
_ = ts.integrate(
system=si_double_sim_state,
model=lj_model,
timestep=0.001,
temperature=300.0,
n_steps=4,
integrator=ts.Integrator.nvt_langevin,
trajectory_reporter=trajectory_reporter_2,
)
Loading
Loading