Skip to content

Commit 6ce9b80

Browse files
committed
wip: hot-reloading endpoint
1 parent 3328a92 commit 6ce9b80

File tree

2 files changed

+154
-26
lines changed

2 files changed

+154
-26
lines changed

rust/src/processing/consumer.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ impl ProcessingConsumer {
796796
last_config_version: &Arc<AtomicU64>,
797797
consumer_id: &str,
798798
) -> Result<bool> {
799-
let (current_hash, node_configs) = {
799+
let (current_hash, node_configs, graph_config) = {
800800
let config_read = config.read().unwrap();
801801
let hash = Self::calculate_config_hash(&config_read.processing);
802802
let node_configs = config_read
@@ -806,7 +806,8 @@ impl ProcessingConsumer {
806806
.iter()
807807
.map(|node_config| (node_config.id.clone(), node_config.clone()))
808808
.collect::<HashMap<String, _>>();
809-
(hash, node_configs)
809+
let graph_config = config_read.processing.default_graph.clone();
810+
(hash, node_configs, graph_config)
810811
};
811812

812813
let last_hash = last_config_version.load(Ordering::Relaxed);
@@ -875,6 +876,32 @@ impl ProcessingConsumer {
875876
"ProcessingConsumer '{}': {} nodes require processing graph rebuild for full configuration update",
876877
consumer_id, needs_rebuild_count
877878
);
879+
880+
// Reconstruct the processing graph from the updated configuration
881+
match ProcessingGraph::from_config(&graph_config) {
882+
Ok(new_graph) => {
883+
// Update the processing graph
884+
{
885+
let mut graph_write = processing_graph.write().await;
886+
*graph_write = new_graph;
887+
}
888+
889+
info!(
890+
"ProcessingConsumer '{}': Processing graph successfully reconstructed with {} nodes requiring rebuild",
891+
consumer_id, needs_rebuild_count
892+
);
893+
}
894+
Err(e) => {
895+
error!(
896+
"ProcessingConsumer '{}': Failed to reconstruct processing graph: {}",
897+
consumer_id, e
898+
);
899+
return Err(anyhow::anyhow!(
900+
"Failed to reconstruct processing graph: {}",
901+
e
902+
));
903+
}
904+
}
878905
}
879906

880907
Ok(true)

rust/src/visualization/api/graph/graph.rs

Lines changed: 125 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ pub async fn get_graph(
135135

136136
/// Post new node configuration
137137
///
138-
/// **Endpoint:** `POST /api/graph/config/<node_id>`
138+
/// **Endpoint:** `POST /api/graph/config`
139139
///
140140
/// This endpoint allows updating the configuration of processing nodes that support hot-reloading.
141141
/// The configuration changes are applied to the shared configuration state and will be automatically
@@ -205,17 +205,17 @@ pub async fn get_graph(
205205
/// - `403 Forbidden`: Token lacks required `admin:api` scope
206206
/// - `500 Internal Server Error`: Server error processing the request or configuration lock failure
207207
#[openapi_protect_post(
208-
"/api/graph/config/<node_id>",
208+
"/api/graph/config",
209209
"admin:api",
210210
tag = "Processing",
211211
data = "<new_config>"
212212
)]
213213
pub async fn post_node_config(
214214
config: &ConfigState,
215215
shared_state: &State<SharedVisualizationState>,
216-
node_id: String,
217216
new_config: Json<NodeConfig>,
218217
) -> Result<Json<serde_json::Value>, status::BadRequest<String>> {
218+
let node_id = new_config.id.clone();
219219
// Chain all validations using match expressions to avoid early returns
220220
match shared_state.get_processing_graph().await {
221221
Some(graph) => {
@@ -236,36 +236,105 @@ pub async fn post_node_config(
236236
{
237237
Some(node_config) => {
238238
// Extract the parameters from the new NodeConfig
239-
let new_node_config = new_config.into_inner();
240-
let new_params = &new_node_config.parameters;
239+
let new_node_config = new_config.into_inner(); // Validate that id and node_type match the existing node
240+
if new_node_config.id != node_config.id {
241+
let err_msg = format!(
242+
"ID mismatch: request ID '{}' does not match existing node ID '{}'",
243+
new_node_config.id, node_config.id
244+
);
245+
return rocket::Either::Right(Err(status::BadRequest(
246+
err_msg,
247+
)));
248+
}
249+
if new_node_config.node_type != node_config.node_type {
250+
let err_msg = format!(
251+
"Node type mismatch: request node_type '{}' does not match existing node_type '{}'",
252+
new_node_config.node_type, node_config.node_type
253+
);
254+
return rocket::Either::Right(Err(status::BadRequest(
255+
err_msg,
256+
)));
257+
}
241258

242-
// Merge the new configuration with the existing parameters
243-
if let Some(new_params_obj) = new_params.as_object() {
259+
let new_params = &new_node_config.parameters; // Validate parameter compatibility before merging
260+
let validation_result = if let Some(new_params_obj) =
261+
new_params.as_object()
262+
{
244263
if let Some(existing_params_obj) =
245-
node_config.parameters.as_object_mut()
264+
node_config.parameters.as_object()
246265
{
247-
// Update existing parameters with new values
248-
for (key, value) in new_params_obj {
249-
existing_params_obj
250-
.insert(key.clone(), value.clone());
266+
// Validate each new parameter
267+
let mut validation_errors = Vec::new();
268+
for (key, new_value) in new_params_obj {
269+
match existing_params_obj.get(key) {
270+
Some(existing_value) => {
271+
// Check type compatibility
272+
if !are_json_types_compatible(
273+
existing_value,
274+
new_value,
275+
) {
276+
validation_errors.push(format!(
277+
"Parameter '{}' type mismatch. Expected: {}, Got: {}",
278+
key,
279+
get_json_type_name(existing_value),
280+
get_json_type_name(new_value)
281+
));
282+
}
283+
}
284+
None => {
285+
validation_errors.push(format!(
286+
"Parameter '{}' does not exist in node '{}' configuration",
287+
key, node_id
288+
));
289+
}
290+
}
291+
}
292+
if validation_errors.is_empty() {
293+
Ok(())
294+
} else {
295+
Err(validation_errors.join("; "))
251296
}
252297
} else {
253-
// If existing parameters is not an object, replace entirely
254-
node_config.parameters = new_params.clone();
298+
Ok(()) // If existing params is not an object, allow replacement
255299
}
256300
} else {
257-
// If new params is not an object, replace entirely
258-
node_config.parameters = new_params.clone();
259-
}
301+
Ok(()) // If new params is not an object, allow replacement
302+
};
260303

261-
// Log the configuration update
262-
info!(
263-
"Updated configuration for node '{}' via API. Hot-reload will be detected by monitoring thread.",
264-
node_id
265-
);
304+
match validation_result {
305+
Ok(()) => {
306+
// Validation passed, proceed with merging
307+
if let Some(new_params_obj) = new_params.as_object() {
308+
if let Some(existing_params_obj) =
309+
node_config.parameters.as_object_mut()
310+
{
311+
// Update existing parameters with new values (already validated)
312+
for (key, value) in new_params_obj {
313+
existing_params_obj
314+
.insert(key.clone(), value.clone());
315+
}
316+
} else {
317+
// If existing parameters is not an object, replace entirely
318+
node_config.parameters = new_params.clone();
319+
}
320+
} else {
321+
// If new params is not an object, replace entirely
322+
node_config.parameters = new_params.clone();
323+
}
324+
325+
// Log the configuration update
326+
info!(
327+
"Updated configuration for node '{}' via API. Hot-reload will be detected by monitoring thread.",
328+
node_id
329+
);
266330

267-
// Return the updated parameters
268-
Ok(Json(node_config.parameters.clone()))
331+
// Return the updated parameters
332+
Ok(Json(node_config.parameters.clone()))
333+
}
334+
Err(validation_error) => {
335+
Err(status::BadRequest(validation_error))
336+
}
337+
}
269338
}
270339
None => Err(status::BadRequest(format!(
271340
"Node '{}' not found in configuration",
@@ -293,6 +362,38 @@ pub async fn post_node_config(
293362
}
294363
}
295364

365+
/// Check if two JSON values have compatible types
366+
///
367+
/// Compatible types are:
368+
/// - Same primitive types (bool, string, number)
369+
/// - Both objects (for nested parameter structures)
370+
/// - Both arrays (for array parameters)
371+
fn are_json_types_compatible(existing: &serde_json::Value, new: &serde_json::Value) -> bool {
372+
use serde_json::Value::*;
373+
match (existing, new) {
374+
(Bool(_), Bool(_)) => true,
375+
(Number(_), Number(_)) => true,
376+
(String(_), String(_)) => true,
377+
(Array(_), Array(_)) => true,
378+
(Object(_), Object(_)) => true,
379+
(Null, Null) => true,
380+
_ => false,
381+
}
382+
}
383+
384+
/// Get a human-readable name for a JSON value type
385+
fn get_json_type_name(value: &serde_json::Value) -> &'static str {
386+
use serde_json::Value::*;
387+
match value {
388+
Bool(_) => "boolean",
389+
Number(_) => "number",
390+
String(_) => "string",
391+
Array(_) => "array",
392+
Object(_) => "object",
393+
Null => "null",
394+
}
395+
}
396+
296397
/// Centralized function to get all graph routes with OpenAPI documentation
297398
pub fn get_graph_routes() -> (Vec<rocket::Route>, OpenApi) {
298399
openapi_get_routes_spec![get_graph_statistics, get_graph, post_node_config]

0 commit comments

Comments
 (0)