Skip to content
Merged
Show file tree
Hide file tree
Changes from 57 commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
33ad778
feat: client rpc middleware
niklasad1 Jan 15, 2025
401dfc1
PoC works
niklasad1 Jan 29, 2025
11d7b4d
cargo fmt
niklasad1 Jan 29, 2025
622bffd
more refactoring
niklasad1 Feb 25, 2025
56be708
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Feb 26, 2025
93c9b6f
use Cow in Notification to avoid alloc
niklasad1 Feb 26, 2025
5929436
cleanup some todos
niklasad1 Feb 26, 2025
946afee
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Mar 10, 2025
ff64b91
rpc trait: return Result<MethodResponse, Err>
niklasad1 Mar 11, 2025
9515e40
remove infallible err
niklasad1 Mar 12, 2025
062ead3
make it compile
niklasad1 Mar 12, 2025
d52740b
fix tests
niklasad1 Mar 12, 2025
b508470
introduce client method response type
niklasad1 Mar 19, 2025
dec873f
fix faulty imports
niklasad1 Mar 20, 2025
3aa828d
minor cleanup
niklasad1 Mar 20, 2025
b1e3b41
introduce Batch/BatchEntry for middleware
niklasad1 Mar 21, 2025
46ef085
remove ignore for batch test
niklasad1 Mar 21, 2025
5759407
fix rustdocs
niklasad1 Mar 22, 2025
ee75d58
add rpc middleware for the async client
niklasad1 Mar 25, 2025
79216b5
remove serialize specific types
niklasad1 Mar 25, 2025
8849af5
commit missing file
niklasad1 Mar 25, 2025
6a6e3aa
no serde_json::Value
niklasad1 Mar 26, 2025
02fb917
more nit fixing
niklasad1 Mar 27, 2025
09b4e0b
more cleanup
niklasad1 Mar 27, 2025
763cad6
refactor method response client
niklasad1 Mar 28, 2025
0e2adb0
fix some nits
niklasad1 Mar 28, 2025
6b80964
add client middleware rpc
niklasad1 Mar 31, 2025
631d7c0
fix wasm build
niklasad1 Apr 1, 2025
4ddee29
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Apr 1, 2025
a7324d3
add client middleware example
niklasad1 Apr 1, 2025
38b5261
Update examples/examples/rpc_middleware_client.rs
niklasad1 Apr 1, 2025
9640517
ToJson -> RawValue
niklasad1 Apr 2, 2025
100bda0
Merge remote-tracking branch 'origin/master' into na-client-rpc-middl…
niklasad1 Apr 2, 2025
b051bd6
replace Future type with impl Trait
niklasad1 Apr 2, 2025
f03371a
revert changelog
niklasad1 Apr 2, 2025
cc08ebf
remove logger response future
niklasad1 Apr 2, 2025
9936607
some cleanup
niklasad1 Apr 3, 2025
4149181
move request timeout from transport to client
niklasad1 Apr 3, 2025
8161e01
more nit fixing
niklasad1 Apr 3, 2025
1f73b97
have pass over examples
niklasad1 Apr 4, 2025
de1461d
show proper batch middleware example
niklasad1 Apr 4, 2025
a148135
middleware: clean up batch type
niklasad1 Apr 4, 2025
5285fc4
fix wasm build
niklasad1 Apr 4, 2025
54c7fe3
Update Cargo.toml
niklasad1 Apr 5, 2025
cc7807e
doc: fix typo
niklasad1 Apr 5, 2025
f722acd
core: remove tracing mod
niklasad1 Apr 5, 2025
ed4fb5b
fix more clippy
niklasad1 Apr 5, 2025
76d56c5
refactor: use json rawvalue
niklasad1 Apr 7, 2025
9671152
replace StringError with SubscriptionErr
niklasad1 Apr 8, 2025
cdc8465
Merge remote-tracking branch 'origin/master' into na-json-raw-value
niklasad1 Apr 11, 2025
822b7ba
cleanup json APIs with RawValue
niklasad1 Apr 12, 2025
ebcd810
cleanup and fix tests
niklasad1 Apr 12, 2025
6088a06
remove from_json api
niklasad1 Apr 12, 2025
50ce885
revert faulty change in types
niklasad1 Apr 12, 2025
f1448aa
revert faulty change in types v2
niklasad1 Apr 12, 2025
b995cfd
fix rustdocs
niklasad1 Apr 12, 2025
add0baf
use explicit JSON null value instead of default
niklasad1 Apr 12, 2025
f9b3117
Merge remote-tracking branch 'origin/master' into na-json-raw-value
niklasad1 Apr 14, 2025
0039e3a
cleanup Rawvalue::from_string usage
niklasad1 Apr 14, 2025
03603e0
fix tests
niklasad1 Apr 14, 2025
cccd706
Update core/src/traits.rs
niklasad1 Apr 14, 2025
c0e8a34
Update benches/bench.rs
niklasad1 Apr 14, 2025
01f76fd
Update core/src/params.rs
niklasad1 Apr 14, 2025
601d623
cleanup expects on RawValue
niklasad1 Apr 15, 2025
dfd6df3
improve docs
niklasad1 Apr 15, 2025
51ce03f
replace manual serialization with to_raw_value
niklasad1 Apr 15, 2025
ca3e5bf
fix clippy
niklasad1 Apr 15, 2025
097f084
fix nit
niklasad1 Apr 15, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::
/// Run jsonrpsee WebSocket server for benchmarks.
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::server::ServerHandle) {
use jsonrpsee::server::{ServerBuilder, ServerConfig, SubscriptionMessage};
use jsonrpsee::server::{ServerBuilder, ServerConfig};
use serde_json::value::RawValue;

let config = ServerConfig::builder()
.max_request_body_size(u32::MAX)
Expand All @@ -150,8 +151,8 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se
UNSUB_METHOD_NAME,
|_params, pending, _ctx, _| async move {
let sink = pending.accept().await?;
let msg = SubscriptionMessage::from("Hello");
sink.send(msg).await?;
let json = RawValue::from_string("\"Hello\")".to_string()).unwrap();
sink.send(json).await?;

Ok(())
},
Expand Down
2 changes: 1 addition & 1 deletion client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ where
)
.await?
.into_method_call()
.expect("Method call must return a method call reponse; qed");
.expect("Method call must return a method call response; qed");

let rp = ResponseSuccess::try_from(method_response.into_inner())?;

Expand Down
2 changes: 1 addition & 1 deletion core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ impl ToJson for MethodResponse {
fn to_json(&self) -> Result<Box<RawValue>, serde_json::Error> {
match &self.inner {
MethodResponseKind::MethodCall(call) => call.to_json(),
MethodResponseKind::Notification => Ok(Box::<RawValue>::default()),
MethodResponseKind::Notification => Ok(RawValue::NULL.to_owned()),
MethodResponseKind::Batch(json) => serde_json::value::to_raw_value(json),
MethodResponseKind::Subscription(s) => serde_json::value::to_raw_value(&s.rp),
}
Expand Down
44 changes: 40 additions & 4 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,49 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

/// A type that returns the error as a `String` from `SubscriptionCallback`.
use serde::Serialize;
use serde_json::value::RawValue;

#[derive(Debug)]
pub(crate) enum InnerSubscriptionErr {
String(String),
Json(Box<RawValue>),
}

/// Error returned when a subscription fails.
///
/// It's recommended to use `SubscriptionErr::from_json` to create a new instance of this error
/// because using the `String` representation may not very ergonomic for clients to parse.
#[derive(Debug)]
pub struct StringError(pub(crate) String);
pub struct SubscriptionError(pub(crate) InnerSubscriptionErr);

impl Serialize for SubscriptionError {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
match &self.0 {
InnerSubscriptionErr::String(s) => serializer.serialize_str(s),
InnerSubscriptionErr::Json(json) => json.serialize(serializer),
}
}
}

impl<T: ToString> From<T> for StringError {
impl<T: ToString> From<T> for SubscriptionError {
fn from(val: T) -> Self {
StringError(val.to_string())
Self(InnerSubscriptionErr::String(val.to_string()))
}
}

impl SubscriptionError {
/// Create a new `SubscriptionErr` from a JSON value.
pub fn from_json(json: Box<RawValue>) -> Self {
Self(InnerSubscriptionErr::Json(json))
}

/// Create a new `SubscriptionErr` from a String.
pub fn from_string(s: String) -> Self {
Self::from(s)
}
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ cfg_client_or_server! {
}

pub use async_trait::async_trait;
pub use error::{RegisterMethodError, StringError};
pub use error::{RegisterMethodError, SubscriptionError};

/// JSON-RPC result.
pub type RpcResult<T> = std::result::Result<T, jsonrpsee_types::ErrorObjectOwned>;
Expand Down Expand Up @@ -99,7 +99,7 @@ pub use std::borrow::Cow;
pub const TEN_MB_SIZE_BYTES: u32 = 10 * 1024 * 1024;

/// The return type if the subscription wants to return `Result`.
pub type SubscriptionResult = Result<(), StringError>;
pub type SubscriptionResult = Result<(), SubscriptionError>;

/// Type erased error.
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
13 changes: 7 additions & 6 deletions core/src/server/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::server::SubscriptionMessage;
use serde_json::value::RawValue;
use tokio::sync::mpsc;

/// Error that may occur during [`crate::server::MethodSink::try_send`] or [`crate::server::SubscriptionSink::try_send`].
Expand Down Expand Up @@ -60,23 +61,23 @@ pub enum SendTimeoutError {
#[error("The remote peer closed the connection")]
pub struct PendingSubscriptionAcceptError;

impl From<mpsc::error::SendError<String>> for DisconnectError {
fn from(e: mpsc::error::SendError<String>) -> Self {
impl From<mpsc::error::SendError<Box<RawValue>>> for DisconnectError {
fn from(e: mpsc::error::SendError<Box<RawValue>>) -> Self {
DisconnectError(SubscriptionMessage::from_complete_message(e.0))
}
}

impl From<mpsc::error::TrySendError<String>> for TrySendError {
fn from(e: mpsc::error::TrySendError<String>) -> Self {
impl From<mpsc::error::TrySendError<Box<RawValue>>> for TrySendError {
fn from(e: mpsc::error::TrySendError<Box<RawValue>>) -> Self {
match e {
mpsc::error::TrySendError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)),
mpsc::error::TrySendError::Full(m) => Self::Full(SubscriptionMessage::from_complete_message(m)),
}
}
}

impl From<mpsc::error::SendTimeoutError<String>> for SendTimeoutError {
fn from(e: mpsc::error::SendTimeoutError<String>) -> Self {
impl From<mpsc::error::SendTimeoutError<Box<RawValue>>> for SendTimeoutError {
fn from(e: mpsc::error::SendTimeoutError<Box<RawValue>>) -> Self {
match e {
mpsc::error::SendTimeoutError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)),
mpsc::error::SendTimeoutError::Timeout(m) => Self::Timeout(SubscriptionMessage::from_complete_message(m)),
Expand Down
19 changes: 10 additions & 9 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,28 @@
use std::time::Duration;

use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload};
use serde_json::value::RawValue;
use tokio::sync::mpsc;

use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError};
use super::{DisconnectError, SendTimeoutError, TrySendError};

/// Sink that is used to send back the result to the server for a specific method.
#[derive(Clone, Debug)]
pub struct MethodSink {
/// Channel sender.
tx: mpsc::Sender<String>,
tx: mpsc::Sender<Box<RawValue>>,
/// Max response size in bytes for a executed call.
max_response_size: u32,
}

impl MethodSink {
/// Create a new `MethodSink` with unlimited response size.
pub fn new(tx: mpsc::Sender<String>) -> Self {
pub fn new(tx: mpsc::Sender<Box<RawValue>>) -> Self {
MethodSink { tx, max_response_size: u32::MAX }
}

/// Create a new `MethodSink` with a limited response size.
pub fn new_with_limit(tx: mpsc::Sender<String>, max_response_size: u32) -> Self {
pub fn new_with_limit(tx: mpsc::Sender<Box<RawValue>>, max_response_size: u32) -> Self {
MethodSink { tx, max_response_size }
}

Expand All @@ -74,25 +75,25 @@ impl MethodSink {
/// connection has been closed or if the message buffer is full.
///
/// Returns the message if the send fails such that either can be thrown away or re-sent later.
pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> {
pub fn try_send(&mut self, msg: Box<RawValue>) -> Result<(), TrySendError> {
self.tx.try_send(msg).map_err(Into::into)
}

/// Async send which will wait until there is space in channel buffer or that the subscription is disconnected.
pub async fn send(&self, msg: String) -> Result<(), DisconnectError> {
pub async fn send(&self, msg: Box<RawValue>) -> Result<(), DisconnectError> {
self.tx.send(msg).await.map_err(Into::into)
}

/// Send a JSON-RPC error to the client
pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> {
let payload = ResponsePayload::<()>::error_borrowed(err);
let json = serde_json::to_string(&Response::new(payload, id)).expect("valid JSON; qed");
let json = serde_json::value::to_raw_value(&Response::new(payload, id)).expect("valid JSON; qed");

self.send(json).await
}

/// Similar to `MethodSink::send` but only waits for a limited time.
pub async fn send_timeout(&self, msg: String, timeout: Duration) -> Result<(), SendTimeoutError> {
pub async fn send_timeout(&self, msg: Box<RawValue>, timeout: Duration) -> Result<(), SendTimeoutError> {
self.tx.send_timeout(msg, timeout).await.map_err(Into::into)
}

Expand All @@ -112,7 +113,7 @@ impl MethodSink {
// The permit is thrown away here because it's just
// a way to ensure that the return buffer has space.
Ok(_) => Ok(()),
Err(_) => Err(DisconnectError(SubscriptionMessage::empty())),
Err(_) => Err(DisconnectError(RawValue::NULL.to_owned().into())),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/server/method_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ impl MethodResponse {
/// Create notification response which is a response that doesn't expect a reply.
pub fn notification() -> Self {
Self {
json: Box::<RawValue>::default(),
json: RawValue::NULL.to_owned(),
success_or_error: MethodResponseResult::Success,
kind: ResponseKind::Notification,
on_close: None,
Expand Down
32 changes: 17 additions & 15 deletions core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@ use crate::error::RegisterMethodError;
use crate::id_providers::RandomIntegerIdProvider;
use crate::server::helpers::MethodSink;
use crate::server::subscription::{
BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink, SubNotifResultOrError, Subscribers,
Subscription, SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, SubscriptionState,
sub_message_to_json,
BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink, Subscribers, Subscription,
SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, SubscriptionState, sub_message_to_json,
};
use crate::server::{LOG_TARGET, MethodResponse, ResponsePayload};
use crate::traits::ToRpcParams;
Expand All @@ -51,7 +50,7 @@ use serde::de::DeserializeOwned;
use serde_json::value::RawValue;
use tokio::sync::{mpsc, oneshot};

use super::IntoResponse;
use super::{IntoResponse, sub_err_to_json};

/// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request,
/// implemented as a function pointer to a `Fn` function taking four arguments:
Expand Down Expand Up @@ -95,7 +94,7 @@ pub type MaxResponseSize = usize;
/// A tuple containing:
/// - Call result as a `String`,
/// - a [`mpsc::UnboundedReceiver<String>`] to receive future subscription results
pub type RawRpcResponse = (Box<RawValue>, mpsc::Receiver<String>);
pub type RawRpcResponse = (Box<RawValue>, mpsc::Receiver<Box<RawValue>>);

/// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked.
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -326,14 +325,16 @@ impl Methods {
/// async fn main() {
/// use jsonrpsee::{RpcModule, SubscriptionMessage};
/// use jsonrpsee::types::{response::Success, Response};
/// use jsonrpsee::core::JsonRawValue;
/// use futures_util::StreamExt;
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async {
/// let sink = pending.accept().await?;
///
/// // see comment above.
/// sink.send("one answer".into()).await?;
/// let msg = JsonRawValue::from_string("\"one answer\"".into()).unwrap();
/// sink.send(msg).await?;
///
/// Ok(())
/// }).unwrap();
Expand All @@ -343,15 +344,15 @@ impl Methods {
/// let sub_resp = stream.recv().await.unwrap();
/// assert_eq!(
/// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result),
/// sub_resp
/// sub_resp.get()
/// );
/// }
/// ```
pub async fn raw_json_request(
&self,
request: &str,
buf_size: usize,
) -> Result<(Box<RawValue>, mpsc::Receiver<String>), serde_json::Error> {
) -> Result<(Box<RawValue>, mpsc::Receiver<Box<RawValue>>), serde_json::Error> {
tracing::trace!("[Methods::raw_json_request] Request: {:?}", request);
let req: Request = serde_json::from_str(request)?;
let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await;
Expand Down Expand Up @@ -422,12 +423,13 @@ impl Methods {
/// #[tokio::main]
/// async fn main() {
/// use jsonrpsee::{RpcModule, SubscriptionMessage};
/// use jsonrpsee::core::{EmptyServerParams, RpcResult};
/// use jsonrpsee::core::{EmptyServerParams, RpcResult, JsonRawValue};
///
/// let mut module = RpcModule::new(());
/// module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async move {
/// let sink = pending.accept().await?;
/// sink.send("one answer".into()).await?;
/// let msg = JsonRawValue::from_string("\"one answer\"".into()).unwrap();
/// sink.send(msg).await?;
/// Ok(())
/// }).unwrap();
///
Expand Down Expand Up @@ -767,7 +769,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// //
/// // If you need some other behavior implement or custom format of the error field
/// // you need to manually handle that.
/// let msg = SubscriptionMessage::from_json(&sum)?;
/// let msg = serde_json::value::to_raw_value(&sum).unwrap();
///
/// // This fails only if the connection is closed
/// sink.send(msg).await?;
Expand Down Expand Up @@ -838,11 +840,11 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {

match response {
SubscriptionCloseResponse::Notif(msg) => {
let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &sub_id, method);
let json = sub_message_to_json(msg, &sub_id, method);
let _ = method_sink.send(json).await;
}
SubscriptionCloseResponse::NotifErr(msg) => {
let json = sub_message_to_json(msg, SubNotifResultOrError::Error, &sub_id, method);
SubscriptionCloseResponse::NotifErr(err) => {
let json = sub_err_to_json(err, &sub_id, method);
let _ = method_sink.send(json).await;
}
SubscriptionCloseResponse::None => (),
Expand Down Expand Up @@ -910,7 +912,7 @@ impl<Context: Send + Sync + 'static> RpcModule<Context> {
/// let sink = pending.accept().await.unwrap();
/// let sum = val + (*ctx);
///
/// let msg = SubscriptionMessage::from_json(&sum).unwrap();
/// let msg = serde_json::value::to_raw_value(&sum).unwrap();
///
/// // This fails only if the connection is closed
/// sink.send(msg).await.unwrap();
Expand Down
Loading
Loading