23 changes: 9 additions & 14 deletions client/http-client/src/client.rs
@@ -36,7 +36,7 @@ use async_trait::async_trait;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range,
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
@@ -414,18 +414,18 @@ where
None => None,
};
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let mut ids = Vec::new();

let mut batch_request = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batch_request.push(RequestSer {
jsonrpc: TwoPointZero,
id,
id: id.clone(),
method: method.into(),
params: params.map(StdCow::Owned),
});
ids.push(id);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
@@ -447,7 +447,7 @@
}

for rp in json_rps {
let id = rp.id.try_parse_inner_as_number()?;
let id = rp.id.clone();

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
@@ -461,13 +461,8 @@
}
};

let maybe_elem = id
.checked_sub(id_range.start)
.and_then(|p| p.try_into().ok())
.and_then(|p: usize| responses.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = res;
if let Some(pos) = ids.iter().position(|stored_id| stored_id == &id) {
responses[pos] = res;
} else {
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
}
39 changes: 39 additions & 0 deletions client/http-client/src/tests.rs
@@ -34,6 +34,7 @@ use jsonrpsee_test_utils::TimeoutFutureExt;
use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::Id;
use jsonrpsee_types::error::ErrorObjectOwned;
use jsonrpsee_types::{Id as RequestId, IdGeneratorFn};

fn init_logger() {
let _ = tracing_subscriber::FmtSubscriber::builder()
@@ -257,6 +258,26 @@ async fn batch_request_out_of_order_response() {
assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string()]);
}

#[tokio::test]
async fn batch_request_with_custom_id_out_of_order_response() {
let mut batch_request = BatchRequestBuilder::new();
batch_request.insert("say_hello", rpc_params![]).unwrap();
batch_request.insert("say_goodbye", rpc_params![0_u64, 1, 2]).unwrap();
batch_request.insert("get_swag", rpc_params![]).unwrap();
let server_response = r#"[{"jsonrpc":"2.0","result":"here's your swag","id":2}, {"jsonrpc":"2.0","result":"hello","id":0}, {"jsonrpc":"2.0","result":"goodbye","id":1}]"#.to_string();
let res = run_batch_request_with_custom_id::<String>(batch_request, server_response, generate_predictable_id)
.with_default_timeout()
.await
.unwrap()
.unwrap();
assert_eq!(res.num_successful_calls(), 3);
assert_eq!(res.num_failed_calls(), 0);
assert_eq!(res.len(), 3);
let response: Vec<_> = res.into_ok().unwrap().collect();

assert_eq!(response, vec!["hello".to_string(), "goodbye".to_string(), "here's your swag".to_string(),]);
}

async fn run_batch_request_with_response<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
@@ -282,3 +303,21 @@ fn assert_jsonrpc_error_response(err: ClientError, exp: ErrorObjectOwned) {
e => panic!("Expected error: \"{err}\", got: {e:?}"),
};
}

async fn run_batch_request_with_custom_id<T: Send + DeserializeOwned + std::fmt::Debug + Clone + 'static>(
batch: BatchRequestBuilder<'_>,
response: String,
id_generator: fn() -> RequestId<'static>,
) -> Result<BatchResponse<T>, ClientError> {
let server_addr = http_server_with_hardcoded_response(response).with_default_timeout().await.unwrap();
let uri = format!("http://{server_addr}");
let client =
HttpClientBuilder::default().id_format(IdKind::Custom(IdGeneratorFn::new(id_generator))).build(&uri).unwrap();
client.batch_request(batch).with_default_timeout().await.unwrap()
}

fn generate_predictable_id() -> RequestId<'static> {
static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
RequestId::Number(id.try_into().unwrap())
}
30 changes: 12 additions & 18 deletions core/src/client/async_client/helpers.rs
@@ -24,6 +24,8 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::collections::BTreeMap;

use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::async_client::{Notification, LOG_TARGET};
use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
Expand All @@ -39,11 +41,10 @@ use jsonrpsee_types::{
ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;

#[derive(Debug, Clone)]
pub(crate) struct InnerBatchResponse {
pub(crate) id: u64,
pub(crate) id: Id<'static>,
pub(crate) result: Result<JsonValue, ErrorObject<'static>>,
}

@@ -53,36 +54,29 @@ pub(crate) struct InnerBatchResponse {
pub(crate) fn process_batch_response(
manager: &mut RequestManager,
rps: Vec<InnerBatchResponse>,
range: Range<u64>,
ids: Vec<Id<'static>>,
) -> Result<(), InvalidRequestId> {
let mut responses = Vec::with_capacity(rps.len());

let start_idx = range.start;

let batch_state = match manager.complete_pending_batch(range.clone()) {
let batch_state = match manager.complete_pending_batch(ids.clone()) {
Some(state) => state,
None => {
tracing::debug!(target: LOG_TARGET, "Received unknown batch response");
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", ids)));
}
};

for _ in range {
let err_obj = ErrorObject::borrowed(0, "", None);
responses.push(Err(err_obj));
}
let mut responses_map: BTreeMap<Id<'static>, Result<_, ErrorObject>> =
ids.iter().map(|id| (id.clone(), Err(ErrorObject::borrowed(0, "", None)))).collect();

Contributor: We can't use a BTreeMap here because there is nothing that prevents an ID (duplicate IDs) from being used more than once with a custom ID generator.

Author: Fixed.
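A minimal, self-contained illustration of the duplicate-ID concern above (plain standard-library types, not the crate's internals): any map keyed by the request ID collapses two pending calls that share an ID into a single slot, while pairing responses by position keeps one slot per request.

```rust
use std::collections::BTreeMap;

fn main() {
    // A hypothetical custom generator that returns the same ID twice.
    let ids = vec![7_u64, 7];

    // Keyed by ID: the second placeholder silently overwrites the first.
    let by_id: BTreeMap<u64, Option<&str>> = ids.iter().map(|id| (*id, None)).collect();
    assert_eq!(by_id.len(), 1); // two requests, one slot: a response would be lost

    // Keyed by position: one slot per request, duplicates or not.
    let mut by_position: Vec<Option<&str>> = vec![None; ids.len()];
    by_position[0] = Some("first response");
    by_position[1] = Some("second response");
    assert_eq!(by_position.len(), 2);
}
```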

for rp in rps {
let maybe_elem =
rp.id.checked_sub(start_idx).and_then(|p| p.try_into().ok()).and_then(|p: usize| responses.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = rp.result;
if let Some(entry) = responses_map.get_mut(&rp.id) {
*entry = rp.result;
} else {
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
}
}

let responses: Vec<_> = responses_map.into_values().collect();

let _ = batch_state.send_back.send(Ok(responses));
Ok(())
}
30 changes: 18 additions & 12 deletions core/src/client/async_client/manager.rs
@@ -32,10 +32,7 @@
//! > **Note**: The spec allow number, string or null but this crate only supports numbers.
//! - SubscriptionId: unique ID generated by server

use std::{
collections::{hash_map::Entry, HashMap},
ops::Range,
};
use std::collections::{HashMap, hash_map::Entry};

use crate::{
client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
@@ -91,7 +88,7 @@ pub(crate) struct RequestManager {
/// requests.
subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
/// Pending batch requests.
batches: FxHashMap<Range<u64>, BatchState>,
batches: FxHashMap<Vec<Id<'static>>, BatchState>,
/// Registered Methods for incoming notifications.
notification_handlers: HashMap<String, SubscriptionSink>,
}
@@ -124,7 +121,7 @@ impl RequestManager {
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_pending_batch(
&mut self,
batch: Range<u64>,
batch: Vec<Id<'static>>,
send_back: PendingBatchOneshot,
) -> Result<(), PendingBatchOneshot> {
if let Entry::Vacant(v) = self.batches.entry(batch) {
@@ -223,14 +220,23 @@ impl RequestManager {
/// Tries to complete a pending batch request.
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
match self.batches.entry(batch) {
Entry::Occupied(request) => {
let (_digest, state) = request.remove_entry();
Some(state)
pub(crate) fn complete_pending_batch(&mut self, batch: Vec<Id<'static>>) -> Option<BatchState> {
let mut matched_key = None;

for (key, _) in self.batches.iter() {
Contributor: yeah, this is inefficient/annoying compared to the old code 🤔

if key.len() == batch.len() && batch.iter().all(|id| key.contains(id)) {
Contributor: you could just use the Vec PartialEq/Eq impl here.

matched_key = Some(key.clone());
break;
}
_ => None,
}

if let Some(key) = matched_key {
if let Some((_key, state)) = self.batches.remove_entry(&key) {
Contributor: It doesn't make sense to use a hashmap here anymore; it should most likely be Vec<Vec<Id<'static>>> or something. Then just use Vec::retain to remove the completed batch. (A sketch of this suggested layout follows the function below.)

Author: Fixed. Not using retain but a similar approach. Let me know if that works for you.

return Some(state);
}
}

None
Author: Most likely this could be improved if a different data structure were used to store batches on RequestManager.

}
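A sketch of the storage layout the reviewer suggests, under the assumption that pending batches can be kept in a Vec keyed by their ID list: lookup then uses Vec/slice equality instead of the manual length-plus-contains check, and removal is a single position/remove (retain would work the same way). The types here (Id, BatchStateStub, Batches) are simplified stand-ins, not the crate's actual RequestManager.

```rust
// Simplified stand-ins: `Id` mimics jsonrpsee_types::Id and `BatchStateStub`
// replaces the crate's `BatchState`; this is not the real RequestManager API.
#[derive(Debug, Clone, PartialEq, Eq)]
enum Id {
    Number(u64),
    Str(String),
}

struct BatchStateStub;

struct Batches {
    // One entry per pending batch: the batch's request IDs plus its state.
    pending: Vec<(Vec<Id>, BatchStateStub)>,
}

impl Batches {
    fn insert(&mut self, ids: Vec<Id>, state: BatchStateStub) {
        self.pending.push((ids, state));
    }

    // Slice equality is element-wise and order-preserving, so `==` replaces
    // the manual `len() == len() && all(contains)` comparison.
    fn complete(&mut self, ids: &[Id]) -> Option<BatchStateStub> {
        let pos = self.pending.iter().position(|(key, _)| key.as_slice() == ids)?;
        Some(self.pending.swap_remove(pos).1)
    }
}

fn main() {
    let mut batches = Batches { pending: Vec::new() };
    batches.insert(vec![Id::Number(0), Id::Str("custom".into())], BatchStateStub);
    assert!(batches.complete(&[Id::Number(0), Id::Str("custom".into())]).is_some());
    assert!(batches.complete(&[Id::Number(9)]).is_none());
}
```

Completing an unknown batch returns None, matching the existing contract of complete_pending_batch.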

/// Tries to complete a pending call.
47 changes: 19 additions & 28 deletions core/src/client/async_client/mod.rs
@@ -64,7 +64,7 @@ use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};
use super::{FrontToBack, IdKind, RequestIdManager, subscription_channel};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

Expand Down Expand Up @@ -532,18 +532,18 @@ impl ClientT for Client {
R: DeserializeOwned,
{
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let mut ids = Vec::new();

let mut batches = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batches.push(RequestSer {
jsonrpc: TwoPointZero,
id,
id: id.clone(),
method: method.into(),
params: params.map(StdCow::Owned),
});
ids.push(id);
}

let (send_back_tx, send_back_rx) = oneshot::channel();
Expand All @@ -555,7 +555,7 @@ impl ClientT for Client {
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx }))
.send(FrontToBack::Batch(BatchMessage { raw, ids, send_back: send_back_tx }))
.await
.is_err()
{
@@ -729,47 +729,38 @@ fn handle_backend_messages<R: TransportReceiverT>(
}
Some(b'[') => {
// Batch response.
if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
if let Ok(raw_responses) = serde_json::from_slice::<Vec<Box<JsonRawValue>>>(raw) {
let mut batch = Vec::with_capacity(raw_responses.len());

let mut range = None;
let mut ids = Vec::new();
let mut got_notif = false;

for r in raw_responses {
if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });
let json_string = r.get().to_string();

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
if let Ok(response) = serde_json::from_str::<Response<_>>(&json_string) {
let id = response.id.clone().into_owned();
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id: id.clone(), result });
ids.push(id);
} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(&json_string) {
got_notif = true;
if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
messages.push(FrontToBack::SubscriptionClosed(sub_id));
}
} else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
got_notif = true;
process_subscription_close_response(&mut manager.lock(), response);
} else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
} else if let Ok(notif) = serde_json::from_str::<Notification>(&json_string) {
got_notif = true;
process_notification(&mut manager.lock(), notif);
} else {
return Err(unparse_error(raw));
};
}

if let Some(mut range) = range {
// the range is exclusive so need to add one.
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
if ids.len().gt(&0) {
process_batch_response(&mut manager.lock(), batch, ids)?;
} else if !got_notif {
return Err(EmptyBatchRequest.into());
}
Expand Down
Loading