@@ -26,12 +26,14 @@ const DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC: (&str, &str) = (
2626 "bootstrapped-subscribe-topic" ,
2727 "cardano.sequence.bootstrapped" ,
2828) ;
29+ const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC : ( & str , & str ) =
30+ ( "blocks-subscribe-topic" , "cardano.block.proposed" ) ;
2931const DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC : ( & str , & str ) = (
3032 "protocol-parameters-subscribe-topic" ,
3133 "cardano.protocol.parameters" ,
3234) ;
33- const DEFAULT_BLOCKS_SUBSCRIBE_TOPIC : ( & str , & str ) =
34- ( "blocks- subscribe-topic" , "cardano.block.proposed " ) ;
35+ const DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC : ( & str , & str ) =
36+ ( "spo-state- subscribe-topic" , "cardano.spo.state " ) ;
3537
3638/// Block KES Validator module
3739#[ module(
@@ -50,6 +52,7 @@ impl BlockKesValidator {
5052 mut bootstrapped_subscription : Box < dyn Subscription < Message > > ,
5153 mut blocks_subscription : Box < dyn Subscription < Message > > ,
5254 mut protocol_parameters_subscription : Box < dyn Subscription < Message > > ,
55+ mut spo_state_subscription : Box < dyn Subscription < Message > > ,
5356 ) -> Result < ( ) > {
5457 let ( _, bootstrapped_message) = bootstrapped_subscription. read ( ) . await ?;
5558 let genesis = match bootstrapped_message. as_ref ( ) {
@@ -80,6 +83,7 @@ impl BlockKesValidator {
8083 if is_new_epoch {
8184 // read epoch boundary messages
8285 let protocol_parameters_message_f = protocol_parameters_subscription. read ( ) ;
86+ let spo_state_message_f = spo_state_subscription. read ( ) ;
8387
8488 let ( _, protocol_parameters_msg) = protocol_parameters_message_f. await ?;
8589 let span = info_span ! (
@@ -93,6 +97,19 @@ impl BlockKesValidator {
9397 }
9498 _ => error ! ( "Unexpected message type: {protocol_parameters_msg:?}" ) ,
9599 } ) ;
100+
101+ let ( _, spo_state_msg) = spo_state_message_f. await ?;
102+ let span = info_span ! (
103+ "block_kes_validator.handle_spo_state" ,
104+ epoch = block_info. epoch
105+ ) ;
106+ span. in_scope ( || match spo_state_msg. as_ref ( ) {
107+ Message :: Cardano ( ( block_info, CardanoMessage :: SPOState ( msg) ) ) => {
108+ Self :: check_sync ( & current_block, block_info) ;
109+ state. handle_spo_state ( msg) ;
110+ }
111+ _ => error ! ( "Unexpected message type: {spo_state_msg:?}" ) ,
112+ } ) ;
96113 }
97114
98115 let span =
@@ -133,25 +150,32 @@ impl BlockKesValidator {
133150 . get_string ( DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC . 0 )
134151 . unwrap_or ( DEFAULT_BOOTSTRAPPED_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
135152 info ! ( "Creating subscriber for bootstrapped on '{bootstrapped_subscribe_topic}'" ) ;
136- let protocol_parameters_subscribe_topic = config
137- . get_string ( DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC . 0 )
138- . unwrap_or ( DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
139- info ! ( "Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'" ) ;
140153
141154 let blocks_subscribe_topic = config
142155 . get_string ( DEFAULT_BLOCKS_SUBSCRIBE_TOPIC . 0 )
143156 . unwrap_or ( DEFAULT_BLOCKS_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
144157 info ! ( "Creating blocks subscription on '{blocks_subscribe_topic}'" ) ;
145158
159+ let protocol_parameters_subscribe_topic = config
160+ . get_string ( DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC . 0 )
161+ . unwrap_or ( DEFAULT_PROTOCOL_PARAMETERS_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
162+ info ! ( "Creating subscriber for protocol parameters on '{protocol_parameters_subscribe_topic}'" ) ;
163+
164+ let spo_state_subscribe_topic = config
165+ . get_string ( DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC . 0 )
166+ . unwrap_or ( DEFAULT_SPO_STATE_SUBSCRIBE_TOPIC . 1 . to_string ( ) ) ;
167+ info ! ( "Creating spo state subscription on '{spo_state_subscribe_topic}'" ) ;
168+
146169 // publishers
147170 let kes_validation_publisher =
148171 KesValidationPublisher :: new ( context. clone ( ) , validation_kes_publisher_topic) ;
149172
150173 // Subscribers
151174 let bootstrapped_subscription = context. subscribe ( & bootstrapped_subscribe_topic) . await ?;
175+ let blocks_subscription = context. subscribe ( & blocks_subscribe_topic) . await ?;
152176 let protocol_parameters_subscription =
153177 context. subscribe ( & protocol_parameters_subscribe_topic) . await ?;
154- let blocks_subscription = context. subscribe ( & blocks_subscribe_topic ) . await ?;
178+ let spo_state_subscription = context. subscribe ( & spo_state_subscribe_topic ) . await ?;
155179
156180 // state history
157181 let history = Arc :: new ( Mutex :: new ( StateHistory :: < State > :: new (
@@ -167,6 +191,7 @@ impl BlockKesValidator {
167191 bootstrapped_subscription,
168192 blocks_subscription,
169193 protocol_parameters_subscription,
194+ spo_state_subscription,
170195 )
171196 . await
172197 . unwrap_or_else ( |e| error ! ( "Failed: {e}" ) ) ;
0 commit comments