|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +Boilerplate for 3D-PTV trajectory post-processing |
| 4 | +using xarray + dask + zarr |
| 5 | +
|
| 6 | +Features: |
| 7 | +- Ragged array encoding for trajectories of different lengths |
| 8 | +- Vector-style storage (obs, component) |
| 9 | +- Spline smoothing and derivative calculation (velocity, acceleration) |
| 10 | +- Resampling onto a uniform time base |
| 11 | +- Streaming mode: append processed trajectories to a Zarr store |
| 12 | +""" |
| 13 | + |
| 14 | +import numpy as np |
| 15 | +import xarray as xr |
| 16 | +from scipy.interpolate import UnivariateSpline |
| 17 | +import zarr |
| 18 | + |
| 19 | +# --------------------------------------------------------------------- |
| 20 | +# 1. Example: Ragged array encoding for variable-length trajectories |
| 21 | +# --------------------------------------------------------------------- |
| 22 | + |
| 23 | +def build_ragged_example(): |
| 24 | + traj1_t = np.array([0, 1, 2]) |
| 25 | + traj1_x = np.array([0, 1, 2]) |
| 26 | + traj1_y = np.array([0, 1, 4]) |
| 27 | + traj1_z = np.array([0, 0, 0]) |
| 28 | + |
| 29 | + traj2_t = np.array([0, 2, 4, 6, 8]) |
| 30 | + traj2_x = np.array([0, 2, 4, 6, 8]) |
| 31 | + traj2_y = np.array([0, -1, -2, -3, -4]) |
| 32 | + traj2_z = np.array([0, 1, 0, -1, 0]) |
| 33 | + |
| 34 | + # Concatenate into ragged structure |
| 35 | + times = np.concatenate([traj1_t, traj2_t]) |
| 36 | + positions = np.vstack([ |
| 37 | + np.stack([traj1_x, traj1_y, traj1_z], axis=-1), |
| 38 | + np.stack([traj2_x, traj2_y, traj2_z], axis=-1) |
| 39 | + ]) |
| 40 | + trajectory_id = np.concatenate([ |
| 41 | + np.full(traj1_t.shape, 0), |
| 42 | + np.full(traj2_t.shape, 1) |
| 43 | + ]) |
| 44 | + |
| 45 | + ds = xr.Dataset( |
| 46 | + { |
| 47 | + "t": ("obs", times), |
| 48 | + "pos": (("obs", "component"), positions), |
| 49 | + "trajectory": ("obs", trajectory_id), |
| 50 | + }, |
| 51 | + coords={"component": ["x", "y", "z"], "obs": np.arange(len(times))} |
| 52 | + ) |
| 53 | + |
| 54 | + return ds |
| 55 | + |
| 56 | + |
| 57 | +# --------------------------------------------------------------------- |
| 58 | +# 2. Compute derivatives (velocity, acceleration) in ragged array |
| 59 | +# --------------------------------------------------------------------- |
| 60 | + |
| 61 | +def compute_derivatives_ragged(ds): |
| 62 | + def _derivs(sub): |
| 63 | + dt = np.gradient(sub.t.values) |
| 64 | + dpos = np.gradient(sub.pos.values, axis=0) |
| 65 | + vel = dpos / dt[:, None] |
| 66 | + acc = np.gradient(vel, axis=0) / dt[:, None] |
| 67 | + return xr.Dataset({ |
| 68 | + "vel": (("obs", "component"), vel), |
| 69 | + "acc": (("obs", "component"), acc) |
| 70 | + }) |
| 71 | + |
| 72 | + derivs = ds.groupby("trajectory").map(_derivs) |
| 73 | + return xr.merge([ds, derivs]) |
| 74 | + |
| 75 | + |
| 76 | +# --------------------------------------------------------------------- |
| 77 | +# 3. Spline smoothing + resampling on uniform time base |
| 78 | +# --------------------------------------------------------------------- |
| 79 | + |
| 80 | +def smooth_and_resample(t, pos, t_uniform, s=0.0): |
| 81 | + """Smooth trajectory with spline, resample to uniform time base. |
| 82 | + Returns position, velocity, acceleration arrays of shape (len(t_uniform), 3). |
| 83 | + """ |
| 84 | + comps = [] |
| 85 | + vels = [] |
| 86 | + accs = [] |
| 87 | + for d in range(pos.shape[1]): # loop over x,y,z |
| 88 | + spline = UnivariateSpline(t, pos[:, d], s=s) |
| 89 | + p = spline(t_uniform) |
| 90 | + v = spline.derivative(1)(t_uniform) |
| 91 | + a = spline.derivative(2)(t_uniform) |
| 92 | + comps.append(p) |
| 93 | + vels.append(v) |
| 94 | + accs.append(a) |
| 95 | + |
| 96 | + pos_u = np.stack(comps, axis=-1) |
| 97 | + vel_u = np.stack(vels, axis=-1) |
| 98 | + acc_u = np.stack(accs, axis=-1) |
| 99 | + return pos_u, vel_u, acc_u |
| 100 | + |
| 101 | + |
| 102 | +# --------------------------------------------------------------------- |
| 103 | +# 4. Streaming mode: append processed trajectories to Zarr |
| 104 | +# --------------------------------------------------------------------- |
| 105 | + |
| 106 | +def init_zarr_store(store_path, t_uniform): |
| 107 | + components = ["x", "y", "z"] |
| 108 | + |
| 109 | + ds = xr.Dataset( |
| 110 | + data_vars={ |
| 111 | + "position": (("trajectory", "time", "component"), |
| 112 | + np.empty((0, len(t_uniform), len(components)))), |
| 113 | + "velocity": (("trajectory", "time", "component"), |
| 114 | + np.empty((0, len(t_uniform), len(components)))), |
| 115 | + "acceleration": (("trajectory", "time", "component"), |
| 116 | + np.empty((0, len(t_uniform), len(components)))) |
| 117 | + }, |
| 118 | + coords={ |
| 119 | + "time": t_uniform, |
| 120 | + "component": components, |
| 121 | + "trajectory": [] |
| 122 | + } |
| 123 | + ) |
| 124 | + ds.to_zarr(store_path, mode="w") |
| 125 | + |
| 126 | + |
| 127 | +def append_to_zarr(store_path, traj_id, t, pos, t_uniform, s=0.0): |
| 128 | + pos_u, vel_u, acc_u = smooth_and_resample(t, pos, t_uniform, s=s) |
| 129 | + |
| 130 | + new = xr.Dataset( |
| 131 | + { |
| 132 | + "position": (("trajectory", "time", "component"), pos_u[np.newaxis, ...]), |
| 133 | + "velocity": (("trajectory", "time", "component"), vel_u[np.newaxis, ...]), |
| 134 | + "acceleration": (("trajectory", "time", "component"), acc_u[np.newaxis, ...]), |
| 135 | + }, |
| 136 | + coords={ |
| 137 | + "trajectory": [traj_id], |
| 138 | + "time": t_uniform, |
| 139 | + "component": ["x", "y", "z"] |
| 140 | + } |
| 141 | + ) |
| 142 | + new.to_zarr(store_path, mode="a", append_dim="trajectory") |
| 143 | + |
| 144 | + |
| 145 | +# --------------------------------------------------------------------- |
| 146 | +# 5. Example usage |
| 147 | +# --------------------------------------------------------------------- |
| 148 | + |
| 149 | +if __name__ == "__main__": |
| 150 | + # Step 1: Build ragged example |
| 151 | + ds_ragged = build_ragged_example() |
| 152 | + print("Ragged dataset:") |
| 153 | + print(ds_ragged) |
| 154 | + |
| 155 | + # Step 2: Compute derivatives in ragged array |
| 156 | + ds_with_derivs = compute_derivatives_ragged(ds_ragged) |
| 157 | + print("\nRagged dataset with velocity and acceleration:") |
| 158 | + print(ds_with_derivs) |
| 159 | + |
| 160 | + # Step 3+4: Streaming to Zarr |
| 161 | + t_uniform = np.linspace(0, 8, 81) # uniform time base (0.1s step) |
| 162 | + store = "trajectories.zarr" |
| 163 | + init_zarr_store(store, t_uniform) |
| 164 | + |
| 165 | + # Add first trajectory |
| 166 | + obs0 = ds_ragged.where(ds_ragged.trajectory == 0, drop=True) |
| 167 | + append_to_zarr(store, traj_id=0, t=obs0.t.values, pos=obs0.pos.values, t_uniform=t_uniform, s=0.1) |
| 168 | + |
| 169 | + # Add second trajectory |
| 170 | + obs1 = ds_ragged.where(ds_ragged.trajectory == 1, drop=True) |
| 171 | + append_to_zarr(store, traj_id=1, t=obs1.t.values, pos=obs1.pos.values, t_uniform=t_uniform, s=0.1) |
| 172 | + |
| 173 | + # Open the final store lazily with xarray+dask |
| 174 | + ds_zarr = xr.open_zarr(store) |
| 175 | + print("\nZarr dataset (streamed trajectories):") |
| 176 | + print(ds_zarr) |
0 commit comments