Skip to content
Open
9 changes: 4 additions & 5 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use async_trait::async_trait;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range,
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
Expand Down Expand Up @@ -414,12 +414,11 @@ where
None => None,
};
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let id_range = self.id_manager.generate_batch_id_range(batch.len());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey again,

So it doesn't make sense to have this id_range anymore because we are using it to check whether all
requests in a batch was replied to.

Now in the code the actual ID is fetched from the id_manager, thus it's not necessarily true that the id_range and self.id_manager.next_request_id() contains the same ids.

Additionally, because it's possible to use a custom id generator we can't really on these ID ranges anymore anyway.

I reckon that we need to remove the generate_batch_id_range completely and just push the generated IDs to a Vec such as:

// this will contain the order of the request IDs in the batch
let mut ids = Vec::new();

for (method, params) in batch.into_iter() {
     let id = self.id_manager.next_request_id();
	 batch_request.push(RequestSer {
              jsonrpc: TwoPointZero,
		      id,
			  method: method.into(),
			  params: params.map(StdCow::Owned),
	});
}

// use ids to check whether to response has responded to all ids in the batch

The reason why we need this is because a server can submit respond to each call in the batch in arbitrary order.

The Response objects being returned from a batch call MAY be returned in any order within the Array. The Client SHOULD match contexts between the set of Request objects and the resulting set of Response objects based on the id member within each Object.

Copy link
Contributor

@niklasad1 niklasad1 Mar 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual logic to match a batch response to the batch request must be also be changed because one assumption is that the every ID is generated from a integer and can be parsed as one which is used to index into the response.

One extra test with an unordered batch response would be great to make sure everything works as intended.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback - makes sense.
According to that, the usage of Range type in BatchMessage and in related functions, should also be changed right? It should probably be replaces with something like Vec<Id<'static>>.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, correct

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI the refactor is in progress but I am facing some issues with the lifetimes. I need to either use Vec<Id<'a>> which means that a lifetime parameter will be added across a big part of the client or use Vec<Id<'static>> which is always not optimal.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just do the naive implementation now with Vec<Id<'static>> the default one is still number and cloning it should be "cheap".

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated and test added


let mut batch_request = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batch_request.push(RequestSer {
jsonrpc: TwoPointZero,
id,
Expand Down
9 changes: 4 additions & 5 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};
use super::{FrontToBack, IdKind, RequestIdManager, subscription_channel};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

Expand Down Expand Up @@ -532,12 +532,11 @@ impl ClientT for Client {
R: DeserializeOwned,
{
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let id_range = self.id_manager.generate_batch_id_range(batch.len());

let mut batches = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batches.push(RequestSer {
jsonrpc: TwoPointZero,
id,
Expand Down
43 changes: 23 additions & 20 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cfg_async_client! {

pub mod error;
pub use error::Error;
use jsonrpsee_types::request::IdGeneratorFn;

use std::fmt;
use std::ops::Range;
Expand Down Expand Up @@ -469,13 +470,29 @@ impl RequestIdManager {

/// Attempts to get the next request ID.
pub fn next_request_id(&self) -> Id<'static> {
self.id_kind.into_id(self.current_id.next())
match self.id_kind {
IdKind::Number => {
let id = self.current_id.next();
Id::Number(id)
}
IdKind::String => {
let id = self.current_id.next();
Id::Str(format!("{id}").into())
}
IdKind::Custom(generator) => generator.call(),
}
}

/// Get a handle to the `IdKind`.
pub fn as_id_kind(&self) -> IdKind {
self.id_kind
}

/// Generate a range of IDs for a batch request.
pub fn generate_batch_id_range(&self, batch_size: usize) -> Range<u64> {
let start_id = self.current_id.get();
start_id..(start_id + batch_size as u64)
}
}

/// JSON-RPC request object id data type.
Expand All @@ -485,16 +502,8 @@ pub enum IdKind {
String,
/// Number.
Number,
}

impl IdKind {
/// Generate an `Id` from number.
pub fn into_id(self, id: u64) -> Id<'static> {
match self {
IdKind::Number => Id::Number(id),
IdKind::String => Id::Str(format!("{id}").into()),
}
}
/// Custom generator.
Custom(IdGeneratorFn),
}

#[derive(Debug)]
Expand All @@ -511,16 +520,10 @@ impl CurrentId {
.try_into()
.expect("usize -> u64 infallible, there are no CPUs > 64 bits; qed")
}
}

/// Generate a range of IDs to be used in a batch request.
pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
let id_start = id.try_parse_inner_as_number()?;
let id_end = id_start
.checked_add(len)
.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;

Ok(id_start..id_end)
fn get(&self) -> u64 {
self.0.load(Ordering::Relaxed) as u64
}
}

/// Represent a single entry in a batch response.
Expand Down
75 changes: 75 additions & 0 deletions examples/examples/core_client_with_request_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// Copyright 2019-2021 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any
// person obtaining a copy of this software and associated
// documentation files (the "Software"), to deal in the
// Software without restriction, including without
// limitation the rights to use, copy, modify, merge,
// publish, distribute, sublicense, and/or sell copies of
// the Software, and to permit persons to whom the Software
// is furnished to do so, subject to the following
// conditions:
//
// The above copyright notice and this permission notice
// shall be included in all copies or substantial portions
// of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
// ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
// TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
// PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
// SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
// CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
// OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::net::SocketAddr;

use jsonrpsee::client_transport::ws::{Url, WsTransportClientBuilder};
use jsonrpsee::core::client::{Client, ClientBuilder, ClientT, IdKind};
use jsonrpsee::rpc_params;
use jsonrpsee::server::{RpcModule, Server};
use jsonrpsee::types::{Id, IdGeneratorFn};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::FmtSubscriber::builder()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.try_init()
.expect("setting default subscriber failed");

let addr = run_server().await?;
let uri = Url::parse(&format!("ws://{}", addr))?;

let custom_generator = IdGeneratorFn::new(generate_timestamp_id);

let (tx, rx) = WsTransportClientBuilder::default().build(uri).await?;
let client: Client = ClientBuilder::default().id_format(IdKind::Custom(custom_generator)).build_with_tokio(tx, rx);

let response: String = client.request("say_hello", rpc_params![]).await?;
tracing::info!("response: {:?}", response);

Ok(())
}

async fn run_server() -> anyhow::Result<SocketAddr> {
let server = Server::builder().build("127.0.0.1:0").await?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _, _| "lo")?;
let addr = server.local_addr()?;

let handle = server.start(module);

// In this example we don't care about doing shutdown so let's it run forever.
// You may use the `ServerHandle` to shut it down or manage it yourself.
tokio::spawn(handle.stopped());

Ok(addr)
}

fn generate_timestamp_id() -> Id<'static> {
let timestamp =
std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).expect("Time went backwards").as_secs();
Id::Number(timestamp)
}
2 changes: 1 addition & 1 deletion types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ pub mod error;

pub use error::{ErrorCode, ErrorObject, ErrorObjectOwned};
pub use params::{Id, InvalidRequestId, Params, ParamsSequence, SubscriptionId, TwoPointZero};
pub use request::{InvalidRequest, Notification, NotificationSer, Request, RequestSer};
pub use request::{IdGeneratorFn, InvalidRequest, Notification, NotificationSer, Request, RequestSer};
pub use response::{Response, ResponsePayload, SubscriptionPayload, SubscriptionResponse, Success as ResponseSuccess};
34 changes: 33 additions & 1 deletion types/src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@
//! Types to handle JSON-RPC requests according to the [spec](https://www.jsonrpc.org/specification#request-object).
//! Some types come with a "*Ser" variant that implements [`serde::Serialize`]; these are used in the client.

use std::borrow::Cow;
use std::{
borrow::Cow,
fmt::{Debug, Formatter, Result},
};

use crate::{
Params,
Expand Down Expand Up @@ -173,6 +176,35 @@ impl<'a> NotificationSer<'a> {
}
}

/// Custom id generator function
pub struct IdGeneratorFn(fn() -> Id<'static>);

impl IdGeneratorFn {
/// Creates a new `IdGeneratorFn` from a function pointer.
pub fn new(generator: fn() -> Id<'static>) -> Self {
IdGeneratorFn(generator)
}

/// Calls the id generator function
pub fn call(&self) -> Id<'static> {
(self.0)()
}
}

impl Copy for IdGeneratorFn {}

impl Clone for IdGeneratorFn {
fn clone(&self) -> Self {
*self
}
}

impl Debug for IdGeneratorFn {
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
f.write_str("<CustomIdGenerator>")
}
}

#[cfg(test)]
mod test {
use super::{Cow, Id, InvalidRequest, Notification, NotificationSer, Request, RequestSer, TwoPointZero};
Expand Down