Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions Data/README_TRACTIVE_THERMAL_MODELING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Run Thermal Modeling
Todo

### Run Tests

```bash
python -m unittest discover -s test
```

#### Plot the curve
```bash
python ./TractiveBatteryThermalModelViewer.py -h
usage: TractiveBatteryThermalModelViewer.py [-h] --path_parquet PATH_PARQUET [--column-name COLUMN_NAME] [--t-end T_END] [--initial-temp INITIAL_TEMP]

View tractive battery thermal model as a function of current draw.

options:
-h, --help show this help message and exit
--path_parquet PATH_PARQUET
Path to the Parquet file containing current data.
--column-name COLUMN_NAME
Name of the current column in the Parquet file (default: SME_TEMP_BusCurrent).
--t-end T_END End time in seconds for the simulation (default: 60).
--initial-temp INITIAL_TEMP
Initial temperature in °C (default: 22).
```

Example Command :

```
python TractiveBatteryThermalModelViewer.py --path_parquet <file_name>
```
155 changes: 155 additions & 0 deletions Data/Simultaneous_Plot_Viewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
# read current draw from parquet file
from fs4BatteryThermalModelingEx import thermal_ode_solve_ivp
import argparse
import polars as pl
import numpy as np
import matplotlib.pyplot as plt
from numpy.typing import NDArray
from typing import Tuple
from scipy.integrate import solve_ivp

def load_avg_temp_from_parquet(path: str) -> NDArray[np.float64]:
"""
Load average temperature from all ACC_SEG*_TEMPS_CELL* columns in a Parquet file.

Parameters
----------
path : str
Path to the Parquet file containing temperature data.

Returns
-------
NDArray[np.float64]
Array of average temperatures across all cells.
"""
columns_list = [
"ACC_SEG0_TEMPS_CELL0", "ACC_SEG0_TEMPS_CELL1", "ACC_SEG0_TEMPS_CELL2",
"ACC_SEG0_TEMPS_CELL3", "ACC_SEG0_TEMPS_CELL4", "ACC_SEG0_TEMPS_CELL5",
"ACC_SEG1_TEMPS_CELL0", "ACC_SEG1_TEMPS_CELL1", "ACC_SEG1_TEMPS_CELL2",
"ACC_SEG1_TEMPS_CELL3", "ACC_SEG1_TEMPS_CELL4", "ACC_SEG1_TEMPS_CELL5",
"ACC_SEG2_TEMPS_CELL0", "ACC_SEG2_TEMPS_CELL1", "ACC_SEG2_TEMPS_CELL2",
"ACC_SEG2_TEMPS_CELL3", "ACC_SEG2_TEMPS_CELL4", "ACC_SEG2_TEMPS_CELL5",
"ACC_SEG3_TEMPS_CELL0", "ACC_SEG3_TEMPS_CELL1", "ACC_SEG3_TEMPS_CELL2",
"ACC_SEG3_TEMPS_CELL3", "ACC_SEG3_TEMPS_CELL4", "ACC_SEG3_TEMPS_CELL5",
"ACC_SEG4_TEMPS_CELL0", "ACC_SEG4_TEMPS_CELL1", "ACC_SEG4_TEMPS_CELL2",
"ACC_SEG4_TEMPS_CELL3", "ACC_SEG4_TEMPS_CELL4", "ACC_SEG4_TEMPS_CELL5"
]

temps_df = pl.read_parquet(path, columns=columns_list)
# take the mean of all the columns
avg_temp_df = temps_df.select(pl.mean_horizontal(pl.col(columns_list)).alias("avg"))
return avg_temp_df["avg"].to_numpy()

def run_thermal_model_from_parquet(
path: str,
current_column: str = "SME_TEMP_BusCurrent",
t_span: Tuple[float, float] = (0, 10),
initial_temp: float = 30,
t_eval: NDArray[np.float64] | None = None,
):
"""
Load current data from a Parquet file and run the thermal ODE model.

Parameters
----------
path : str
Path to the Parquet file containing current data.
current_column : str, optional
Name of the current column in the Parquet file (default: "SME_TEMP_BusCurrent").
t_span : Tuple[float, float], optional
Integration time span (t0, tf) in seconds (default: (0, 10)).
initial_temp : float, optional
Initial temperature in °C (default: 30).
t_eval : NDArray[np.float64] | None, optional
Times at which to evaluate the solution. If None, uses 100 points over t_span.

Returns
-------
OdeResult
Solution from thermal_ode_solve_ivp containing temperature predictions.
"""
df = pl.read_parquet(path)
current_draw = df[current_column].to_numpy()

if t_eval is None:
t_eval = np.linspace(t_span[0], t_span[1], 100, dtype=np.float64)

return thermal_ode_solve_ivp(current_draw, t_span, initial_temp, t_eval)

def parse_args() -> argparse.Namespace:
"""
Parse command line arguments.

Returns
-------
argparse.Namespace
Parsed command line arguments.
"""
parser = argparse.ArgumentParser(
description="Compare thermal model predictions with measured temperatures from parquet data."
)
parser.add_argument(
"parquet_path",
type=str,
help="Path to the Parquet file containing temperature and current data.",
)
return parser.parse_args()


def plot_temperature_comparison(
solution,
av_temp_array: NDArray[np.float64],
t_span: Tuple[float, float] = (0, 10),
) -> None:
"""
Plot comparison between thermal model prediction and measured temperatures.

Parameters
----------
solution : OdeResult
Solution from thermal_ode_solve_ivp containing model predictions.
av_temp_array : NDArray[np.float64]
Array of measured average temperatures.
t_span : Tuple[float, float], optional
Time span (t0, tf) in seconds for the actual data (default: (0, 10)).
"""
# Create time array for av_temp_array (assuming same time span as model)
t_actual = np.linspace(t_span[0], t_span[1], len(av_temp_array))

# Plot the temperature vs time in the same plot window
plt.figure() # Explicitly create a single figure
plt.plot(solution.t, solution.y[0], label="Model prediction", linewidth=2)
plt.plot(t_actual, av_temp_array, label="Measured temperature", linewidth=2, alpha=0.7)
plt.xlabel("Time [s]")
plt.ylabel("Temperature [°C]")
plt.title("Thermal Model Comparison")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()


def main() -> None:
"""
Main function to load data, run thermal model, and plot comparison.
"""
args = parse_args()
parquet_path = args.parquet_path

# Load average temperature data from parquet
av_temp_array = load_avg_temp_from_parquet(parquet_path)

# Run thermal model
sol = run_thermal_model_from_parquet(parquet_path)

# Check if solution was successful
if not sol.success:
raise RuntimeError(f"ODE solver failed: {sol.message}")

# Plot the comparison
plot_temperature_comparison(sol, av_temp_array, t_span=(0, 10))

# run the main function: python Simultaneous_Plot_Viewer.py "/Users/gautham/Documents/fs-data/FS-3/08102025/08102025Endurance1_FirstHalf.parquet"
if __name__ == "__main__":
main()

116 changes: 116 additions & 0 deletions Data/TractiveBatteryThermalModelViewer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# TractiveBatteryThermalModelViewer.py

import argparse
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray
import polars as pl

from fs4BatteryThermalModelingEx import thermal_ode_solve_ivp

def load_current_from_parquet(path: str, column: str) -> NDArray[np.float64]:
"""
Load a single current column from a Parquet file as a NumPy array.
"""
# df = pl.read_parquet(path, columns=[column])
# df = pl.read_parquet(path, columns=["ACC_SEG0_TEMPS_CELL0", "ACC_SEG0_TEMPS_CELL1"])
# # take the average of 64 Columns and create a dataframe with the average value
# avg_df = df.select(
# pl.mean_horizontal("ACC_SEG0_TEMPS_CELL0", "ACC_SEG0_TEMPS_CELL1").alias("avg")
# )
csv_df = pl.read_csv("../Docs/Columns.csv").select("Column Name") # or columns=["c1"] to only read c1

filtered_df = csv_df.filter(
# pl.col("Column Name").str.contains("ACC_SEG0_TEMPS", literal=True)
pl.col("Column Name").str.contains(r"ACC_SEG\d+_TEMPS_") # \d+ = one or more
)

acc_seg_temps_list: list[str] = filtered_df["Column Name"].to_list()
cleaned_acc_seg_temps_list = [s.strip("'") for s in acc_seg_temps_list]
parquet_df = pl.read_parquet(path, columns= cleaned_acc_seg_temps_list)
# take the average of 64 Columns and create a dataframe with the average value
avg_df = parquet_df.select(
pl.mean_horizontal(pl.col(cleaned_acc_seg_temps_list)).alias("avg")
)
return avg_df["avg"].to_numpy()


def run_thermal_model(
current_draw: NDArray[np.float64],
t_end: float,
initial_temp: float,
):
"""
Run the thermal ODE solver over [0, t_end] for the given current profile.
"""
t_span = (0.0, t_end)
t_eval = np.linspace(t_span[0], t_span[1], len(current_draw), dtype=np.float64)
return thermal_ode_solve_ivp(
current_draw=current_draw,
t_span=t_span,
initial_temp=initial_temp,
t_eval=t_eval,
)


def plot_temperature(solution) -> None:
"""
Plot temperature vs time from a solve_ivp solution.
"""
t = solution.t
T = solution.y[0]

plt.figure()
plt.plot(t, T, label="Cell temperature")
plt.xlabel("Time [s]")
plt.ylabel("Temperature [°C]")
plt.title("Tractive Battery Thermal Model")
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="View tractive battery thermal model as a function of current draw."
)
parser.add_argument(
"--path_parquet",
required=True,
help="Path to the Parquet file containing current data.",
)
parser.add_argument(
"--column-name",
default="SME_TEMP_BusCurrent",
help="Name of the current column in the Parquet file "
"(default: SME_TEMP_BusCurrent).",
)
parser.add_argument(
"--t-end",
type=float,
default=60.0,
help="End time in seconds for the simulation (default: 60).",
)
parser.add_argument(
"--initial-temp",
type=float,
default=22.0,
help="Initial temperature in °C (default: 22).",
)
return parser.parse_args()


def main() -> None:
args = parse_args()
current = load_current_from_parquet(args.path_parquet, args.column_name)

solution = run_thermal_model(current, args.t_end, args.initial_temp)

if not solution.success:
raise RuntimeError(f"ODE solver failed: {solution.message}")

plot_temperature(solution)


if __name__ == "__main__":
main()
Loading