Skip to content

Commit c1c7b90

Browse files
authored
Merge pull request hyperledger-indy#832 from jovfer/feature/sp_plugged
Feature Plugged StateProof parsing
2 parents dc57c32 + 96a3293 commit c1c7b90

File tree

9 files changed

+414
-89
lines changed

9 files changed

+414
-89
lines changed

doc/design/008-state-proof-pluggable-parsing/README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ extern fn CustomFree(data: *mut c_char) -> ErrorCode;
2525
Libindy API will contain call to register handler for specific transaction type:
2626
```rust
2727
extern fn indy_register_transaction_parser_for_sp(command_handle: i32,
28-
pool_handle: i32,
2928
txn_type: *const c_char,
3029
parser: CustomTransactionParser,
3130
free: CustomFree,

libindy/src/api/ledger.rs

Lines changed: 67 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1616,4 +1616,70 @@ pub extern fn indy_parse_get_revoc_reg_delta_response(command_handle: i32,
16161616
trace!("indy_parse_get_revoc_reg_delta_response: <<< res: {:?}", res);
16171617

16181618
res
1619-
}
1619+
}
1620+
1621+
/// Callback type for parsing Reply from Node to specific StateProof format
1622+
///
1623+
/// # params
1624+
/// reply_from_node: string representation of node's reply ("as is")
1625+
/// parsed_sp: out param to return serialized as string JSON with array of ParsedSP
1626+
///
1627+
/// # return
1628+
/// result ErrorCode
1629+
///
1630+
/// Note: this method allocate memory for result string `CustomFree` should be called to deallocate it
1631+
pub type CustomTransactionParser = extern fn(reply_from_node: *const c_char, parsed_sp: *mut *const c_char) -> ErrorCode;
1632+
1633+
/// Callback type to deallocate result buffer `parsed_sp` from `CustomTransactionParser`
1634+
pub type CustomFree = extern fn(data: *const c_char) -> ErrorCode;
1635+
1636+
1637+
/// Register callbacks (see type description for `CustomTransactionParser` and `CustomFree`
1638+
///
1639+
/// # params
1640+
/// command_handle: command handle to map callback to caller context.
1641+
/// txn_type: type of transaction to apply `parse` callback.
1642+
/// parse: required callback to parse reply for state proof.
1643+
/// free: required callback to deallocate memory.
1644+
/// cb: Callback that takes command result as parameter.
1645+
///
1646+
/// # returns
1647+
/// Status of callbacks registration.
1648+
///
1649+
/// # errors
1650+
/// Common*
1651+
#[no_mangle]
1652+
pub extern fn indy_register_transaction_parser_for_sp(command_handle: i32,
1653+
txn_type: *const c_char,
1654+
parser: Option<CustomTransactionParser>,
1655+
free: Option<CustomFree>,
1656+
cb: Option<extern fn(command_handle_: i32, err: ErrorCode)>) -> ErrorCode {
1657+
trace!("indy_register_transaction_parser_for_sp: >>> txn_type {:?}, parser {:?}, free {:?}",
1658+
txn_type, parser, free);
1659+
1660+
check_useful_c_str!(txn_type, ErrorCode::CommonInvalidParam2);
1661+
check_useful_c_callback!(parser, ErrorCode::CommonInvalidParam3);
1662+
check_useful_c_callback!(free, ErrorCode::CommonInvalidParam4);
1663+
check_useful_c_callback!(cb, ErrorCode::CommonInvalidParam5);
1664+
1665+
trace!("indy_register_transaction_parser_for_sp: entities: txn_type {}, parser {:?}, free {:?}",
1666+
txn_type, parser, free);
1667+
1668+
let res = CommandExecutor::instance()
1669+
.send(Command::Ledger(LedgerCommand::RegisterSPParser(
1670+
txn_type,
1671+
parser,
1672+
free,
1673+
Box::new(move |res| {
1674+
let res = result_to_err_code!(res);
1675+
trace!("indy_register_transaction_parser_for_sp: res: {:?}", res);
1676+
cb(command_handle, res)
1677+
}),
1678+
)));
1679+
1680+
let res = result_to_err_code!(res);
1681+
1682+
trace!("indy_register_transaction_parser_for_sp: <<< res: {:?}", res);
1683+
1684+
res
1685+
}

libindy/src/commands/ledger.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ extern crate indy_crypto;
33

44
use self::serde_json::Value;
55

6+
use api::ledger::{CustomFree, CustomTransactionParser};
7+
68
use errors::common::CommonError;
79
use errors::pool::PoolError;
810
use errors::crypto::CryptoError;
@@ -170,7 +172,12 @@ pub enum LedgerCommand {
170172
Box<Fn(Result<String, IndyError>) + Send>),
171173
ParseGetRevocRegDeltaResponse(
172174
String, // get revocation registry delta response
173-
Box<Fn(Result<(String, String, u64), IndyError>) + Send>)
175+
Box<Fn(Result<(String, String, u64), IndyError>) + Send>),
176+
RegisterSPParser(
177+
String, // txn type
178+
CustomTransactionParser,
179+
CustomFree,
180+
Box<Fn(Result<(), IndyError>) + Send>),
174181
}
175182

176183
pub struct LedgerCommandExecutor {
@@ -212,6 +219,10 @@ impl LedgerCommandExecutor {
212219
.expect("Expect callback to process ack command")
213220
(result.map_err(IndyError::from));
214221
}
222+
LedgerCommand::RegisterSPParser(txn_type, parser, free, cb) => {
223+
info!(target: "ledger_command_executor", "RegisterSPParser command received");
224+
cb(self.register_sp_parser(&txn_type, parser, free));
225+
}
215226
LedgerCommand::SignRequest(wallet_handle, submitter_did, request_json, cb) => {
216227
info!(target: "ledger_command_executor", "SignRequest command received");
217228
cb(self.sign_request(wallet_handle, &submitter_did, &request_json));
@@ -335,6 +346,15 @@ impl LedgerCommandExecutor {
335346
};
336347
}
337348

349+
fn register_sp_parser(&self, txn_type: &str,
350+
parser: CustomTransactionParser, free: CustomFree) -> Result<(), IndyError> {
351+
debug!("register_sp_parser >>> txn_type: {:?}, parser: {:?}, free: {:?}",
352+
txn_type, parser, free);
353+
354+
PoolService::register_sp_parser(txn_type, parser, free)
355+
.map_err(IndyError::from)
356+
}
357+
338358
fn sign_and_submit_request(&self,
339359
pool_handle: i32,
340360
wallet_handle: i32,

libindy/src/services/pool/mod.rs

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ use std::fmt::Debug;
3030
use std::io::{Read, BufRead, Write};
3131
use std::ops::{Add, Sub};
3232

33+
use api::ledger::{CustomFree, CustomTransactionParser};
3334
use commands::{Command, CommandExecutor};
3435
use commands::ledger::LedgerCommand;
3536
use commands::pool::PoolCommand;
@@ -44,6 +45,7 @@ use utils::environment::EnvironmentUtils;
4445
use utils::sequence::SequenceUtils;
4546
use self::indy_crypto::bls::VerKey;
4647
use std::path::PathBuf;
48+
use std::sync::Mutex;
4749

4850
use self::indy_crypto::utils::json::{JsonDecodable, JsonEncodable};
4951

@@ -52,6 +54,10 @@ pub struct PoolService {
5254
open_pools: RefCell<HashMap<i32, Pool>>,
5355
}
5456

57+
lazy_static! {
58+
static ref REGISTERED_SP_PARSERS: Mutex<HashMap<String, (CustomTransactionParser, CustomFree)>> = Mutex::new(HashMap::new());
59+
}
60+
5561
struct Pool {
5662
name: String,
5763
id: i32,
@@ -251,26 +257,26 @@ impl PoolWorker {
251257
}
252258

253259
fn process_actions(&mut self, actions: Vec<ZMQLoopAction>) -> Result<(), PoolError> {
254-
for action in &actions {
255-
match action {
256-
&ZMQLoopAction::Terminate(cmd_id) => {
260+
for action in actions {
261+
match action {
262+
ZMQLoopAction::Terminate(cmd_id) => {
257263
let res = self.handler.flush_requests(Err(PoolError::Terminate));
258264
if cmd_id >= 0 {
259265
CommandExecutor::instance().send(Command::Pool(PoolCommand::CloseAck(cmd_id, res)))?;
260266
}
261267
return Err(PoolError::Terminate);
262268
}
263-
&ZMQLoopAction::Refresh(cmd_id) => {
269+
ZMQLoopAction::Refresh(cmd_id) => {
264270
self.refresh(cmd_id)?;
265271
}
266-
&ZMQLoopAction::MessageToProcess(ref msg) => {
272+
ZMQLoopAction::MessageToProcess(ref msg) => {
267273
if let Some(new_mt) = self.handler.process_msg(&msg.message, msg.node_idx)? {
268274
self.handler.flush_requests(Ok(()))?;
269275
self.handler = PoolWorkerHandler::TransactionHandler(Default::default());
270276
self.connect_to_known_nodes(Some(&new_mt))?;
271277
}
272278
}
273-
&ZMQLoopAction::RequestToSend(ref req) => {
279+
ZMQLoopAction::RequestToSend(ref req) => {
274280
self.handler.send_request(req.request.as_str(), req.id).or_else(|err| {
275281
CommandExecutor::instance()
276282
.send(Command::Ledger(LedgerCommand::SubmitAck(req.id, Err(err))))
@@ -279,7 +285,7 @@ impl PoolWorker {
279285
})
280286
})?;
281287
}
282-
&ZMQLoopAction::Timeout => {
288+
ZMQLoopAction::Timeout => {
283289
self.handler.process_timeout()?;
284290
}
285291
}
@@ -783,6 +789,26 @@ impl PoolService {
783789
Ok(cmd_id)
784790
}
785791

792+
pub fn register_sp_parser(txn_type: &str,
793+
parser: CustomTransactionParser, free: CustomFree) -> Result<(), PoolError> {
794+
if transaction_handler::REQUESTS_FOR_STATE_PROOFS.contains(&txn_type) {
795+
return Err(PoolError::CommonError(CommonError::InvalidStructure(
796+
format!("Try to override StateProof parser for default TXN_TYPE {}", txn_type))));
797+
}
798+
REGISTERED_SP_PARSERS.lock()
799+
.map(|mut map| {
800+
map.insert(txn_type.to_owned(), (parser, free));
801+
})
802+
.map_err(|_| PoolError::CommonError(CommonError::InvalidState(
803+
"Can't register new SP parser: mutex lock error".to_owned())))
804+
}
805+
806+
pub fn get_sp_parser(txn_type: &str) -> Option<(CustomTransactionParser, CustomFree)> {
807+
REGISTERED_SP_PARSERS.lock().ok().and_then(|map| {
808+
map.get(txn_type).map(Clone::clone)
809+
})
810+
}
811+
786812
pub fn close(&self, handle: i32) -> Result<i32, PoolError> {
787813
let cmd_id: i32 = SequenceUtils::get_next_id();
788814
self.open_pools.try_borrow_mut().map_err(CommonError::from)?
@@ -988,7 +1014,10 @@ mod tests {
9881014
cmd_sock: zmq::Context::new().socket(zmq::SocketType::PAIR).unwrap(),
9891015
open_cmd_id: 0,
9901016
name: "".to_string(),
991-
handler: PoolWorkerHandler::CatchupHandler(Default::default()),
1017+
handler: PoolWorkerHandler::CatchupHandler(CatchupHandler {
1018+
timeout: time::now_utc().add(Duration::seconds(2)),
1019+
..Default::default()
1020+
}),
9921021
}
9931022
}
9941023
}

0 commit comments

Comments
 (0)