23 changes: 9 additions & 14 deletions client/http-client/src/client.rs
@@ -36,7 +36,7 @@ use async_trait::async_trait;
use hyper::body::Bytes;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT, generate_batch_id_range,
BatchResponse, ClientT, Error, IdKind, RequestIdManager, Subscription, SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
@@ -414,18 +414,18 @@ where
None => None,
};
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let mut ids = Vec::new();

let mut batch_request = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batch_request.push(RequestSer {
jsonrpc: TwoPointZero,
id,
id: id.clone(),
method: method.into(),
params: params.map(StdCow::Owned),
});
ids.push(id);
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
@@ -447,7 +447,7 @@ where
}

for rp in json_rps {
let id = rp.id.try_parse_inner_as_number()?;
let id = rp.id.clone();

let res = match ResponseSuccess::try_from(rp) {
Ok(r) => {
@@ -461,13 +461,8 @@
}
};

let maybe_elem = id
.checked_sub(id_range.start)
.and_then(|p| p.try_into().ok())
.and_then(|p: usize| responses.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = res;
if let Some(pos) = ids.iter().position(|stored_id| stored_id == &id) {
responses[pos] = res;
} else {
return Err(InvalidRequestId::NotPendingRequest(id.to_string()).into());
}
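Below is a minimal, self-contained sketch of the matching strategy this file now uses: each response is slotted back into the position of the request whose ID it echoes, instead of computing an offset into a numeric ID range. Plain `String` IDs stand in for jsonrpsee's `Id<'static>`, and the function is illustrative rather than the crate's API.

```rust
// Responses may arrive in any order; each one is matched back to the position
// of the request whose ID it echoes, mirroring `ids.iter().position(...)` above.
fn match_batch_responses(
    ids: &[String],                // IDs in the order the requests were sent
    responses: Vec<(String, u64)>, // (id, result) pairs, possibly reordered
) -> Result<Vec<Option<u64>>, String> {
    let mut ordered: Vec<Option<u64>> = vec![None; ids.len()];
    for (id, result) in responses {
        match ids.iter().position(|stored| *stored == id) {
            Some(pos) => ordered[pos] = Some(result),
            None => return Err(format!("not a pending request: {id}")),
        }
    }
    Ok(ordered)
}

fn main() {
    let ids = vec!["a".to_string(), "b".to_string()];
    let rps = vec![("b".to_string(), 2), ("a".to_string(), 1)];
    assert_eq!(match_batch_responses(&ids, rps).unwrap(), vec![Some(1), Some(2)]);
}
```

Note that a linear `position` scan makes processing a whole batch O(n²) in the batch size, whereas the old range arithmetic was O(1) per response.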
30 changes: 12 additions & 18 deletions core/src/client/async_client/helpers.rs
@@ -24,6 +24,8 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::collections::BTreeMap;

use crate::client::async_client::manager::{RequestManager, RequestStatus};
use crate::client::async_client::{Notification, LOG_TARGET};
use crate::client::{subscription_channel, Error, RequestMessage, TransportSenderT, TrySubscriptionSendError};
@@ -39,11 +41,10 @@ use jsonrpsee_types::{
ErrorObject, Id, InvalidRequestId, RequestSer, Response, ResponseSuccess, SubscriptionId, SubscriptionResponse,
};
use serde_json::Value as JsonValue;
use std::ops::Range;

#[derive(Debug, Clone)]
pub(crate) struct InnerBatchResponse {
pub(crate) id: u64,
pub(crate) id: Id<'static>,
pub(crate) result: Result<JsonValue, ErrorObject<'static>>,
}

@@ -53,36 +54,29 @@ pub(crate) struct InnerBatchResponse {
pub(crate) fn process_batch_response(
manager: &mut RequestManager,
rps: Vec<InnerBatchResponse>,
range: Range<u64>,
ids: Vec<Id<'static>>,
) -> Result<(), InvalidRequestId> {
let mut responses = Vec::with_capacity(rps.len());

let start_idx = range.start;

let batch_state = match manager.complete_pending_batch(range.clone()) {
let batch_state = match manager.complete_pending_batch(ids.clone()) {
Some(state) => state,
None => {
tracing::debug!(target: LOG_TARGET, "Received unknown batch response");
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", range)));
return Err(InvalidRequestId::NotPendingRequest(format!("{:?}", ids)));
}
};

for _ in range {
let err_obj = ErrorObject::borrowed(0, "", None);
responses.push(Err(err_obj));
}
let mut responses_map: BTreeMap<Id<'static>, Result<_, ErrorObject>> =
Review comment (Contributor): We can't use a BTreeMap here, because nothing prevents the same ID from being used more than once (duplicate IDs) with a custom ID generator.

Reply (Author): Fixed.
ids.iter().map(|id| (id.clone(), Err(ErrorObject::borrowed(0, "", None)))).collect();

for rp in rps {
let maybe_elem =
rp.id.checked_sub(start_idx).and_then(|p| p.try_into().ok()).and_then(|p: usize| responses.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = rp.result;
if let Some(entry) = responses_map.get_mut(&rp.id) {
*entry = rp.result;
} else {
return Err(InvalidRequestId::NotPendingRequest(rp.id.to_string()));
}
}

let responses: Vec<_> = responses_map.into_values().collect();

let _ = batch_state.send_back.send(Ok(responses));
Ok(())
}
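To make the objection in the review thread above concrete, here is a small stand-alone illustration (plain `String` keys rather than `Id<'static>`) of how a `BTreeMap` keyed by request ID behaves: duplicate IDs collapse into a single entry, and the collected values come back in key order rather than in the order the batch was sent.

```rust
use std::collections::BTreeMap;

fn main() {
    // Batch sent with IDs "b", "a", "a" (a custom ID generator may repeat IDs).
    let ids = vec!["b".to_string(), "a".to_string(), "a".to_string()];

    let mut by_id: BTreeMap<String, &str> =
        ids.iter().map(|id| (id.clone(), "pending")).collect();

    // Only two entries survive: the duplicate "a" was merged away.
    assert_eq!(by_id.len(), 2);

    by_id.insert("a".to_string(), "result for a");
    by_id.insert("b".to_string(), "result for b");

    // into_values() yields values in key order ("a", then "b"), which no
    // longer matches the send order ("b", "a", "a").
    let collected: Vec<_> = by_id.into_values().collect();
    assert_eq!(collected, vec!["result for a", "result for b"]);
}
```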
30 changes: 18 additions & 12 deletions core/src/client/async_client/manager.rs
@@ -32,10 +32,7 @@
//! > **Note**: The spec allow number, string or null but this crate only supports numbers.
//! - SubscriptionId: unique ID generated by server

use std::{
collections::{hash_map::Entry, HashMap},
ops::Range,
};
use std::collections::{HashMap, hash_map::Entry};

use crate::{
client::{BatchEntry, Error, SubscriptionReceiver, SubscriptionSender},
@@ -91,7 +88,7 @@ pub(crate) struct RequestManager {
/// requests.
subscriptions: HashMap<SubscriptionId<'static>, RequestId>,
/// Pending batch requests.
batches: FxHashMap<Range<u64>, BatchState>,
batches: FxHashMap<Vec<Id<'static>>, BatchState>,
/// Registered Methods for incoming notifications.
notification_handlers: HashMap<String, SubscriptionSink>,
}
@@ -124,7 +121,7 @@ impl RequestManager {
/// Returns `Ok` if the pending request was successfully inserted otherwise `Err`.
pub(crate) fn insert_pending_batch(
&mut self,
batch: Range<u64>,
batch: Vec<Id<'static>>,
send_back: PendingBatchOneshot,
) -> Result<(), PendingBatchOneshot> {
if let Entry::Vacant(v) = self.batches.entry(batch) {
@@ -223,14 +220,23 @@ impl RequestManager {
/// Tries to complete a pending batch request.
///
/// Returns `Some` if the subscription was completed otherwise `None`.
pub(crate) fn complete_pending_batch(&mut self, batch: Range<u64>) -> Option<BatchState> {
match self.batches.entry(batch) {
Entry::Occupied(request) => {
let (_digest, state) = request.remove_entry();
Some(state)
pub(crate) fn complete_pending_batch(&mut self, batch: Vec<Id<'static>>) -> Option<BatchState> {
let mut matched_key = None;

for (key, _) in self.batches.iter() {
Review comment (Contributor): Yeah, this is inefficient/annoying compared to the old code 🤔

if key.len() == batch.len() && batch.iter().all(|id| key.contains(id)) {
Review comment (Contributor): You could just use the Vec PartialEq/Eq impl here.

matched_key = Some(key.clone());
break;
}
_ => None,
}

if let Some(key) = matched_key {
if let Some((_key, state)) = self.batches.remove_entry(&key) {
Review comment (Contributor): It doesn't make sense to use a HashMap here anymore; it should most likely be Vec<Vec<Id<'static>>> or something. Then just use Vec::retain to remove the completed batch.

Reply (Author): Fixed. Not using retain but a similar approach. Let me know if that works for you.

return Some(state);
}
}

None
Review comment (Author): Most likely this could be improved if a different data structure is used to store batches in RequestManager.

}

/// Tries to complete a pending call.
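The review suggestions above boil down to using the key's own equality instead of scanning the map. Here is a sketch of that simplification, with plain `String` IDs as stand-ins: because `Vec<T: Eq + Hash>` implements `Eq` and `Hash`, the completed batch can be removed by key directly. This assumes the caller passes the IDs in the same order they were stored, which is stricter than the order-insensitive scan in the diff.

```rust
use std::collections::HashMap;

struct BatchState {
    label: &'static str,
}

struct Manager {
    // Pending batches keyed by the exact sequence of request IDs.
    batches: HashMap<Vec<String>, BatchState>,
}

impl Manager {
    fn complete_pending_batch(&mut self, batch: &[String]) -> Option<BatchState> {
        // `Vec<String>` borrows as `[String]`, so the map can be queried with a
        // slice; no iteration over all entries is needed.
        self.batches.remove(batch)
    }
}

fn main() {
    let key = vec!["1".to_string(), "2".to_string()];
    let mut manager = Manager { batches: HashMap::new() };
    manager.batches.insert(key.clone(), BatchState { label: "pending batch" });

    if let Some(state) = manager.complete_pending_batch(&key) {
        println!("completed: {}", state.label);
    }
    // Completing the same batch twice finds nothing the second time.
    assert!(manager.complete_pending_batch(&key).is_none());
}
```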
47 changes: 19 additions & 28 deletions core/src/client/async_client/mod.rs
@@ -64,7 +64,7 @@ use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{InactivityCheck, IntervalStream};
use super::{generate_batch_id_range, subscription_channel, FrontToBack, IdKind, RequestIdManager};
use super::{FrontToBack, IdKind, RequestIdManager, subscription_channel};

pub(crate) type Notification<'a> = jsonrpsee_types::Notification<'a, Option<serde_json::Value>>;

@@ -532,18 +532,18 @@ impl ClientT for Client {
R: DeserializeOwned,
{
let batch = batch.build()?;
let id = self.id_manager.next_request_id();
let id_range = generate_batch_id_range(id, batch.len() as u64)?;
let mut ids = Vec::new();

let mut batches = Vec::with_capacity(batch.len());
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
for (method, params) in batch.into_iter() {
let id = self.id_manager.next_request_id();
batches.push(RequestSer {
jsonrpc: TwoPointZero,
id,
id: id.clone(),
method: method.into(),
params: params.map(StdCow::Owned),
});
ids.push(id);
}

let (send_back_tx, send_back_rx) = oneshot::channel();
@@ -555,7 +555,7 @@
if self
.to_back
.clone()
.send(FrontToBack::Batch(BatchMessage { raw, ids: id_range, send_back: send_back_tx }))
.send(FrontToBack::Batch(BatchMessage { raw, ids, send_back: send_back_tx }))
.await
.is_err()
{
@@ -729,47 +729,38 @@ fn handle_backend_messages<R: TransportReceiverT>(
}
Some(b'[') => {
// Batch response.
if let Ok(raw_responses) = serde_json::from_slice::<Vec<&JsonRawValue>>(raw) {
if let Ok(raw_responses) = serde_json::from_slice::<Vec<Box<JsonRawValue>>>(raw) {
let mut batch = Vec::with_capacity(raw_responses.len());

let mut range = None;
let mut ids = Vec::new();
let mut got_notif = false;

for r in raw_responses {
if let Ok(response) = serde_json::from_str::<Response<_>>(r.get()) {
let id = response.id.try_parse_inner_as_number()?;
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id, result });
let json_string = r.get().to_string();

let r = range.get_or_insert(id..id);

if id < r.start {
r.start = id;
}

if id > r.end {
r.end = id;
}
} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(r.get()) {
if let Ok(response) = serde_json::from_str::<Response<_>>(&json_string) {
let id = response.id.clone().into_owned();
let result = ResponseSuccess::try_from(response).map(|s| s.result);
batch.push(InnerBatchResponse { id: id.clone(), result });
ids.push(id);
} else if let Ok(response) = serde_json::from_str::<SubscriptionResponse<_>>(&json_string) {
got_notif = true;
if let Some(sub_id) = process_subscription_response(&mut manager.lock(), response) {
messages.push(FrontToBack::SubscriptionClosed(sub_id));
}
} else if let Ok(response) = serde_json::from_slice::<SubscriptionError<_>>(raw) {
got_notif = true;
process_subscription_close_response(&mut manager.lock(), response);
} else if let Ok(notif) = serde_json::from_str::<Notification>(r.get()) {
} else if let Ok(notif) = serde_json::from_str::<Notification>(&json_string) {
got_notif = true;
process_notification(&mut manager.lock(), notif);
} else {
return Err(unparse_error(raw));
};
}

if let Some(mut range) = range {
// the range is exclusive so need to add one.
range.end += 1;
process_batch_response(&mut manager.lock(), batch, range)?;
if ids.len().gt(&0) {
process_batch_response(&mut manager.lock(), batch, ids)?;
} else if !got_notif {
return Err(EmptyBatchRequest.into());
}
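A rough stand-alone sketch of the batch-parsing flow in `handle_backend_messages`: the payload is split into owned `Box<RawValue>` entries (as the diff now does), then each entry is parsed on its own so responses can be told apart from notifications. Only the plain-response case is shown, using untyped `serde_json::Value` in place of jsonrpsee's response types; it assumes serde_json is built with the `raw_value` feature.

```rust
use serde_json::value::RawValue;
use serde_json::Value;

fn main() -> Result<(), serde_json::Error> {
    // A batch with one string ID and one numeric ID, as a server might return it.
    let raw = br#"[{"jsonrpc":"2.0","id":"a","result":1},{"jsonrpc":"2.0","id":7,"result":2}]"#;

    // First pass: split the batch into owned raw entries without interpreting them.
    let entries: Vec<Box<RawValue>> = serde_json::from_slice(raw)?;

    // Second pass: parse each entry and collect the IDs it echoes back.
    let mut ids = Vec::new();
    for entry in entries {
        let response: Value = serde_json::from_str(entry.get())?;
        // The ID is kept as an owned JSON value; it may be a number or a string.
        ids.push(response["id"].clone());
    }

    assert_eq!(ids.len(), 2);
    println!("collected batch ids: {ids:?}");
    Ok(())
}
```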
38 changes: 15 additions & 23 deletions core/src/client/mod.rs
@@ -33,9 +33,9 @@ cfg_async_client! {

pub mod error;
pub use error::Error;
use jsonrpsee_types::request::IdGeneratorFn;

use std::fmt;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
@@ -334,7 +334,7 @@ struct BatchMessage {
/// Serialized batch request.
raw: String,
/// Request IDs.
ids: Range<u64>,
ids: Vec<Id<'static>>,
/// One-shot channel over which we send back the result of this request.
send_back: oneshot::Sender<Result<Vec<BatchEntry<'static, JsonValue>>, Error>>,
}
@@ -469,7 +469,17 @@ impl RequestIdManager {

/// Attempts to get the next request ID.
pub fn next_request_id(&self) -> Id<'static> {
self.id_kind.into_id(self.current_id.next())
match self.id_kind {
IdKind::Number => {
let id = self.current_id.next();
Id::Number(id)
}
IdKind::String => {
let id = self.current_id.next();
Id::Str(format!("{id}").into())
}
IdKind::Custom(generator) => generator.call(),
}
}

/// Get a handle to the `IdKind`.
@@ -485,16 +495,8 @@ pub enum IdKind {
String,
/// Number.
Number,
}

impl IdKind {
/// Generate an `Id` from number.
pub fn into_id(self, id: u64) -> Id<'static> {
match self {
IdKind::Number => Id::Number(id),
IdKind::String => Id::Str(format!("{id}").into()),
}
}
/// Custom generator.
Custom(IdGeneratorFn),
}

#[derive(Debug)]
@@ -513,16 +515,6 @@ impl CurrentId {
}
}

/// Generate a range of IDs to be used in a batch request.
pub fn generate_batch_id_range(id: Id, len: u64) -> Result<Range<u64>, Error> {
let id_start = id.try_parse_inner_as_number()?;
let id_end = id_start
.checked_add(len)
.ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?;

Ok(id_start..id_end)
}

/// Represent a single entry in a batch response.
pub type BatchEntry<'a, R> = Result<R, ErrorObject<'a>>;

Expand Down
Loading