@@ -47,6 +47,8 @@ pub struct ProcessingConsumer {
4747 config : Option < Arc < StdRwLock < crate :: config:: Config > > > ,
4848 /// Last known configuration version (for change detection)
4949 last_config_version : Arc < AtomicU64 > ,
50+ /// Last known node parameters for fine-grained change detection
51+ last_node_parameters : Arc < RwLock < HashMap < String , serde_json:: Value > > > ,
5052}
5153
5254/// Processing statistics
@@ -87,6 +89,7 @@ impl ProcessingConsumer {
8789 visualization_state : None ,
8890 config : None ,
8991 last_config_version : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
92+ last_node_parameters : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
9093 }
9194 }
9295
@@ -117,6 +120,7 @@ impl ProcessingConsumer {
117120 visualization_state : Some ( visualization_state) ,
118121 config : None ,
119122 last_config_version : Arc :: new ( AtomicU64 :: new ( 0 ) ) ,
123+ last_node_parameters : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
120124 }
121125 }
122126
@@ -154,6 +158,7 @@ impl ProcessingConsumer {
154158 visualization_state : Some ( visualization_state) ,
155159 config : Some ( config) ,
156160 last_config_version : Arc :: new ( AtomicU64 :: new ( initial_hash) ) ,
161+ last_node_parameters : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) ,
157162 }
158163 }
159164
@@ -730,13 +735,45 @@ impl ProcessingConsumer {
730735 let consumer_id = self . consumer_id . clone ( ) ;
731736 let running = Arc :: clone ( & self . running ) ;
732737 let last_config_version = Arc :: clone ( & self . last_config_version ) ;
738+ let last_node_parameters = Arc :: clone ( & self . last_node_parameters ) ;
733739
734740 tokio:: spawn ( async move {
735741 info ! (
736742 "ProcessingConsumer '{}': Starting configuration monitoring" ,
737743 consumer_id
738744 ) ;
739745
746+ // Initialize last_node_parameters with current configuration
747+ {
748+ let initial_node_configs = {
749+ let config_read = config. read ( ) . unwrap ( ) ;
750+ config_read
751+ . processing
752+ . default_graph
753+ . nodes
754+ . iter ( )
755+ . map ( |node_config| ( node_config. id . clone ( ) , node_config. parameters . clone ( ) ) )
756+ . collect :: < Vec < _ > > ( )
757+ } ;
758+
759+ let mut last_params = last_node_parameters. write ( ) . await ;
760+
761+ for ( node_id, node_params) in initial_node_configs {
762+ if let Ok ( parameters) = serde_json:: to_value ( & node_params) {
763+ last_params. insert ( node_id. clone ( ) , parameters) ;
764+ debug ! (
765+ "ProcessingConsumer '{}': Initialized parameters for node '{}'" ,
766+ consumer_id, node_id
767+ ) ;
768+ }
769+ }
770+
771+ info ! (
772+ "ProcessingConsumer '{}': Initialized {} node parameter sets for change detection" ,
773+ consumer_id, last_params. len( )
774+ ) ;
775+ }
776+
740777 while running. load ( Ordering :: Relaxed ) {
741778 // Check for configuration changes every 1 second
742779 tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
@@ -745,6 +782,7 @@ impl ProcessingConsumer {
745782 & config,
746783 & processing_graph,
747784 & last_config_version,
785+ & last_node_parameters,
748786 & consumer_id,
749787 )
750788 . await
@@ -794,6 +832,7 @@ impl ProcessingConsumer {
794832 config : & Arc < StdRwLock < crate :: config:: Config > > ,
795833 processing_graph : & Arc < RwLock < ProcessingGraph > > ,
796834 last_config_version : & Arc < AtomicU64 > ,
835+ last_node_parameters : & Arc < RwLock < HashMap < String , serde_json:: Value > > > ,
797836 consumer_id : & str ,
798837 ) -> Result < bool > {
799838 let ( current_hash, node_configs, graph_config) = {
@@ -818,19 +857,58 @@ impl ProcessingConsumer {
818857 consumer_id, last_hash, current_hash
819858 ) ;
820859
821- // Apply hot-reload updates to compatible nodes
822- let mut hot_reload_updates = HashMap :: new ( ) ;
823- let mut graph = processing_graph. write ( ) . await ;
860+ // Detect which specific nodes have changed by comparing individual parameters
861+ let changed_nodes = {
862+ let mut last_params = last_node_parameters. write ( ) . await ;
863+ let mut changed = HashMap :: new ( ) ;
864+
865+ for ( node_id, node_config) in & node_configs {
866+ if let Ok ( current_params) = serde_json:: to_value ( & node_config. parameters ) {
867+ // Check if this node's parameters have changed
868+ let has_changed = match last_params. get ( node_id) {
869+ Some ( last_params_for_node) => last_params_for_node != & current_params,
870+ None => true , // New node
871+ } ;
824872
825- for ( node_id, node_config) in node_configs {
826- // Convert node configuration to parameters JSON
827- if let Ok ( parameters) = serde_json:: to_value ( & node_config. parameters ) {
828- hot_reload_updates. insert ( node_id, parameters) ;
873+ if has_changed {
874+ debug ! (
875+ "ProcessingConsumer '{}': Node '{}' parameters changed" ,
876+ consumer_id, node_id
877+ ) ;
878+ changed. insert ( node_id. clone ( ) , current_params. clone ( ) ) ;
879+ }
880+
881+ // Update the stored parameters
882+ last_params. insert ( node_id. clone ( ) , current_params) ;
883+ }
829884 }
885+
886+ changed
887+ } ;
888+
889+ if changed_nodes. is_empty ( ) {
890+ info ! (
891+ "ProcessingConsumer '{}': Configuration hash changed but no individual node parameters changed" ,
892+ consumer_id
893+ ) ;
894+ last_config_version. store ( current_hash, Ordering :: Relaxed ) ;
895+ return Ok ( true ) ;
830896 }
831897
832- // Apply batch updates to all nodes
833- let results = graph. update_multiple_node_configs ( & hot_reload_updates) ;
898+ info ! (
899+ "ProcessingConsumer '{}': {} nodes have changed parameters: {}" ,
900+ consumer_id,
901+ changed_nodes. len( ) ,
902+ changed_nodes
903+ . keys( )
904+ . map( |s| s. as_str( ) )
905+ . collect:: <Vec <_>>( )
906+ . join( ", " )
907+ ) ;
908+
909+ // Apply hot-reload updates only to nodes that actually changed
910+ let mut graph = processing_graph. write ( ) . await ;
911+ let results = graph. update_multiple_node_configs ( & changed_nodes) ;
834912
835913 // Log results
836914 let mut updated_count = 0 ;
0 commit comments