From 51ad234a6c4ed836d554e03a47bd7e707a81788a Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Thu, 13 Mar 2025 11:26:48 +0100 Subject: [PATCH 1/9] implement custom generator on IdKind --- core/src/client/mod.rs | 6 +- .../examples/core_client_with_request_id.rs | 75 +++++++++++++++++++ types/src/lib.rs | 2 +- types/src/request.rs | 34 ++++++++- 4 files changed, 114 insertions(+), 3 deletions(-) create mode 100644 examples/examples/core_client_with_request_id.rs diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index cea2442eaa..6ff6f5004f 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -33,6 +33,7 @@ cfg_async_client! { pub mod error; pub use error::Error; +use jsonrpsee_types::request::IdGeneratorFn; use std::fmt; use std::ops::Range; @@ -485,14 +486,17 @@ pub enum IdKind { String, /// Number. Number, + /// Custom generator. + Custom(IdGeneratorFn), } impl IdKind { - /// Generate an `Id` from number. + /// Generate an `Id` from number or from a registered generator. pub fn into_id(self, id: u64) -> Id<'static> { match self { IdKind::Number => Id::Number(id), IdKind::String => Id::Str(format!("{id}").into()), + IdKind::Custom(generator) => generator.call(), } } } diff --git a/examples/examples/core_client_with_request_id.rs b/examples/examples/core_client_with_request_id.rs new file mode 100644 index 0000000000..18d4985358 --- /dev/null +++ b/examples/examples/core_client_with_request_id.rs @@ -0,0 +1,75 @@ +// Copyright 2019-2021 Parity Technologies (UK) Ltd. +// +// Permission is hereby granted, free of charge, to any +// person obtaining a copy of this software and associated +// documentation files (the "Software"), to deal in the +// Software without restriction, including without +// limitation the rights to use, copy, modify, merge, +// publish, distribute, sublicense, and/or sell copies of +// the Software, and to permit persons to whom the Software +// is furnished to do so, subject to the following +// conditions: +// +// The above copyright notice and this permission notice +// shall be included in all copies or substantial portions +// of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF +// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED +// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY +// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR +// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +use std::net::SocketAddr; + +use jsonrpsee::client_transport::ws::{Url, WsTransportClientBuilder}; +use jsonrpsee::core::client::{Client, ClientBuilder, ClientT, IdKind}; +use jsonrpsee::rpc_params; +use jsonrpsee::server::{RpcModule, Server}; +use jsonrpsee::types::{Id, IdGeneratorFn}; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); + + let addr = run_server().await?; + let uri = Url::parse(&format!("ws://{}", addr))?; + + let custom_generator = IdGeneratorFn::new(generate_timestamp_id); + + let (tx, rx) = WsTransportClientBuilder::default().build(uri).await?; + let client: Client = ClientBuilder::default().id_format(IdKind::Custom(custom_generator)).build_with_tokio(tx, rx); + + let response: String = client.request("say_hello", rpc_params![]).await?; + tracing::info!("response: {:?}", response); + + Ok(()) +} + +async fn run_server() -> anyhow::Result { + let server = Server::builder().build("127.0.0.1:0").await?; + let mut module = RpcModule::new(()); + module.register_method("say_hello", |_, _, _| "lo")?; + let addr = server.local_addr()?; + + let handle = server.start(module); + + // In this example we don't care about doing shutdown so let's it run forever. + // You may use the `ServerHandle` to shut it down or manage it yourself. + tokio::spawn(handle.stopped()); + + Ok(addr) +} + +fn generate_timestamp_id() -> Id<'static> { + let timestamp = + std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs(); + Id::Number(timestamp) +} diff --git a/types/src/lib.rs b/types/src/lib.rs index 48b6d87692..4658814fe4 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -45,5 +45,5 @@ pub mod error; pub use error::{ErrorCode, ErrorObject, ErrorObjectOwned}; pub use params::{Id, InvalidRequestId, Params, ParamsSequence, SubscriptionId, TwoPointZero}; -pub use request::{InvalidRequest, Notification, NotificationSer, Request, RequestSer}; +pub use request::{IdGeneratorFn, InvalidRequest, Notification, NotificationSer, Request, RequestSer}; pub use response::{Response, ResponsePayload, SubscriptionPayload, SubscriptionResponse, Success as ResponseSuccess}; diff --git a/types/src/request.rs b/types/src/request.rs index b954586c26..a61fb0a42a 100644 --- a/types/src/request.rs +++ b/types/src/request.rs @@ -27,7 +27,10 @@ //! Types to handle JSON-RPC requests according to the [spec](https://www.jsonrpc.org/specification#request-object). //! Some types come with a "*Ser" variant that implements [`serde::Serialize`]; these are used in the client. -use std::borrow::Cow; +use std::{ + borrow::Cow, + fmt::{Debug, Formatter, Result}, +}; use crate::{ Params, @@ -173,6 +176,35 @@ impl<'a> NotificationSer<'a> { } } +/// Custom id generator function +pub struct IdGeneratorFn(fn() -> Id<'static>); + +impl IdGeneratorFn { + /// Creates a new `IdGeneratorFn` from a function pointer. + pub fn new(generator: fn() -> Id<'static>) -> Self { + IdGeneratorFn(generator) + } + + /// Calls the id generator function + pub fn call(&self) -> Id<'static> { + (self.0)() + } +} + +impl Copy for IdGeneratorFn {} + +impl Clone for IdGeneratorFn { + fn clone(&self) -> Self { + *self + } +} + +impl Debug for IdGeneratorFn { + fn fmt(&self, f: &mut Formatter<'_>) -> Result { + f.write_str("") + } +} + #[cfg(test)] mod test { use super::{Cow, Id, InvalidRequest, Notification, NotificationSer, Request, RequestSer, TwoPointZero}; From 0ee2516be5df85837b3d8a13f888cc785317c436 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Fri, 14 Mar 2025 10:14:55 +0100 Subject: [PATCH 2/9] refactor next request id to handle the next id --- client/http-client/src/client.rs | 4 ++-- core/src/client/async_client/mod.rs | 6 +++--- core/src/client/mod.rs | 23 +++++++++++------------ 3 files changed, 16 insertions(+), 17 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 1955f0452c..95efeddd62 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -418,8 +418,8 @@ where let id_range = generate_batch_id_range(id, batch.len() as u64)?; let mut batch_request = Vec::with_capacity(batch.len()); - for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { - let id = self.id_manager.as_id_kind().into_id(id); + for ((method, params), _id) in batch.into_iter().zip(id_range.clone()) { + let id = self.id_manager.next_request_id(); batch_request.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 3ed6353dea..dd9a29da10 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -64,7 +64,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::instrument; use self::utils::{InactivityCheck, IntervalStream}; -use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager}; +use super::{FrontToBack, IdKind, RequestIdManager, generate_batch_id_range, subscription_channel}; pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option>; @@ -536,8 +536,8 @@ impl ClientT for Client { let id_range = generate_batch_id_range(id, batch.len() as u64)?; let mut batches = Vec::with_capacity(batch.len()); - for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { - let id = self.id_manager.as_id_kind().into_id(id); + for ((method, params), _id) in batch.into_iter().zip(id_range.clone()) { + let id = self.id_manager.next_request_id(); batches.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 6ff6f5004f..75092bdb12 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -470,7 +470,17 @@ impl RequestIdManager { /// Attempts to get the next request ID. pub fn next_request_id(&self) -> Id<'static> { - self.id_kind.into_id(self.current_id.next()) + match self.id_kind { + IdKind::Number => { + let id = self.current_id.next(); + Id::Number(id) + } + IdKind::String => { + let id = self.current_id.next(); + Id::Str(format!("{id}").into()) + } + IdKind::Custom(generator) => generator.call(), + } } /// Get a handle to the `IdKind`. @@ -490,17 +500,6 @@ pub enum IdKind { Custom(IdGeneratorFn), } -impl IdKind { - /// Generate an `Id` from number or from a registered generator. - pub fn into_id(self, id: u64) -> Id<'static> { - match self { - IdKind::Number => Id::Number(id), - IdKind::String => Id::Str(format!("{id}").into()), - IdKind::Custom(generator) => generator.call(), - } - } -} - #[derive(Debug)] struct CurrentId(AtomicUsize); From 8f7b3818cab66294f6acd0b56f2c8f2b393bee29 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Sat, 15 Mar 2025 21:09:11 +0100 Subject: [PATCH 3/9] move batch id generation to request id manager --- client/http-client/src/client.rs | 8 +++----- core/src/client/async_client/mod.rs | 8 +++----- core/src/client/mod.rs | 23 +++++++++++++---------- 3 files changed, 19 insertions(+), 20 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 95efeddd62..acb45278e2 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -36,7 +36,7 @@ use async_trait::async_trait; use hyper::body::Bytes; use hyper::http::HeaderMap; use jsonrpsee_core::client::{ - BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range, + BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, }; use jsonrpsee_core::params::BatchRequestBuilder; use jsonrpsee_core::traits::ToRpcParams; @@ -414,12 +414,10 @@ where None => None, }; let batch = batch.build()?; - let id = self.id_manager.next_request_id(); - let id_range = generate_batch_id_range(id, batch.len() as u64)?; + let (id_range, ids) = self.id_manager.generate_batch_id_range(batch.len()); let mut batch_request = Vec::with_capacity(batch.len()); - for ((method, params), _id) in batch.into_iter().zip(id_range.clone()) { - let id = self.id_manager.next_request_id(); + for ((method, params), id) in batch.into_iter().zip(ids) { batch_request.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index dd9a29da10..d44579cb79 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -64,7 +64,7 @@ use tokio::sync::{mpsc, oneshot}; use tracing::instrument; use self::utils::{InactivityCheck, IntervalStream}; -use super::{FrontToBack, IdKind, RequestIdManager, generate_batch_id_range, subscription_channel}; +use super::{FrontToBack, IdKind, RequestIdManager, subscription_channel}; pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option>; @@ -532,12 +532,10 @@ impl ClientT for Client { R: DeserializeOwned, { let batch = batch.build()?; - let id = self.id_manager.next_request_id(); - let id_range = generate_batch_id_range(id, batch.len() as u64)?; + let (id_range, ids) = self.id_manager.generate_batch_id_range(batch.len()); let mut batches = Vec::with_capacity(batch.len()); - for ((method, params), _id) in batch.into_iter().zip(id_range.clone()) { - let id = self.id_manager.next_request_id(); + for ((method, params), id) in batch.into_iter().zip(ids) { batches.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 75092bdb12..0dbaf16640 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -487,6 +487,19 @@ impl RequestIdManager { pub fn as_id_kind(&self) -> IdKind { self.id_kind } + + /// Generate a range of IDs and the corresponding list of request IDs for a batch request. + pub fn generate_batch_id_range(&self, batch_size: usize) -> (Range, Vec>) { + let start_id = self.current_id.next(); + let id_range = start_id..(start_id + batch_size as u64); + + let ids = match self.id_kind { + IdKind::Number => id_range.clone().map(Id::Number).collect(), + _ => (0..batch_size).map(|_| self.next_request_id()).collect(), + }; + + (id_range, ids) + } } /// JSON-RPC request object id data type. @@ -516,16 +529,6 @@ impl CurrentId { } } -/// Generate a range of IDs to be used in a batch request. -pub fn generate_batch_id_range(id: Id, len: u64) -> Result, Error> { - let id_start = id.try_parse_inner_as_number()?; - let id_end = id_start - .checked_add(len) - .ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?; - - Ok(id_start..id_end) -} - /// Represent a single entry in a batch response. pub type BatchEntry<'a, R> = Result>; From 150f07d87b3e0e78dc7cc9afaf7fe63793ebae38 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Sat, 15 Mar 2025 21:41:41 +0100 Subject: [PATCH 4/9] simplify implementation --- client/http-client/src/client.rs | 5 +++-- core/src/client/async_client/mod.rs | 5 +++-- core/src/client/mod.rs | 19 ++++++++----------- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index acb45278e2..7844817cb1 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -414,10 +414,11 @@ where None => None, }; let batch = batch.build()?; - let (id_range, ids) = self.id_manager.generate_batch_id_range(batch.len()); + let id_range = self.id_manager.generate_batch_id_range(batch.len()); let mut batch_request = Vec::with_capacity(batch.len()); - for ((method, params), id) in batch.into_iter().zip(ids) { + for (method, params) in batch.into_iter() { + let id = self.id_manager.next_request_id(); batch_request.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index d44579cb79..33fd067f1d 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -532,10 +532,11 @@ impl ClientT for Client { R: DeserializeOwned, { let batch = batch.build()?; - let (id_range, ids) = self.id_manager.generate_batch_id_range(batch.len()); + let id_range = self.id_manager.generate_batch_id_range(batch.len()); let mut batches = Vec::with_capacity(batch.len()); - for ((method, params), id) in batch.into_iter().zip(ids) { + for (method, params) in batch.into_iter() { + let id = self.id_manager.next_request_id(); batches.push(RequestSer { jsonrpc: TwoPointZero, id, diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 0dbaf16640..ea6919e747 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -488,17 +488,10 @@ impl RequestIdManager { self.id_kind } - /// Generate a range of IDs and the corresponding list of request IDs for a batch request. - pub fn generate_batch_id_range(&self, batch_size: usize) -> (Range, Vec>) { - let start_id = self.current_id.next(); - let id_range = start_id..(start_id + batch_size as u64); - - let ids = match self.id_kind { - IdKind::Number => id_range.clone().map(Id::Number).collect(), - _ => (0..batch_size).map(|_| self.next_request_id()).collect(), - }; - - (id_range, ids) + /// Generate a range of IDs for a batch request. + pub fn generate_batch_id_range(&self, batch_size: usize) -> Range { + let start_id = self.current_id.get(); + start_id..(start_id + batch_size as u64) } } @@ -527,6 +520,10 @@ impl CurrentId { .try_into() .expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed") } + + fn get(&self) -> u64 { + self.0.load(Ordering::Relaxed) as u64 + } } /// Represent a single entry in a batch response. From 3d714a06c5387a1ebeb1304b6860e979e3eedde1 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Thu, 20 Mar 2025 21:54:23 +0100 Subject: [PATCH 5/9] refactor: replaces ranges with vectors of ids --- client/http-client/src/client.rs | 16 ++++------ core/src/client/async_client/helpers.rs | 30 ++++++++----------- core/src/client/async_client/manager.rs | 30 +++++++++++-------- core/src/client/async_client/mod.rs | 40 ++++++++++--------------- core/src/client/mod.rs | 13 +------- 5 files changed, 53 insertions(+), 76 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 7844817cb1..8c4f7940f4 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -414,17 +414,18 @@ where None => None, }; let batch = batch.build()?; - let id_range = self.id_manager.generate_batch_id_range(batch.len()); + let mut ids = Vec::new(); let mut batch_request = Vec::with_capacity(batch.len()); for (method, params) in batch.into_iter() { let id = self.id_manager.next_request_id(); batch_request.push(RequestSer { jsonrpc: TwoPointZero, - id, + id: id.clone(), method: method.into(), params: params.map(StdCow::Owned), }); + ids.push(id); } let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); @@ -446,7 +447,7 @@ where } for rp in json_rps { - let id = rp.id.try_parse_inner_as_number()?; + let id = rp.id.clone(); let res = match ResponseSuccess::try_from(rp) { Ok(r) => { @@ -460,13 +461,8 @@ where } }; - let maybe_elem = id - .checked_sub(id_range.start) - .and_then(|p| p.try_into().ok()) - .and_then(|p: usize| responses.get_mut(p)); - - if let Some(elem) = maybe_elem { - *elem = res; + if let Some(pos) = ids.iter().position(|stored_id| stored_id == &id) { + responses[pos] = res; } else { return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into()); } diff --git a/core/src/client/async_client/helpers.rs b/core/src/client/async_client/helpers.rs index 21c54b85a9..89c644187f 100644 --- a/core/src/client/async_client/helpers.rs +++ b/core/src/client/async_client/helpers.rs @@ -24,6 +24,8 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use std::collections::BTreeMap; + use crate::client::async_client::manager::{RequestManager, RequestStatus}; use crate::client::async_client::{Notification, LOG_TARGET}; use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError}; @@ -39,11 +41,10 @@ use jsonrpsee_types::{ ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse, }; use serde_json::Value as JsonValue; -use std::ops::Range; #[derive(Debug, Clone)] pub(crate) struct InnerBatchResponse { - pub(crate) id: u64, + pub(crate) id: Id<'static>, pub(crate) result: Result>, } @@ -53,36 +54,29 @@ pub(crate) struct InnerBatchResponse { pub(crate) fn process_batch_response( manager: &mut RequestManager, rps: Vec, - range: Range, + ids: Vec>, ) -> Result<(), InvalidRequestId> { - let mut responses = Vec::with_capacity(rps.len()); - - let start_idx = range.start; - - let batch_state = match manager.complete_pending_batch(range.clone()) { + let batch_state = match manager.complete_pending_batch(ids.clone()) { Some(state) => state, None => { tracing::debug!(target: LOG_TARGET, "Received unknown batch response"); - return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range))); + return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", ids))); } }; - for _ in range { - let err_obj = ErrorObject::borrowed(0, "", None); - responses.push(Err(err_obj)); - } + let mut responses_map: BTreeMap, Result<_, ErrorObject>> = + ids.iter().map(|id| (id.clone(), Err(ErrorObject::borrowed(0, "", None)))).collect(); for rp in rps { - let maybe_elem = - rp.id.checked_sub(start_idx).and_then(|p| p.try_into().ok()).and_then(|p: usize| responses.get_mut(p)); - - if let Some(elem) = maybe_elem { - *elem = rp.result; + if let Some(entry) = responses_map.get_mut(&rp.id) { + *entry = rp.result; } else { return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string())); } } + let responses: Vec<_> = responses_map.into_values().collect(); + let _ = batch_state.send_back.send(Ok(responses)); Ok(()) } diff --git a/core/src/client/async_client/manager.rs b/core/src/client/async_client/manager.rs index f638b80164..3d76b68011 100644 --- a/core/src/client/async_client/manager.rs +++ b/core/src/client/async_client/manager.rs @@ -32,10 +32,7 @@ //! > **Note**: The spec allow number, string or null but this crate only supports numbers. //! - SubscriptionId: unique ID generated by server -use std::{ - collections::{hash_map::Entry, HashMap}, - ops::Range, -}; +use std::collections::{HashMap, hash_map::Entry}; use crate::{ client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender}, @@ -91,7 +88,7 @@ pub(crate) struct RequestManager { /// requests. subscriptions: HashMap, RequestId>, /// Pending batch requests. - batches: FxHashMap, BatchState>, + batches: FxHashMap>, BatchState>, /// Registered Methods for incoming notifications. notification_handlers: HashMap, } @@ -124,7 +121,7 @@ impl RequestManager { /// Returns `Ok` if the pending request was successfully inserted otherwise `Err`. pub(crate) fn insert_pending_batch( &mut self, - batch: Range, + batch: Vec>, send_back: PendingBatchOneshot, ) -> Result<(), PendingBatchOneshot> { if let Entry::Vacant(v) = self.batches.entry(batch) { @@ -223,14 +220,23 @@ impl RequestManager { /// Tries to complete a pending batch request. /// /// Returns `Some` if the subscription was completed otherwise `None`. - pub(crate) fn complete_pending_batch(&mut self, batch: Range) -> Option { - match self.batches.entry(batch) { - Entry::Occupied(request) => { - let (_digest, state) = request.remove_entry(); - Some(state) + pub(crate) fn complete_pending_batch(&mut self, batch: Vec>) -> Option { + let mut matched_key = None; + + for (key, _) in self.batches.iter() { + if key.len() == batch.len() && batch.iter().all(|id| key.contains(id)) { + matched_key = Some(key.clone()); + break; } - _ => None, } + + if let Some(key) = matched_key { + if let Some((_key, state)) = self.batches.remove_entry(&key) { + return Some(state); + } + } + + None } /// Tries to complete a pending call. diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 33fd067f1d..58a1ed86ee 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -532,17 +532,18 @@ impl ClientT for Client { R: DeserializeOwned, { let batch = batch.build()?; - let id_range = self.id_manager.generate_batch_id_range(batch.len()); + let mut ids = Vec::new(); let mut batches = Vec::with_capacity(batch.len()); for (method, params) in batch.into_iter() { let id = self.id_manager.next_request_id(); batches.push(RequestSer { jsonrpc: TwoPointZero, - id, + id: id.clone(), method: method.into(), params: params.map(StdCow::Owned), }); + ids.push(id); } let (send_back_tx, send_back_rx) = oneshot::channel(); @@ -554,7 +555,7 @@ impl ClientT for Client { if self .to_back .clone() - .send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx })) + .send(FrontToBack::Batch(BatchMessage { raw, ids, send_back: send_back_tx })) .await .is_err() { @@ -728,28 +729,21 @@ fn handle_backend_messages( } Some(b'[') => { // Batch response. - if let Ok(raw_responses) = serde_json::from_slice::>(raw) { + if let Ok(raw_responses) = serde_json::from_slice::>>(raw) { let mut batch = Vec::with_capacity(raw_responses.len()); - let mut range = None; + let mut ids = Vec::new(); let mut got_notif = false; for r in raw_responses { - if let Ok(response) = serde_json::from_str::>(r.get()) { - let id = response.id.try_parse_inner_as_number()?; - let result = ResponseSuccess::try_from(response).map(|s| s.result); - batch.push(InnerBatchResponse { id, result }); - - let r = range.get_or_insert(id..id); - - if id < r.start { - r.start = id; - } + let json_string = r.get().to_string(); - if id > r.end { - r.end = id; - } - } else if let Ok(response) = serde_json::from_str::>(r.get()) { + if let Ok(response) = serde_json::from_str::>(&json_string) { + let id = response.id.clone().into_owned(); + let result = ResponseSuccess::try_from(response).map(|s| s.result); + batch.push(InnerBatchResponse { id: id.clone(), result }); + ids.push(id); + } else if let Ok(response) = serde_json::from_str::>(&json_string) { got_notif = true; if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) { messages.push(FrontToBack::SubscriptionClosed(sub_id)); @@ -757,7 +751,7 @@ fn handle_backend_messages( } else if let Ok(response) = serde_json::from_slice::>(raw) { got_notif = true; process_subscription_close_response(&mut manager.lock(), response); - } else if let Ok(notif) = serde_json::from_str::(r.get()) { + } else if let Ok(notif) = serde_json::from_str::(&json_string) { got_notif = true; process_notification(&mut manager.lock(), notif); } else { @@ -765,10 +759,8 @@ fn handle_backend_messages( }; } - if let Some(mut range) = range { - // the range is exclusive so need to add one. - range.end += 1; - process_batch_response(&mut manager.lock(), batch, range)?; + if ids.len().gt(&0) { + process_batch_response(&mut manager.lock(), batch, ids)?; } else if !got_notif { return Err(EmptyBatchRequest.into()); } diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index ea6919e747..65384a7faf 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -36,7 +36,6 @@ pub use error::Error; use jsonrpsee_types::request::IdGeneratorFn; use std::fmt; -use std::ops::Range; use std::pin::Pin; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, RwLock}; @@ -335,7 +334,7 @@ struct BatchMessage { /// Serialized batch request. raw: String, /// Request IDs. - ids: Range, + ids: Vec>, /// One-shot channel over which we send back the result of this request. send_back: oneshot::Sender>, Error>>, } @@ -487,12 +486,6 @@ impl RequestIdManager { pub fn as_id_kind(&self) -> IdKind { self.id_kind } - - /// Generate a range of IDs for a batch request. - pub fn generate_batch_id_range(&self, batch_size: usize) -> Range { - let start_id = self.current_id.get(); - start_id..(start_id + batch_size as u64) - } } /// JSON-RPC request object id data type. @@ -520,10 +513,6 @@ impl CurrentId { .try_into() .expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed") } - - fn get(&self) -> u64 { - self.0.load(Ordering::Relaxed) as u64 - } } /// Represent a single entry in a batch response. From 556ee8a8c4bcc085769f053e7e7b17fa83fde175 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Fri, 21 Mar 2025 11:20:01 +0100 Subject: [PATCH 6/9] add test with an unordered batch response --- client/http-client/src/tests.rs | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/client/http-client/src/tests.rs b/client/http-client/src/tests.rs index 5b620cf1b4..2c66b8adc8 100644 --- a/client/http-client/src/tests.rs +++ b/client/http-client/src/tests.rs @@ -257,6 +257,37 @@ async fn batch_request_out_of_order_response() { assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]); } +#[tokio::test] +async fn batch_multiple_requests_out_of_order_response() { + let mut batch_request = BatchRequestBuilder::new(); + batch_request.insert("say_hello", rpc_params![]).unwrap(); + batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap(); + batch_request.insert("get_swag", rpc_params![]).unwrap(); + batch_request.insert("test_echo", rpc_params!["fourth"]).unwrap(); + batch_request.insert("test_echo", rpc_params!["fifth"]).unwrap(); + let server_response = r#"[{"jsonrpc":"2.0","result":"fifth","id":4}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"fourth","id":3}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string(); + let res = run_batch_request_with_response::(batch_request, server_response) + .with_default_timeout() + .await + .unwrap() + .unwrap(); + assert_eq!(res.num_successful_calls(), 5); + assert_eq!(res.num_failed_calls(), 0); + assert_eq!(res.len(), 5); + let response: Vec<_> = res.into_ok().unwrap().collect(); + + assert_eq!( + response, + vec![ + "hello".to_string(), + "goodbye".to_string(), + "here's your swag".to_string(), + "fourth".to_string(), + "fifth".to_string() + ] + ); +} + async fn run_batch_request_with_response( batch: BatchRequestBuilder<'_>, response: String, From e4005ac2d566355cc7fe43cf8dec6e1e8ee0b5b6 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Fri, 21 Mar 2025 15:41:55 +0100 Subject: [PATCH 7/9] change test to use custom id generator --- client/http-client/src/tests.rs | 42 ++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/client/http-client/src/tests.rs b/client/http-client/src/tests.rs index 2c66b8adc8..78efbedca5 100644 --- a/client/http-client/src/tests.rs +++ b/client/http-client/src/tests.rs @@ -34,6 +34,7 @@ use jsonrpsee_test_utils::TimeoutFutureExt; use jsonrpsee_test_utils::helpers::*; use jsonrpsee_test_utils::mocks::Id; use jsonrpsee_types::error::ErrorObjectOwned; +use jsonrpsee_types::{Id as RequestId, IdGeneratorFn}; fn init_logger() { let _ = tracing_subscriber::FmtSubscriber::builder() @@ -258,34 +259,23 @@ async fn batch_request_out_of_order_response() { } #[tokio::test] -async fn batch_multiple_requests_out_of_order_response() { +async fn batch_request_with_custom_id_out_of_order_response() { let mut batch_request = BatchRequestBuilder::new(); batch_request.insert("say_hello", rpc_params![]).unwrap(); batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap(); batch_request.insert("get_swag", rpc_params![]).unwrap(); - batch_request.insert("test_echo", rpc_params!["fourth"]).unwrap(); - batch_request.insert("test_echo", rpc_params!["fifth"]).unwrap(); - let server_response = r#"[{"jsonrpc":"2.0","result":"fifth","id":4}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"fourth","id":3}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string(); - let res = run_batch_request_with_response::(batch_request, server_response) + let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string(); + let res = run_batch_request_with_custom_id::(batch_request, server_response, generate_predictable_id) .with_default_timeout() .await .unwrap() .unwrap(); - assert_eq!(res.num_successful_calls(), 5); + assert_eq!(res.num_successful_calls(), 3); assert_eq!(res.num_failed_calls(), 0); - assert_eq!(res.len(), 5); + assert_eq!(res.len(), 3); let response: Vec<_> = res.into_ok().unwrap().collect(); - assert_eq!( - response, - vec![ - "hello".to_string(), - "goodbye".to_string(), - "here's your swag".to_string(), - "fourth".to_string(), - "fifth".to_string() - ] - ); + assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string(),]); } async fn run_batch_request_with_response( @@ -313,3 +303,21 @@ fn assert_jsonrpc_error_response(err: ClientError, exp: ErrorObjectOwned) { e => panic!("Expected error: \"{err}\", got: {e:?}"), }; } + +async fn run_batch_request_with_custom_id( + batch: BatchRequestBuilder<'_>, + response: String, + id_generator: fn() -> RequestId<'static>, +) -> Result, ClientError> { + let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap(); + let uri = format!("http://{server_addr}"); + let client = + HttpClientBuilder::default().id_format(IdKind::Custom(IdGeneratorFn::new(id_generator))).build(&uri).unwrap(); + client.batch_request(batch).with_default_timeout().await.unwrap() +} + +fn generate_predictable_id() -> RequestId<'static> { + static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0); + let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + RequestId::Number(id.try_into().unwrap()) +} From ebae283c21e260606c13f2a6027569e740f36f93 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Mon, 14 Apr 2025 23:12:31 +0300 Subject: [PATCH 8/9] fix example --- .../examples/core_client_with_request_id.rs | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/examples/examples/core_client_with_request_id.rs b/examples/examples/core_client_with_request_id.rs index f07a13b4f6..0e5b4b72bc 100644 --- a/examples/examples/core_client_with_request_id.rs +++ b/examples/examples/core_client_with_request_id.rs @@ -27,28 +27,27 @@ use std::net::SocketAddr; use jsonrpsee::client_transport::ws::{Url, WsTransportClientBuilder}; -use jsonrpsee::core::client::{Client, ClientBuilder, ClientT, IdKind}; +use jsonrpsee::core::client::{ClientBuilder, ClientT, IdKind}; use jsonrpsee::rpc_params; use jsonrpsee::server::{RpcModule, Server}; use jsonrpsee::types::{Id, IdGeneratorFn}; #[tokio::main] async fn main() -> anyhow::Result<()> { - // tracing_subscriber::FmtSubscriber::builder() - // .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - // .try_init() - // .expect("setting default subscriber failed"); + tracing_subscriber::FmtSubscriber::builder() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .try_init() + .expect("setting default subscriber failed"); - // let addr = run_server().await?; - // let uri = Url::parse(&format!("ws://{}", addr))?; + let addr = run_server().await?; + let uri = Url::parse(&format!("ws://{}", addr))?; - // let custom_generator = IdGeneratorFn::new(generate_timestamp_id); + let custom_generator = IdGeneratorFn::new(generate_timestamp_id); - // let (tx, rx) = WsTransportClientBuilder::default().build(uri).await?; - // let client: Client = ClientBuilder::default().id_format(IdKind::Custom(custom_generator)).build_with_tokio(tx, rx); - - // let response: String = client.request("say_hello", rpc_params![]).await?; - // tracing::info!("response: {:?}", response); + let (tx, rx) = WsTransportClientBuilder::default().build(uri).await?; + let client = ClientBuilder::default().id_format(IdKind::Custom(custom_generator)).build_with_tokio(tx, rx); + let response: String = client.request("say_hello", rpc_params![]).await?; + tracing::info!("response: {:?}", response); Ok(()) } From 607226a83f029e5f28b1d95e82d661f89a745616 Mon Sep 17 00:00:00 2001 From: PanGan21 Date: Tue, 15 Apr 2025 10:10:47 +0300 Subject: [PATCH 9/9] refactor batches from hashmap to vec --- core/src/client/async_client/manager.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/core/src/client/async_client/manager.rs b/core/src/client/async_client/manager.rs index 0f786c60b0..31cf934878 100644 --- a/core/src/client/async_client/manager.rs +++ b/core/src/client/async_client/manager.rs @@ -87,7 +87,7 @@ pub(crate) struct RequestManager { /// requests. subscriptions: HashMap, RequestId>, /// Pending batch requests. - batches: FxHashMap>, BatchState>, + batches: Vec<(Vec>, BatchState)>, /// Registered Methods for incoming notifications. notification_handlers: HashMap, } @@ -123,12 +123,12 @@ impl RequestManager { batch: Vec>, send_back: PendingBatchOneshot, ) -> Result<(), PendingBatchOneshot> { - if let Entry::Vacant(v) = self.batches.entry(batch) { - v.insert(BatchState { send_back }); - Ok(()) - } else { - Err(send_back) + if self.batches.iter().any(|(existing_batch, _)| existing_batch == &batch) { + return Err(send_back); } + + self.batches.push((batch, BatchState { send_back })); + Ok(()) } /// Tries to insert a new pending subscription and reserves a slot for a "potential" unsubscription request. @@ -230,7 +230,8 @@ impl RequestManager { } if let Some(key) = matched_key { - if let Some((_key, state)) = self.batches.remove_entry(&key) { + if let Some(pos) = self.batches.iter().position(|(existing_batch, _)| existing_batch == &key) { + let (_, state) = self.batches.remove(pos); return Some(state); } }