@@ -30,6 +30,7 @@ use std::fmt::Debug;
3030use std:: io:: { Read , BufRead , Write } ;
3131use std:: ops:: { Add , Sub } ;
3232
33+ use api:: ledger:: { CustomFree , CustomTransactionParser } ;
3334use commands:: { Command , CommandExecutor } ;
3435use commands:: ledger:: LedgerCommand ;
3536use commands:: pool:: PoolCommand ;
@@ -44,6 +45,7 @@ use utils::environment::EnvironmentUtils;
4445use utils:: sequence:: SequenceUtils ;
4546use self :: indy_crypto:: bls:: VerKey ;
4647use std:: path:: PathBuf ;
48+ use std:: sync:: Mutex ;
4749
4850use self :: indy_crypto:: utils:: json:: { JsonDecodable , JsonEncodable } ;
4951
@@ -52,6 +54,10 @@ pub struct PoolService {
5254 open_pools : RefCell < HashMap < i32 , Pool > > ,
5355}
5456
57+ lazy_static ! {
58+ static ref REGISTERED_SP_PARSERS : Mutex <HashMap <String , ( CustomTransactionParser , CustomFree ) >> = Mutex :: new( HashMap :: new( ) ) ;
59+ }
60+
5561struct Pool {
5662 name : String ,
5763 id : i32 ,
@@ -251,26 +257,26 @@ impl PoolWorker {
251257 }
252258
253259 fn process_actions ( & mut self , actions : Vec < ZMQLoopAction > ) -> Result < ( ) , PoolError > {
254- for action in & actions {
255- match action {
256- & ZMQLoopAction :: Terminate ( cmd_id) => {
260+ for action in actions {
261+ match action {
262+ ZMQLoopAction :: Terminate ( cmd_id) => {
257263 let res = self . handler . flush_requests ( Err ( PoolError :: Terminate ) ) ;
258264 if cmd_id >= 0 {
259265 CommandExecutor :: instance ( ) . send ( Command :: Pool ( PoolCommand :: CloseAck ( cmd_id, res) ) ) ?;
260266 }
261267 return Err ( PoolError :: Terminate ) ;
262268 }
263- & ZMQLoopAction :: Refresh ( cmd_id) => {
269+ ZMQLoopAction :: Refresh ( cmd_id) => {
264270 self . refresh ( cmd_id) ?;
265271 }
266- & ZMQLoopAction :: MessageToProcess ( ref msg) => {
272+ ZMQLoopAction :: MessageToProcess ( ref msg) => {
267273 if let Some ( new_mt) = self . handler . process_msg ( & msg. message , msg. node_idx ) ? {
268274 self . handler . flush_requests ( Ok ( ( ) ) ) ?;
269275 self . handler = PoolWorkerHandler :: TransactionHandler ( Default :: default ( ) ) ;
270276 self . connect_to_known_nodes ( Some ( & new_mt) ) ?;
271277 }
272278 }
273- & ZMQLoopAction :: RequestToSend ( ref req) => {
279+ ZMQLoopAction :: RequestToSend ( ref req) => {
274280 self . handler . send_request ( req. request . as_str ( ) , req. id ) . or_else ( |err| {
275281 CommandExecutor :: instance ( )
276282 . send ( Command :: Ledger ( LedgerCommand :: SubmitAck ( req. id , Err ( err) ) ) )
@@ -279,7 +285,7 @@ impl PoolWorker {
279285 } )
280286 } ) ?;
281287 }
282- & ZMQLoopAction :: Timeout => {
288+ ZMQLoopAction :: Timeout => {
283289 self . handler . process_timeout ( ) ?;
284290 }
285291 }
@@ -783,6 +789,26 @@ impl PoolService {
783789 Ok ( cmd_id)
784790 }
785791
792+ pub fn register_sp_parser ( txn_type : & str ,
793+ parser : CustomTransactionParser , free : CustomFree ) -> Result < ( ) , PoolError > {
794+ if transaction_handler:: REQUESTS_FOR_STATE_PROOFS . contains ( & txn_type) {
795+ return Err ( PoolError :: CommonError ( CommonError :: InvalidStructure (
796+ format ! ( "Try to override StateProof parser for default TXN_TYPE {}" , txn_type) ) ) ) ;
797+ }
798+ REGISTERED_SP_PARSERS . lock ( )
799+ . map ( |mut map| {
800+ map. insert ( txn_type. to_owned ( ) , ( parser, free) ) ;
801+ } )
802+ . map_err ( |_| PoolError :: CommonError ( CommonError :: InvalidState (
803+ "Can't register new SP parser: mutex lock error" . to_owned ( ) ) ) )
804+ }
805+
806+ pub fn get_sp_parser ( txn_type : & str ) -> Option < ( CustomTransactionParser , CustomFree ) > {
807+ REGISTERED_SP_PARSERS . lock ( ) . ok ( ) . and_then ( |map| {
808+ map. get ( txn_type) . map ( Clone :: clone)
809+ } )
810+ }
811+
786812 pub fn close ( & self , handle : i32 ) -> Result < i32 , PoolError > {
787813 let cmd_id: i32 = SequenceUtils :: get_next_id ( ) ;
788814 self . open_pools . try_borrow_mut ( ) . map_err ( CommonError :: from) ?
@@ -988,7 +1014,10 @@ mod tests {
9881014 cmd_sock : zmq:: Context :: new ( ) . socket ( zmq:: SocketType :: PAIR ) . unwrap ( ) ,
9891015 open_cmd_id : 0 ,
9901016 name : "" . to_string ( ) ,
991- handler : PoolWorkerHandler :: CatchupHandler ( Default :: default ( ) ) ,
1017+ handler : PoolWorkerHandler :: CatchupHandler ( CatchupHandler {
1018+ timeout : time:: now_utc ( ) . add ( Duration :: seconds ( 2 ) ) ,
1019+ ..Default :: default ( )
1020+ } ) ,
9921021 }
9931022 }
9941023 }
0 commit comments