-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontrol_algo.py
More file actions
130 lines (109 loc) · 5.77 KB
/
control_algo.py
File metadata and controls
130 lines (109 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import numpy as np
import pickle
import os
import tensorflow as tf
from tensorflow.keras.models import load_model
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from sklearn.preprocessing import MinMaxScaler
# Import custom components from your packages
from packages.custom_layers import TransformerEncoderBlock, registered_mean_pooling_op
from packages.model_utils import predict_sequence
# --- LOCAL POST-PROCESSING PARAMETERS (MUST MATCH main.py) ---
LOCAL_ZERO_TOLERANCE = 0.001
LOCAL_PREDICTION_BUFFER = 2
def run_control_algorithm():
print("--- Starting control algorithm ---")
control_dir = "controls"
# Load the trained model
try:
# custom_objects est crucial pour charger les modèles avec des couches personnalisées
# Ajout de safe_mode=False pour permettre le chargement des couches Lambda avec des fonctions Python
model = load_model(os.path.join(control_dir, 'transformer_like_model.keras'),
compile=False,
custom_objects={'TransformerEncoderBlock': TransformerEncoderBlock},
safe_mode=False)
print("Model loaded successfully.")
except Exception as e:
print(f"Error loading model from '{control_dir}/': {e}")
print("Please ensure 'main.py' has been run to save the model in the 'controls' directory.")
return
# Load the scalers
try:
with open(os.path.join(control_dir, 'coord_scaler.pkl'), 'rb') as f:
coord_scaler = pickle.load(f)
with open(os.path.join(control_dir, 'scalar_scaler.pkl'), 'rb') as f:
scalar_scaler = pickle.load(f)
print("Scalers loaded successfully.")
except Exception as e:
print(f"Error loading scalers from '{control_dir}/': {e}")
print("Please ensure 'main.py' has been run to save the scalers in the 'controls' directory.")
return
# Load the example data for prediction
try:
original_input_coords = np.load(os.path.join(control_dir, 'original_input_coords_example.npy')).tolist()
original_scalar = np.load(os.path.join(control_dir, 'original_scalar_example.npy')).tolist()
original_target_coords = np.load(os.path.join(control_dir, 'original_target_coords_example.npy')).tolist()
max_len_input_coords = np.load(os.path.join(control_dir, 'max_len_input_coords.npy')).item()
max_len_output_coords = np.load(os.path.join(control_dir, 'max_len_output_coords.npy')).item()
print("Example test data loaded successfully.")
except Exception as e:
print(f"Error loading example data from '{control_dir}/': {e}")
print("Please ensure 'main.py' has been run to save the example data in the 'controls' directory.")
return
print(f"\nInput data for control: {original_input_coords}")
print(f"Scalar input for control (Service Load, Joisting, Input Len, Output Len): {original_scalar}")
print(f"Original target for control: {original_target_coords}")
print(f"Loaded max_len_input_coords: {max_len_input_coords}")
print(f"Loaded max_len_output_coords: {max_len_output_coords}")
print("\n--- Executing prediction in control algo ---")
# Perform prediction using the loaded components and utility function
predicted_coords = predict_sequence(
original_input_coords, original_scalar,
model,
max_len_output_coords, max_len_input_coords,
coord_scaler, scalar_scaler,
LOCAL_ZERO_TOLERANCE, LOCAL_PREDICTION_BUFFER
)
print(f"Predicted Coords by model: {predicted_coords}")
print("\n--- Control Logic ---")
# Compare predicted vs. actual target
if original_target_coords and predicted_coords:
target_np = np.array(original_target_coords)
predicted_np = np.array(predicted_coords)
min_len = min(len(target_np), len(predicted_np))
if min_len > 0:
mse = mean_squared_error(target_np[:min_len], predicted_np[:min_len])
rmse = np.sqrt(mse)
print(f"RMSE of prediction vs. target (on {min_len} points): {rmse:.4f}")
if rmse < 0.1: # Example threshold
print("Conclusion: The prediction is good (low RMSE). Control action: Proceed as planned.")
else:
print("Conclusion: The prediction is less accurate (high RMSE). Control action: Re-evaluate or use fallback plan.")
else:
print("Cannot calculate RMSE: one of the sequences is empty after depadding.")
elif not original_target_coords and not predicted_coords:
print("Conclusion: Both target and prediction are empty. This is expected behavior for this sample.")
else:
print("Conclusion: Discrepancy between target and prediction (one is empty, the other is not). Investigate.")
# Plot results for visual inspection
plt.figure(figsize=(10, 6))
if original_input_coords:
input_coords_np = np.array(original_input_coords)
plt.plot(input_coords_np[:, 0], input_coords_np[:, 1], 'bo', label='Input Coords')
if original_target_coords:
target_coords_np = np.array(original_target_coords)
plt.plot(target_coords_np[:, 0], target_coords_np[:, 1], 'go', label='Target Coords (Actual)')
if predicted_coords:
predicted_coords_np = np.array(predicted_coords)
plt.plot(predicted_coords_np[:, 0], predicted_coords_np[:, 1], 'rx', label='Predicted Coords')
plt.title("Sequence Prediction: Control Algorithm Check")
plt.xlabel("X Coordinate")
plt.ylabel("Y Coordinate")
plt.legend()
plt.grid(True)
plt.gca().set_aspect('equal', adjustable='box') # Keep aspect ratio for spatial data
plt.show()
print("Control algorithm finished.")
if __name__ == "__main__":
run_control_algorithm()