|
| 1 | +// Copyright 2023-, Semiotic AI, Inc. |
| 2 | +// SPDX-License-Identifier: Apache-2.0 |
| 3 | +use std::{ |
| 4 | + sync::{ |
| 5 | + atomic::{AtomicU64, Ordering}, |
| 6 | + Arc, |
| 7 | + }, |
| 8 | + time::{SystemTime, UNIX_EPOCH}, |
| 9 | +}; |
| 10 | + |
| 11 | +use anyhow::{Error, Result}; |
| 12 | +use jsonrpsee::{ |
| 13 | + core::{async_trait, client::ClientT}, |
| 14 | + http_client::{HttpClient, HttpClientBuilder}, |
| 15 | + proc_macros::rpc, |
| 16 | + rpc_params, |
| 17 | + server::{ServerBuilder, ServerHandle}, |
| 18 | +}; |
| 19 | +use tokio::sync::Mutex; |
| 20 | + |
| 21 | +use tap_aggregator::jsonrpsee_helpers; |
| 22 | +use tap_core::{ |
| 23 | + adapters::{ |
| 24 | + collateral_adapter::CollateralAdapter, rav_storage_adapter::RAVStorageAdapter, |
| 25 | + receipt_checks_adapter::ReceiptChecksAdapter, |
| 26 | + receipt_storage_adapter::ReceiptStorageAdapter, |
| 27 | + }, |
| 28 | + tap_manager::{Manager, SignedRAV, SignedReceipt}, |
| 29 | + tap_receipt::ReceiptCheck, |
| 30 | + Error as TapCoreError, |
| 31 | +}; |
| 32 | +/// Rpc trait represents a JSON-RPC server that has a single async method `request`. |
| 33 | +/// This method is designed to handle incoming JSON-RPC requests. |
| 34 | +#[rpc(server)] |
| 35 | +pub trait Rpc { |
| 36 | + // This async method is designed to handle incoming JSON-RPC requests. |
| 37 | + #[method(name = "request")] |
| 38 | + async fn request( |
| 39 | + &self, |
| 40 | + request_id: u64, // Unique identifier for the request |
| 41 | + receipt: SignedReceipt, // Signed receipt associated with the request |
| 42 | + ) -> Result<(), jsonrpsee::types::ErrorObjectOwned>; // The result of the request, a JSON-RPC error if it fails |
| 43 | +} |
| 44 | + |
| 45 | +/// RpcManager is a struct that implements the `Rpc` trait and it represents a JSON-RPC server manager. |
| 46 | +/// It includes a manager, initial_checks, receipt_count, threshold and aggregator_client. |
| 47 | +/// Manager holds a Mutex-protected instance of a generic `Manager` object which is shared and can be accessed by multiple threads. |
| 48 | +/// initial_checks is a list of checks that needs to be performed for every incoming request. |
| 49 | +/// receipt_count is a thread-safe counter that increments with each receipt verified and stored. |
| 50 | +/// threshold is a limit to which receipt_count can increment, after reaching which RAV request is triggered. |
| 51 | +/// aggregator_client is an HTTP client used for making JSON-RPC requests to another server. |
| 52 | +pub struct RpcManager< |
| 53 | + CA: CollateralAdapter + Send + 'static, // An instance of CollateralAdapter, marked as thread-safe with Send and given 'static lifetime |
| 54 | + RCA: ReceiptChecksAdapter + Send + 'static, // An instance of ReceiptChecksAdapter |
| 55 | + RSA: ReceiptStorageAdapter + Send + 'static, // An instance of ReceiptStorageAdapter |
| 56 | + RAVSA: RAVStorageAdapter + Send + 'static, // An instance of RAVStorageAdapter |
| 57 | +> { |
| 58 | + manager: Arc<Mutex<Manager<CA, RCA, RSA, RAVSA>>>, // Manager object in a mutex for thread safety, reference counted with an Arc |
| 59 | + initial_checks: Vec<ReceiptCheck>, // Vector of initial checks to be performed on each request |
| 60 | + receipt_count: Arc<AtomicU64>, // Thread-safe atomic counter for receipts |
| 61 | + threshold: u64, // The count at which a RAV request will be triggered |
| 62 | + aggregator_client: (HttpClient, String), // HTTP client for sending requests to the aggregator server |
| 63 | +} |
| 64 | + |
| 65 | +/// Implementation for `RpcManager`, includes the constructor and the `request` method. |
| 66 | +/// Constructor initializes a new instance of `RpcManager`. |
| 67 | +/// `request` method handles incoming JSON-RPC requests and it verifies and stores the receipt from the request. |
| 68 | +impl< |
| 69 | + CA: CollateralAdapter + Send + 'static, |
| 70 | + RCA: ReceiptChecksAdapter + Send + 'static, |
| 71 | + RSA: ReceiptStorageAdapter + Send + 'static, |
| 72 | + RAVSA: RAVStorageAdapter + Send + 'static, |
| 73 | + > RpcManager<CA, RCA, RSA, RAVSA> |
| 74 | +{ |
| 75 | + pub fn new( |
| 76 | + collateral_adapter: CA, |
| 77 | + receipt_checks_adapter: RCA, |
| 78 | + receipt_storage_adapter: RSA, |
| 79 | + rav_storage_adapter: RAVSA, |
| 80 | + initial_checks: Vec<ReceiptCheck>, |
| 81 | + required_checks: Vec<ReceiptCheck>, |
| 82 | + threshold: u64, |
| 83 | + aggregate_server_address: String, |
| 84 | + aggregate_server_api_version: String, |
| 85 | + ) -> Result<Self> { |
| 86 | + Ok(Self { |
| 87 | + manager: Arc::new(Mutex::new(Manager::<CA, RCA, RSA, RAVSA>::new( |
| 88 | + collateral_adapter, |
| 89 | + receipt_checks_adapter, |
| 90 | + rav_storage_adapter, |
| 91 | + receipt_storage_adapter, |
| 92 | + required_checks, |
| 93 | + get_current_timestamp_u64_ns()?, |
| 94 | + ))), |
| 95 | + initial_checks, |
| 96 | + receipt_count: Arc::new(AtomicU64::new(0)), |
| 97 | + threshold, |
| 98 | + aggregator_client: ( |
| 99 | + HttpClientBuilder::default().build(format!("{}", aggregate_server_address))?, |
| 100 | + aggregate_server_api_version, |
| 101 | + ), |
| 102 | + }) |
| 103 | + } |
| 104 | +} |
| 105 | + |
| 106 | +#[async_trait] |
| 107 | +impl< |
| 108 | + CA: CollateralAdapter + Send + 'static, |
| 109 | + RCA: ReceiptChecksAdapter + Send + 'static, |
| 110 | + RSA: ReceiptStorageAdapter + Send + 'static, |
| 111 | + RAVSA: RAVStorageAdapter + Send + 'static, |
| 112 | + > RpcServer for RpcManager<CA, RCA, RSA, RAVSA> |
| 113 | +{ |
| 114 | + async fn request( |
| 115 | + &self, |
| 116 | + request_id: u64, |
| 117 | + receipt: SignedReceipt, |
| 118 | + ) -> Result<(), jsonrpsee::types::ErrorObjectOwned> { |
| 119 | + let verify_result; |
| 120 | + { |
| 121 | + let mut manager_guard = self.manager.lock().await; |
| 122 | + verify_result = match manager_guard.verify_and_store_receipt( |
| 123 | + receipt, |
| 124 | + request_id, |
| 125 | + self.initial_checks.clone(), |
| 126 | + ) { |
| 127 | + Ok(_) => Ok(()), |
| 128 | + Err(e) => Err(to_rpc_error( |
| 129 | + Box::new(e), |
| 130 | + "Failed to verify and store receipt", |
| 131 | + )), |
| 132 | + }; |
| 133 | + } |
| 134 | + |
| 135 | + // Increment the receipt count |
| 136 | + self.receipt_count.fetch_add(1, Ordering::Relaxed); |
| 137 | + let rav_request_valid = if self.receipt_count.load(Ordering::SeqCst) >= self.threshold { |
| 138 | + // Reset the counter after reaching the threshold |
| 139 | + self.receipt_count.store(0, Ordering::SeqCst); |
| 140 | + |
| 141 | + // Create the aggregate_receipts request params |
| 142 | + let time_stamp_buffer = 0; |
| 143 | + match request_rav(&self.manager, time_stamp_buffer, &self.aggregator_client).await { |
| 144 | + Ok(_) => Ok(()), |
| 145 | + Err(e) => Err(to_rpc_error(e.into(), "Failed to request rav")), |
| 146 | + } |
| 147 | + } else { |
| 148 | + Ok(()) |
| 149 | + }; |
| 150 | + |
| 151 | + // Combine the results |
| 152 | + match (verify_result, rav_request_valid) { |
| 153 | + (Ok(_), Ok(_)) => Ok(()), |
| 154 | + (Err(e), _) | (_, Err(e)) => Err(e), |
| 155 | + } |
| 156 | + } |
| 157 | +} |
| 158 | + |
| 159 | +/// run_server function initializes and starts a JSON-RPC server that handles incoming requests. |
| 160 | +pub async fn run_server< |
| 161 | + CA: CollateralAdapter + Send + 'static, |
| 162 | + RCA: ReceiptChecksAdapter + Send + 'static, |
| 163 | + RSA: ReceiptStorageAdapter + Send + 'static, |
| 164 | + RAVSA: RAVStorageAdapter + Send + 'static, |
| 165 | +>( |
| 166 | + port: u16, // Port on which the server will listen |
| 167 | + collateral_adapter: CA, // CollateralAdapter instance |
| 168 | + receipt_checks_adapter: RCA, // ReceiptChecksAdapter instance |
| 169 | + receipt_storage_adapter: RSA, // ReceiptStorageAdapter instance |
| 170 | + rav_storage_adapter: RAVSA, // RAVStorageAdapter instance |
| 171 | + initial_checks: Vec<ReceiptCheck>, // Vector of initial checks to be performed on each request |
| 172 | + required_checks: Vec<ReceiptCheck>, // Vector of required checks to be performed on each request |
| 173 | + threshold: u64, // The count at which a RAV request will be triggered |
| 174 | + aggregate_server_address: String, // Address of the aggregator server |
| 175 | + aggregate_server_api_version: String, // API version of the aggregator server |
| 176 | +) -> Result<(ServerHandle, std::net::SocketAddr)> { |
| 177 | + // Setting up the JSON RPC server |
| 178 | + println!("Starting server..."); |
| 179 | + let server = ServerBuilder::new() |
| 180 | + .http_only() |
| 181 | + .build(format!("127.0.0.1:{}", port)) |
| 182 | + .await?; |
| 183 | + let addr = server.local_addr()?; |
| 184 | + println!("Listening on: {}", addr); |
| 185 | + let rpc_manager = RpcManager::new( |
| 186 | + collateral_adapter, |
| 187 | + receipt_checks_adapter, |
| 188 | + receipt_storage_adapter, |
| 189 | + rav_storage_adapter, |
| 190 | + initial_checks, |
| 191 | + required_checks, |
| 192 | + threshold, |
| 193 | + aggregate_server_address, |
| 194 | + aggregate_server_api_version, |
| 195 | + )?; |
| 196 | + |
| 197 | + let handle = server.start(rpc_manager.into_rpc())?; |
| 198 | + Ok((handle, addr)) |
| 199 | +} |
| 200 | + |
| 201 | +// request_rav function creates a request for aggregate receipts (RAV), sends it to another server and verifies the result. |
| 202 | +async fn request_rav< |
| 203 | + CA: CollateralAdapter + Send + 'static, |
| 204 | + RCA: ReceiptChecksAdapter + Send + 'static, |
| 205 | + RSA: ReceiptStorageAdapter + Send + 'static, |
| 206 | + RAVSA: RAVStorageAdapter + Send + 'static, |
| 207 | +>( |
| 208 | + manager: &Arc<Mutex<Manager<CA, RCA, RSA, RAVSA>>>, // Mutex-protected manager object for thread safety |
| 209 | + time_stamp_buffer: u64, // Buffer for timestamping, see tap_core for details |
| 210 | + aggregator_client: &(HttpClient, String), // HttpClient for making requests to the tap_aggregator server |
| 211 | +) -> Result<()> { |
| 212 | + let rav; |
| 213 | + // Create the aggregate_receipts request params |
| 214 | + { |
| 215 | + let mut manager_guard = manager.lock().await; |
| 216 | + rav = manager_guard.create_rav_request(time_stamp_buffer)?; |
| 217 | + } |
| 218 | + match rav.invalid_receipts.is_empty() { |
| 219 | + true => Ok(()), |
| 220 | + false => Err(Error::msg("Invalid receipts found")), |
| 221 | + }?; |
| 222 | + |
| 223 | + // To-do: Need to add previous RAV, when tap_manager supports replacing receipts |
| 224 | + let params = rpc_params!(&aggregator_client.1, &rav.valid_receipts, None::<()>); |
| 225 | + |
| 226 | + // Call the aggregate_receipts method on the other server |
| 227 | + let remote_rav_result: jsonrpsee_helpers::JsonRpcResponse<SignedRAV> = aggregator_client |
| 228 | + .0 |
| 229 | + .request("aggregate_receipts", params) |
| 230 | + .await?; |
| 231 | + { |
| 232 | + let mut manager_guard = manager.lock().await; |
| 233 | + let _result = |
| 234 | + manager_guard.verify_and_store_rav(rav.expected_rav, remote_rav_result.data)?; |
| 235 | + } |
| 236 | + Ok(()) |
| 237 | +} |
| 238 | + |
| 239 | +// get_current_timestamp_u64_ns function returns current system time since UNIX_EPOCH as a 64-bit unsigned integer. |
| 240 | +fn get_current_timestamp_u64_ns() -> Result<u64> { |
| 241 | + Ok(SystemTime::now() |
| 242 | + .duration_since(UNIX_EPOCH) |
| 243 | + .map_err(|err| TapCoreError::InvalidSystemTime { |
| 244 | + source_error_message: err.to_string(), |
| 245 | + })? |
| 246 | + .as_nanos() as u64) |
| 247 | +} |
| 248 | + |
| 249 | +fn to_rpc_error(e: Box<dyn std::error::Error>, msg: &str) -> jsonrpsee::types::ErrorObjectOwned { |
| 250 | + jsonrpsee::types::ErrorObject::owned(-32000, format!("{} - {}", e.to_string(), msg), None::<()>) |
| 251 | +} |
0 commit comments