diff --git a/benches/bench.rs b/benches/bench.rs index 1f45168dcc..88f04c49ff 100644 --- a/benches/bench.rs +++ b/benches/bench.rs @@ -113,7 +113,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) { // Construct the serialized array request using the `RawValue` directly. crit.bench_function("jsonrpsee_types_array_params_baseline", |b| { b.iter(|| { - let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap(); + let params = serde_json::value::to_raw_value(&[1, 2]).unwrap(); let request = Request::borrowed("say_hello", Some(¶ms), Id::Number(0)); v2_serialize(request); @@ -136,8 +136,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) { // Construct the serialized object request using the `RawValue` directly. crit.bench_function("jsonrpsee_types_object_params_baseline", |b| { b.iter(|| { - let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap(); - + let params = serde_json::value::to_raw_value(r#"{"key": 1}"#).unwrap(); let request = Request::borrowed("say_hello", Some(¶ms), Id::Number(0)); v2_serialize(request); }) diff --git a/benches/helpers.rs b/benches/helpers.rs index 7443194cde..825eb51cd5 100644 --- a/benches/helpers.rs +++ b/benches/helpers.rs @@ -131,7 +131,7 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee:: /// Run jsonrpsee WebSocket server for benchmarks. #[cfg(not(feature = "jsonrpc-crate"))] pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::server::ServerHandle) { - use jsonrpsee::server::{ServerBuilder, ServerConfig, SubscriptionMessage}; + use jsonrpsee::server::{ServerBuilder, ServerConfig}; let config = ServerConfig::builder() .max_request_body_size(u32::MAX) @@ -150,8 +150,8 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::se UNSUB_METHOD_NAME, |_params, pending, _ctx, _| async move { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from("Hello"); - sink.send(msg).await?; + let json = serde_json::value::to_raw_value(&"Hello").unwrap(); + sink.send(json).await?; Ok(()) }, diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 68958aac22..031514ed48 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -787,7 +787,7 @@ impl ToJson for MethodResponse { fn to_json(&self) -> Result, serde_json::Error> { match &self.inner { MethodResponseKind::MethodCall(call) => call.to_json(), - MethodResponseKind::Notification => Ok(Box::::default()), + MethodResponseKind::Notification => Ok(RawValue::NULL.to_owned()), MethodResponseKind::Batch(json) => serde_json::value::to_raw_value(json), MethodResponseKind::Subscription(s) => serde_json::value::to_raw_value(&s.rp), } diff --git a/core/src/error.rs b/core/src/error.rs index df6cb72e09..8eb1fbe4be 100644 --- a/core/src/error.rs +++ b/core/src/error.rs @@ -24,13 +24,53 @@ // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -/// A type that returns the error as a `String` from `SubscriptionCallback`. -#[derive(Debug)] -pub struct StringError(pub(crate) String); +use serde::Serialize; +use serde_json::value::RawValue; -impl From for StringError { +#[derive(Debug, Clone)] +pub(crate) enum InnerSubscriptionErr { + String(String), + Json(Box), +} + +/// Error returned when a subscription fails where the error is returned +/// as special error notification with the following format: +/// +/// ```json +/// {"jsonrpc":"2.0", "method":"subscription_error", "params": {"subscription": "sub_id", "error": }} +/// ``` +/// +/// It's recommended to use [`SubscriptionError::from_json`] to create a new instance of this error +/// if the underlying error is a JSON value. That will ensure that the error is serialized correctly. +/// +/// SubscriptionError::from will serialize the error as a string, which is not +/// recommended and should only by used in the value of a `String` type. +/// It's mainly provided for convenience and to allow for easy conversion any type that implements StdError. +#[derive(Debug, Clone)] +pub struct SubscriptionError(pub(crate) InnerSubscriptionErr); + +impl Serialize for SubscriptionError { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match &self.0 { + InnerSubscriptionErr::String(s) => serializer.serialize_str(s), + InnerSubscriptionErr::Json(json) => json.serialize(serializer), + } + } +} + +impl From for SubscriptionError { fn from(val: T) -> Self { - StringError(val.to_string()) + Self(InnerSubscriptionErr::String(val.to_string())) + } +} + +impl SubscriptionError { + /// Create a new `SubscriptionError` from a JSON value. + pub fn from_json(json: Box) -> Self { + Self(InnerSubscriptionErr::Json(json)) } } diff --git a/core/src/lib.rs b/core/src/lib.rs index 374aae2fda..7bca3d9e0c 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -61,7 +61,7 @@ cfg_client_or_server! { } pub use async_trait::async_trait; -pub use error::{RegisterMethodError, StringError}; +pub use error::{RegisterMethodError, SubscriptionError}; /// JSON-RPC result. pub type RpcResult = std::result::Result; @@ -99,7 +99,7 @@ pub use std::borrow::Cow; pub const TEN_MB_SIZE_BYTES: u32 = 10 * 1024 * 1024; /// The return type if the subscription wants to return `Result`. -pub type SubscriptionResult = Result<(), StringError>; +pub type SubscriptionResult = Result<(), SubscriptionError>; /// Type erased error. pub type BoxError = Box; diff --git a/core/src/params.rs b/core/src/params.rs index 996264a20c..1290263be3 100644 --- a/core/src/params.rs +++ b/core/src/params.rs @@ -33,6 +33,7 @@ use serde_json::value::RawValue; /// Helper module for building parameters. mod params_builder { use serde::Serialize; + use serde_json::value::RawValue; /// Initial number of bytes for a parameter length. const PARAM_BYTES_CAPACITY: usize = 128; @@ -106,8 +107,8 @@ mod params_builder { Ok(()) } - /// Finish the building process and return a JSON compatible string. - pub(crate) fn build(mut self) -> Option { + /// Finish the building process and return a JSON object. + pub(crate) fn build(mut self) -> Option> { if self.bytes.is_empty() { return None; } @@ -120,7 +121,8 @@ mod params_builder { } // Safety: This is safe because JSON does not emit invalid UTF-8. - Some(unsafe { String::from_utf8_unchecked(self.bytes) }) + let json_str = unsafe { String::from_utf8_unchecked(self.bytes) }; + Some(RawValue::from_string(json_str).expect("Valid JSON String; qed")) } } } @@ -164,7 +166,7 @@ impl Default for ObjectParams { impl ToRpcParams for ObjectParams { fn to_rpc_params(self) -> Result>, serde_json::Error> { - if let Some(json) = self.0.build() { RawValue::from_string(json).map(Some) } else { Ok(None) } + Ok(self.0.build()) } } @@ -206,7 +208,7 @@ impl Default for ArrayParams { impl ToRpcParams for ArrayParams { fn to_rpc_params(self) -> Result>, serde_json::Error> { - if let Some(json) = self.0.build() { RawValue::from_string(json).map(Some) } else { Ok(None) } + Ok(self.0.build()) } } diff --git a/core/src/server/error.rs b/core/src/server/error.rs index e92a29e112..e3bd834c73 100644 --- a/core/src/server/error.rs +++ b/core/src/server/error.rs @@ -25,6 +25,7 @@ // DEALINGS IN THE SOFTWARE. use crate::server::SubscriptionMessage; +use serde_json::value::RawValue; use tokio::sync::mpsc; /// Error that may occur during [`crate::server::MethodSink::try_send`] or [`crate::server::SubscriptionSink::try_send`]. @@ -60,14 +61,14 @@ pub enum SendTimeoutError { #[error("The remote peer closed the connection")] pub struct PendingSubscriptionAcceptError; -impl From> for DisconnectError { - fn from(e: mpsc::error::SendError) -> Self { +impl From>> for DisconnectError { + fn from(e: mpsc::error::SendError>) -> Self { DisconnectError(SubscriptionMessage::from_complete_message(e.0)) } } -impl From> for TrySendError { - fn from(e: mpsc::error::TrySendError) -> Self { +impl From>> for TrySendError { + fn from(e: mpsc::error::TrySendError>) -> Self { match e { mpsc::error::TrySendError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)), mpsc::error::TrySendError::Full(m) => Self::Full(SubscriptionMessage::from_complete_message(m)), @@ -75,8 +76,8 @@ impl From> for TrySendError { } } -impl From> for SendTimeoutError { - fn from(e: mpsc::error::SendTimeoutError) -> Self { +impl From>> for SendTimeoutError { + fn from(e: mpsc::error::SendTimeoutError>) -> Self { match e { mpsc::error::SendTimeoutError::Closed(m) => Self::Closed(SubscriptionMessage::from_complete_message(m)), mpsc::error::SendTimeoutError::Timeout(m) => Self::Timeout(SubscriptionMessage::from_complete_message(m)), diff --git a/core/src/server/helpers.rs b/core/src/server/helpers.rs index cf480312ea..c923759fdd 100644 --- a/core/src/server/helpers.rs +++ b/core/src/server/helpers.rs @@ -27,27 +27,28 @@ use std::time::Duration; use jsonrpsee_types::{ErrorCode, ErrorObject, Id, InvalidRequest, Response, ResponsePayload}; +use serde_json::value::RawValue; use tokio::sync::mpsc; -use super::{DisconnectError, SendTimeoutError, SubscriptionMessage, TrySendError}; +use super::{DisconnectError, SendTimeoutError, TrySendError}; /// Sink that is used to send back the result to the server for a specific method. #[derive(Clone, Debug)] pub struct MethodSink { /// Channel sender. - tx: mpsc::Sender, + tx: mpsc::Sender>, /// Max response size in bytes for a executed call. max_response_size: u32, } impl MethodSink { /// Create a new `MethodSink` with unlimited response size. - pub fn new(tx: mpsc::Sender) -> Self { + pub fn new(tx: mpsc::Sender>) -> Self { MethodSink { tx, max_response_size: u32::MAX } } /// Create a new `MethodSink` with a limited response size. - pub fn new_with_limit(tx: mpsc::Sender, max_response_size: u32) -> Self { + pub fn new_with_limit(tx: mpsc::Sender>, max_response_size: u32) -> Self { MethodSink { tx, max_response_size } } @@ -74,25 +75,25 @@ impl MethodSink { /// connection has been closed or if the message buffer is full. /// /// Returns the message if the send fails such that either can be thrown away or re-sent later. - pub fn try_send(&mut self, msg: String) -> Result<(), TrySendError> { + pub fn try_send(&mut self, msg: Box) -> Result<(), TrySendError> { self.tx.try_send(msg).map_err(Into::into) } /// Async send which will wait until there is space in channel buffer or that the subscription is disconnected. - pub async fn send(&self, msg: String) -> Result<(), DisconnectError> { + pub async fn send(&self, msg: Box) -> Result<(), DisconnectError> { self.tx.send(msg).await.map_err(Into::into) } /// Send a JSON-RPC error to the client pub async fn send_error<'a>(&self, id: Id<'a>, err: ErrorObject<'a>) -> Result<(), DisconnectError> { let payload = ResponsePayload::<()>::error_borrowed(err); - let json = serde_json::to_string(&Response::new(payload, id)).expect("valid JSON; qed"); + let json = serde_json::value::to_raw_value(&Response::new(payload, id)).expect("valid JSON; qed"); self.send(json).await } /// Similar to `MethodSink::send` but only waits for a limited time. - pub async fn send_timeout(&self, msg: String, timeout: Duration) -> Result<(), SendTimeoutError> { + pub async fn send_timeout(&self, msg: Box, timeout: Duration) -> Result<(), SendTimeoutError> { self.tx.send_timeout(msg, timeout).await.map_err(Into::into) } @@ -112,7 +113,7 @@ impl MethodSink { // The permit is thrown away here because it's just // a way to ensure that the return buffer has space. Ok(_) => Ok(()), - Err(_) => Err(DisconnectError(SubscriptionMessage::empty())), + Err(_) => Err(DisconnectError(RawValue::NULL.to_owned().into())), } } } diff --git a/core/src/server/method_response.rs b/core/src/server/method_response.rs index 9e8d7dc9f2..b419ad194d 100644 --- a/core/src/server/method_response.rs +++ b/core/src/server/method_response.rs @@ -191,7 +191,7 @@ impl MethodResponse { Ok(_) => { // Safety - serde_json does not emit invalid UTF-8. let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) }; - let json = RawValue::from_string(result).expect("JSON serialization infallible; qed"); + let json = RawValue::from_string(result).expect("Valid JSON String; qed"); Self { json, success_or_error, kind, on_close: rp.on_exit, extensions: Extensions::new() } } @@ -261,7 +261,7 @@ impl MethodResponse { /// Create notification response which is a response that doesn't expect a reply. pub fn notification() -> Self { Self { - json: Box::::default(), + json: RawValue::NULL.to_owned(), success_or_error: MethodResponseResult::Success, kind: ResponseKind::Notification, on_close: None, @@ -370,7 +370,7 @@ impl BatchResponseBuilder { } else { self.result.pop(); self.result.push(']'); - let json = RawValue::from_string(self.result).expect("JSON serialization infallible; qed"); + let json = RawValue::from_string(self.result).expect("BatchResponse builds a valid JSON String; qed"); BatchResponse { json, extensions: self.extensions } } } diff --git a/core/src/server/rpc_module.rs b/core/src/server/rpc_module.rs index 38937f13a8..abacf7a2ae 100644 --- a/core/src/server/rpc_module.rs +++ b/core/src/server/rpc_module.rs @@ -34,9 +34,8 @@ use crate::error::RegisterMethodError; use crate::id_providers::RandomIntegerIdProvider; use crate::server::helpers::MethodSink; use crate::server::subscription::{ - BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink, SubNotifResultOrError, Subscribers, - Subscription, SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, SubscriptionState, - sub_message_to_json, + BoundedSubscriptions, IntoSubscriptionCloseResponse, PendingSubscriptionSink, Subscribers, Subscription, + SubscriptionCloseResponse, SubscriptionKey, SubscriptionPermit, SubscriptionState, sub_message_to_json, }; use crate::server::{LOG_TARGET, MethodResponse, ResponsePayload}; use crate::traits::ToRpcParams; @@ -51,7 +50,7 @@ use serde::de::DeserializeOwned; use serde_json::value::RawValue; use tokio::sync::{mpsc, oneshot}; -use super::IntoResponse; +use super::{IntoResponse, sub_err_to_json}; /// A `MethodCallback` is an RPC endpoint, callable with a standard JSON-RPC request, /// implemented as a function pointer to a `Fn` function taking four arguments: @@ -95,7 +94,7 @@ pub type MaxResponseSize = usize; /// A tuple containing: /// - Call result as a `String`, /// - a [`mpsc::UnboundedReceiver`] to receive future subscription results -pub type RawRpcResponse = (Box, mpsc::Receiver); +pub type RawRpcResponse = (Box, mpsc::Receiver>); /// The error that can occur when [`Methods::call`] or [`Methods::subscribe`] is invoked. #[derive(thiserror::Error, Debug)] @@ -326,6 +325,7 @@ impl Methods { /// async fn main() { /// use jsonrpsee::{RpcModule, SubscriptionMessage}; /// use jsonrpsee::types::{response::Success, Response}; + /// use jsonrpsee::core::to_json_raw_value; /// use futures_util::StreamExt; /// /// let mut module = RpcModule::new(()); @@ -333,7 +333,8 @@ impl Methods { /// let sink = pending.accept().await?; /// /// // see comment above. - /// sink.send("one answer".into()).await?; + /// let msg = to_json_raw_value(&"one answer").unwrap(); + /// sink.send(msg).await?; /// /// Ok(()) /// }).unwrap(); @@ -343,7 +344,7 @@ impl Methods { /// let sub_resp = stream.recv().await.unwrap(); /// assert_eq!( /// format!(r#"{{"jsonrpc":"2.0","method":"hi","params":{{"subscription":{},"result":"one answer"}}}}"#, resp.result), - /// sub_resp + /// sub_resp.get() /// ); /// } /// ``` @@ -351,7 +352,7 @@ impl Methods { &self, request: &str, buf_size: usize, - ) -> Result<(Box, mpsc::Receiver), serde_json::Error> { + ) -> Result<(Box, mpsc::Receiver>), serde_json::Error> { tracing::trace!("[Methods::raw_json_request] Request: {:?}", request); let req: Request = serde_json::from_str(request)?; let (resp, rx) = self.inner_call(req, buf_size, mock_subscription_permit()).await; @@ -422,12 +423,13 @@ impl Methods { /// #[tokio::main] /// async fn main() { /// use jsonrpsee::{RpcModule, SubscriptionMessage}; - /// use jsonrpsee::core::{EmptyServerParams, RpcResult}; + /// use jsonrpsee::core::{EmptyServerParams, RpcResult, to_json_raw_value}; /// /// let mut module = RpcModule::new(()); /// module.register_subscription("hi", "hi", "goodbye", |_, pending, _, _| async move { /// let sink = pending.accept().await?; - /// sink.send("one answer".into()).await?; + /// let msg = to_json_raw_value(&"one answer").unwrap(); + /// sink.send(msg).await?; /// Ok(()) /// }).unwrap(); /// @@ -767,7 +769,7 @@ impl RpcModule { /// // /// // If you need some other behavior implement or custom format of the error field /// // you need to manually handle that. - /// let msg = SubscriptionMessage::from_json(&sum)?; + /// let msg = serde_json::value::to_raw_value(&sum).unwrap(); /// /// // This fails only if the connection is closed /// sink.send(msg).await?; @@ -838,11 +840,11 @@ impl RpcModule { match response { SubscriptionCloseResponse::Notif(msg) => { - let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &sub_id, method); + let json = sub_message_to_json(msg, &sub_id, method); let _ = method_sink.send(json).await; } - SubscriptionCloseResponse::NotifErr(msg) => { - let json = sub_message_to_json(msg, SubNotifResultOrError::Error, &sub_id, method); + SubscriptionCloseResponse::NotifErr(err) => { + let json = sub_err_to_json(err, sub_id, method); let _ = method_sink.send(json).await; } SubscriptionCloseResponse::None => (), @@ -910,7 +912,7 @@ impl RpcModule { /// let sink = pending.accept().await.unwrap(); /// let sum = val + (*ctx); /// - /// let msg = SubscriptionMessage::from_json(&sum).unwrap(); + /// let msg = serde_json::value::to_raw_value(&sum).unwrap(); /// /// // This fails only if the connection is closed /// sink.send(msg).await.unwrap(); diff --git a/core/src/server/subscription.rs b/core/src/server/subscription.rs index 6ffd5f3c71..b74b55e6ab 100644 --- a/core/src/server/subscription.rs +++ b/core/src/server/subscription.rs @@ -31,12 +31,14 @@ use super::{MethodResponse, MethodsError, ResponsePayload}; use crate::server::LOG_TARGET; use crate::server::error::{DisconnectError, PendingSubscriptionAcceptError, SendTimeoutError, TrySendError}; use crate::server::rpc_module::ConnectionId; -use crate::{error::StringError, traits::IdProvider}; +use crate::{error::SubscriptionError, traits::IdProvider}; use jsonrpsee_types::SubscriptionPayload; -use jsonrpsee_types::{ErrorObjectOwned, Id, SubscriptionId, SubscriptionResponse, response::SubscriptionError}; +use jsonrpsee_types::response::SubscriptionPayloadError; +use jsonrpsee_types::{ErrorObjectOwned, Id, SubscriptionId, SubscriptionResponse}; use parking_lot::Mutex; use rustc_hash::FxHashMap; use serde::{Serialize, de::DeserializeOwned}; +use serde_json::value::RawValue; use std::{sync::Arc, time::Duration}; use tokio::sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot}; @@ -88,14 +90,14 @@ pub enum SubscriptionCloseResponse { /// } /// } /// ``` - NotifErr(SubscriptionMessage), + NotifErr(SubscriptionError), } -impl IntoSubscriptionCloseResponse for Result<(), StringError> { +impl IntoSubscriptionCloseResponse for Result<(), SubscriptionError> { fn into_response(self) -> SubscriptionCloseResponse { match self { Ok(()) => SubscriptionCloseResponse::None, - Err(e) => SubscriptionCloseResponse::NotifErr(e.0.into()), + Err(e) => SubscriptionCloseResponse::NotifErr(e), } } } @@ -116,61 +118,37 @@ impl IntoSubscriptionCloseResponse for SubscriptionCloseResponse { #[derive(Debug, Clone)] pub enum SubscriptionMessageInner { /// Complete JSON message. - Complete(String), + Complete(Box), /// Need subscription ID and method name. - NeedsData(String), + NeedsData(Box), } /// Subscription message. #[derive(Debug, Clone)] pub struct SubscriptionMessage(pub(crate) SubscriptionMessageInner); -impl SubscriptionMessage { - /// Create a new subscription message from JSON. - /// - /// Fails if the value couldn't be serialized. - pub fn from_json(t: &impl Serialize) -> Result { - serde_json::to_string(t).map(|json| SubscriptionMessage(SubscriptionMessageInner::NeedsData(json))) +impl From> for SubscriptionMessage { + fn from(json: Box) -> Self { + Self(SubscriptionMessageInner::NeedsData(json)) } +} - /// Create a subscription message this is more efficient than [`SubscriptionMessage::from_json`] +impl SubscriptionMessage { + /// Create a subscription message this is more efficient than [`SubscriptionMessage::from`] /// because it only allocates once. /// /// Fails if the json `result` couldn't be serialized. pub fn new(method: &str, subscription: SubscriptionId, result: &impl Serialize) -> Result { - let json = serde_json::to_string(&SubscriptionResponse::new( + let json = serde_json::value::to_raw_value(&SubscriptionResponse::new( method.into(), SubscriptionPayload { subscription, result }, ))?; Ok(Self::from_complete_message(json)) } - pub(crate) fn from_complete_message(msg: String) -> Self { + pub(crate) fn from_complete_message(msg: Box) -> Self { SubscriptionMessage(SubscriptionMessageInner::Complete(msg)) } - - pub(crate) fn empty() -> Self { - Self::from_complete_message(String::new()) - } -} - -impl From for SubscriptionMessage -where - T: AsRef, -{ - fn from(s: T) -> Self { - // Add "" - let json_str = { - let s = s.as_ref(); - let mut res = String::with_capacity(s.len() + 2); - res.push('"'); - res.push_str(s); - res.push('"'); - res - }; - - SubscriptionMessage(SubscriptionMessageInner::NeedsData(json_str)) - } } /// Represent a unique subscription entry based on [`SubscriptionId`] and [`ConnectionId`]. @@ -180,21 +158,6 @@ pub struct SubscriptionKey { pub(crate) sub_id: SubscriptionId<'static>, } -#[derive(Debug, Clone, Copy)] -pub(crate) enum SubNotifResultOrError { - Result, - Error, -} - -impl SubNotifResultOrError { - pub(crate) const fn as_str(&self) -> &str { - match self { - Self::Result => "result", - Self::Error => "error", - } - } -} - /// Represents a subscription until it is unsubscribed. /// // NOTE: The reason why we use `mpsc` here is because it allows `IsUnsubscribed::unsubscribed` @@ -259,7 +222,7 @@ impl PendingSubscriptionSink { /// once reject has been called. pub async fn reject(self, err: impl Into) { let err = MethodResponse::subscription_error(self.id, err.into()); - _ = self.inner.send(err.as_json().get().to_owned()).await; + _ = self.inner.send(err.to_json()).await; _ = self.subscribe.send(err); } @@ -282,7 +245,7 @@ impl PendingSubscriptionSink { // // The same message is sent twice here because one is sent directly to the transport layer and // the other one is sent internally to accept the subscription. - self.inner.send(response.as_json().get().to_owned()).await.map_err(|_| PendingSubscriptionAcceptError)?; + self.inner.send(response.to_json()).await.map_err(|_| PendingSubscriptionAcceptError)?; self.subscribe.send(response).map_err(|_| PendingSubscriptionAcceptError)?; if success { @@ -367,24 +330,32 @@ impl SubscriptionSink { /// # Cancel safety /// /// This method is cancel-safe and dropping a future loses its spot in the waiting queue. - pub async fn send(&self, msg: SubscriptionMessage) -> Result<(), DisconnectError> { + pub async fn send(&self, msg: impl Into) -> Result<(), DisconnectError> { + let msg = msg.into(); + // Only possible to trigger when the connection is dropped. if self.is_closed() { return Err(DisconnectError(msg)); } - let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method); + let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method); self.inner.send(json).await } /// Similar to `SubscriptionSink::send` but only waits for a limited time. - pub async fn send_timeout(&self, msg: SubscriptionMessage, timeout: Duration) -> Result<(), SendTimeoutError> { + pub async fn send_timeout( + &self, + msg: impl Into, + timeout: Duration, + ) -> Result<(), SendTimeoutError> { + let msg = msg.into(); + // Only possible to trigger when the connection is dropped. if self.is_closed() { return Err(SendTimeoutError::Closed(msg)); } - let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method); + let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method); self.inner.send_timeout(json, timeout).await } @@ -394,13 +365,15 @@ impl SubscriptionSink { /// /// This differs from [`SubscriptionSink::send`] where it will until there is capacity /// in the channel. - pub fn try_send(&mut self, msg: SubscriptionMessage) -> Result<(), TrySendError> { + pub fn try_send(&mut self, msg: impl Into) -> Result<(), TrySendError> { + let msg = msg.into(); + // Only possible to trigger when the connection is dropped. if self.is_closed() { return Err(TrySendError::Closed(msg)); } - let json = sub_message_to_json(msg, SubNotifResultOrError::Result, &self.uniq_sub.sub_id, self.method); + let json = sub_message_to_json(msg, &self.uniq_sub.sub_id, self.method); self.inner.try_send(json) } @@ -444,7 +417,7 @@ impl Drop for SubscriptionSink { /// Wrapper struct that maintains a subscription "mainly" for testing. #[derive(Debug)] pub struct Subscription { - pub(crate) rx: mpsc::Receiver, + pub(crate) rx: mpsc::Receiver>, pub(crate) sub_id: SubscriptionId<'static>, } @@ -468,12 +441,14 @@ impl Subscription { // clippy complains about this but it doesn't compile without the extra res binding. #[allow(clippy::let_and_return)] - let res = match serde_json::from_str::>(&raw) { + let res = match serde_json::from_str::>(raw.get()) { Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))), - Err(e) => match serde_json::from_str::>(&raw) { - Ok(_) => None, - Err(_) => Some(Err(e.into())), - }, + Err(e) => { + match serde_json::from_str::>(raw.get()) { + Ok(_) => None, + Err(_) => Some(Err(e.into())), + } + } }; res } @@ -522,21 +497,21 @@ pub struct SubscriptionState<'a> { pub subscription_permit: SubscriptionPermit, } -pub(crate) fn sub_message_to_json( - msg: SubscriptionMessage, - result_or_err: SubNotifResultOrError, - sub_id: &SubscriptionId, - method: &str, -) -> String { - let result_or_err = result_or_err.as_str(); - +pub(crate) fn sub_message_to_json(msg: SubscriptionMessage, sub_id: &SubscriptionId, method: &str) -> Box { match msg.0 { SubscriptionMessageInner::Complete(msg) => msg, - SubscriptionMessageInner::NeedsData(result) => { - let sub_id = serde_json::to_string(&sub_id).expect("valid JSON; qed"); - format!( - r#"{{"jsonrpc":"2.0","method":"{method}","params":{{"subscription":{sub_id},"{result_or_err}":{result}}}}}"#, - ) - } + SubscriptionMessageInner::NeedsData(result) => serde_json::value::to_raw_value(&SubscriptionResponse::new( + method.into(), + SubscriptionPayload { subscription: sub_id.clone(), result }, + )) + .expect("Serialize infallible; qed"), } } + +pub(crate) fn sub_err_to_json(error: SubscriptionError, sub_id: SubscriptionId, method: &str) -> Box { + serde_json::value::to_raw_value(&jsonrpsee_types::response::SubscriptionError::new( + method.into(), + SubscriptionPayloadError { subscription: sub_id, error }, + )) + .expect("Serialize infallible; qed") +} diff --git a/core/src/traits.rs b/core/src/traits.rs index 64e158c913..6e1968d980 100644 --- a/core/src/traits.rs +++ b/core/src/traits.rs @@ -50,7 +50,7 @@ use serde_json::value::RawValue; /// impl ToRpcParams for ManualParam { /// fn to_rpc_params(self) -> Result>, serde_json::Error> { /// // Manually define a valid JSONRPC parameter. -/// RawValue::from_string("[1, \"2\", 3]".to_string()).map(Some) +/// serde_json::value::to_raw_value(&[1,2,3]).map(Some) /// } /// } /// ``` @@ -85,8 +85,8 @@ pub trait ToRpcParams { macro_rules! to_rpc_params_impl { () => { fn to_rpc_params(self) -> Result>, serde_json::Error> { - let json = serde_json::to_string(&self)?; - RawValue::from_string(json).map(Some) + let json = serde_json::value::to_raw_value(&self)?; + Ok(Some(json)) } }; } diff --git a/examples/examples/client_subscription_drop_oldest_item.rs b/examples/examples/client_subscription_drop_oldest_item.rs index 46284969ed..12414c7ff9 100644 --- a/examples/examples/client_subscription_drop_oldest_item.rs +++ b/examples/examples/client_subscription_drop_oldest_item.rs @@ -31,7 +31,7 @@ use futures::{Stream, StreamExt}; use jsonrpsee::core::DeserializeOwned; use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::rpc_params; -use jsonrpsee::server::{RpcModule, Server, SubscriptionMessage}; +use jsonrpsee::server::{RpcModule, Server}; use jsonrpsee::ws_client::WsClientBuilder; use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::errors::BroadcastStreamRecvError; @@ -107,8 +107,8 @@ async fn run_server() -> anyhow::Result { let sub = pending.accept().await.unwrap(); for i in 0..usize::MAX { - let msg = SubscriptionMessage::from_json(&i).unwrap(); - sub.send(msg).await.unwrap(); + let json = serde_json::value::to_raw_value(&i).unwrap(); + sub.send(json).await.unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; } diff --git a/examples/examples/proc_macro.rs b/examples/examples/proc_macro.rs index a7f43d2ddd..3e8bb32a8c 100644 --- a/examples/examples/proc_macro.rs +++ b/examples/examples/proc_macro.rs @@ -28,7 +28,7 @@ use std::net::SocketAddr; use jsonrpsee::core::{SubscriptionResult, async_trait, client::Subscription}; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::{PendingSubscriptionSink, Server, SubscriptionMessage}; +use jsonrpsee::server::{PendingSubscriptionSink, Server}; use jsonrpsee::types::ErrorObjectOwned; use jsonrpsee::ws_client::WsClientBuilder; @@ -74,8 +74,8 @@ impl RpcServer for RpcServerImpl { _keys: Option>, ) -> SubscriptionResult { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&vec![[0; 32]])?; - sink.send(msg).await?; + let json = serde_json::value::to_raw_value(&vec![[0; 32]])?; + sink.send(json).await?; Ok(()) } @@ -83,8 +83,8 @@ impl RpcServer for RpcServerImpl { fn s(&self, pending: PendingSubscriptionSink, _keys: Option>) { tokio::spawn(async move { let sink = pending.accept().await.unwrap(); - let msg = SubscriptionMessage::from_json(&vec![[0; 32]]).unwrap(); - sink.send(msg).await.unwrap(); + let json = serde_json::value::to_raw_value(&vec![[0; 32]]).unwrap(); + sink.send(json).await.unwrap(); }); } } diff --git a/examples/examples/server_with_connection_details.rs b/examples/examples/server_with_connection_details.rs index 9291f45c0b..24fc4cdc39 100644 --- a/examples/examples/server_with_connection_details.rs +++ b/examples/examples/server_with_connection_details.rs @@ -29,7 +29,7 @@ use std::net::SocketAddr; use jsonrpsee::core::middleware::{Batch, Notification, Request, RpcServiceT}; use jsonrpsee::core::{SubscriptionResult, async_trait}; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::{PendingSubscriptionSink, SubscriptionMessage}; +use jsonrpsee::server::PendingSubscriptionSink; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::{ConnectionId, Extensions}; @@ -93,7 +93,9 @@ impl RpcServer for RpcServerImpl { .get::() .cloned() .ok_or_else(|| ErrorObject::owned(0, "No connection details found", None::<()>))?; - sink.send(SubscriptionMessage::from_json(&conn_id).unwrap()).await?; + let json = serde_json::value::to_raw_value(&conn_id) + .map_err(|e| ErrorObject::owned(0, format!("Failed to serialize connection ID: {e}"), None::<()>))?; + sink.send(json).await?; Ok(()) } @@ -102,7 +104,8 @@ impl RpcServer for RpcServerImpl { tokio::spawn(async move { let sink = pending.accept().await.unwrap(); - sink.send(SubscriptionMessage::from_json(&conn_id).unwrap()).await.unwrap(); + let json = serde_json::value::to_raw_value(&conn_id).unwrap(); + sink.send(json).await.unwrap(); }); Ok(()) } diff --git a/examples/examples/ws_pubsub_broadcast.rs b/examples/examples/ws_pubsub_broadcast.rs index 1e57f09905..df68af5ad0 100644 --- a/examples/examples/ws_pubsub_broadcast.rs +++ b/examples/examples/ws_pubsub_broadcast.rs @@ -33,7 +33,6 @@ use futures::future::{self, Either}; use jsonrpsee::PendingSubscriptionSink; use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; use jsonrpsee::core::middleware::RpcServiceBuilder; -use jsonrpsee::core::server::SubscriptionMessage; use jsonrpsee::rpc_params; use jsonrpsee::server::{RpcModule, Server, ServerConfig}; use jsonrpsee::ws_client::WsClientBuilder; @@ -115,12 +114,12 @@ async fn pipe_from_stream_with_bounded_buffer( // received new item from the stream. Either::Right((Some(Ok(item)), c)) => { - let notif = SubscriptionMessage::from_json(&item)?; + let msg = serde_json::value::to_raw_value(&item)?; // NOTE: this will block until there a spot in the queue // and you might want to do something smarter if it's // critical that "the most recent item" must be sent when it is produced. - if sink.send(notif).await.is_err() { + if sink.send(msg).await.is_err() { break Ok(()); } diff --git a/examples/examples/ws_pubsub_with_params.rs b/examples/examples/ws_pubsub_with_params.rs index 35ddb94c30..39bd77c52f 100644 --- a/examples/examples/ws_pubsub_with_params.rs +++ b/examples/examples/ws_pubsub_with_params.rs @@ -30,7 +30,7 @@ use std::time::Duration; use futures::{Stream, StreamExt}; use jsonrpsee::core::Serialize; use jsonrpsee::core::client::{Subscription, SubscriptionClientT}; -use jsonrpsee::server::{RpcModule, Server, ServerConfig, SubscriptionMessage, TrySendError}; +use jsonrpsee::server::{RpcModule, Server, ServerConfig, TrySendError}; use jsonrpsee::ws_client::WsClientBuilder; use jsonrpsee::{PendingSubscriptionSink, rpc_params}; use tokio::time::interval; @@ -125,7 +125,7 @@ pub async fn pipe_from_stream_and_drop( Some(item) => item, None => break Err(anyhow::anyhow!("Subscription was closed")), }; - let msg = SubscriptionMessage::from_json(&item)?; + let msg = serde_json::value::to_raw_value(&item)?; match sink.try_send(msg) { Ok(_) => (), Err(TrySendError::Closed(_)) => break Err(anyhow::anyhow!("Subscription was closed")), diff --git a/proc-macros/src/lib.rs b/proc-macros/src/lib.rs index fe2946c37c..b2d681cb56 100644 --- a/proc-macros/src/lib.rs +++ b/proc-macros/src/lib.rs @@ -148,7 +148,7 @@ pub(crate) mod visitor; /// - `namespace`: add a prefix to all the methods and subscriptions in this RPC. For example, with namespace `foo` and /// method `spam`, the resulting method name will be `foo_spam`. /// - `namespace_separator`: customize the separator used between namespace and method name. Defaults to `_`. -/// For example, `namespace = "foo", namespace_separator = "."` results in method names like `foo.bar` instead of `foo_bar`. +/// For example, `namespace = "foo", namespace_separator = "."` results in method names like `foo.bar` instead of `foo_bar`. /// - `server_bounds`: replace *all* auto-generated trait bounds with the user-defined ones for the server /// implementation. /// - `client_bounds`: replace *all* auto-generated trait bounds with the user-defined ones for the client @@ -233,7 +233,7 @@ pub(crate) mod visitor; /// mod rpc_impl { /// use jsonrpsee::{proc_macros::rpc, Extensions}; /// use jsonrpsee::server::{PendingSubscriptionSink, SubscriptionMessage, IntoSubscriptionCloseResponse, SubscriptionCloseResponse}; -/// use jsonrpsee::core::{async_trait, RpcResult, SubscriptionResult}; +/// use jsonrpsee::core::{async_trait, RpcResult, SubscriptionResult, to_json_raw_value}; /// /// enum CloseResponse { /// None, @@ -247,7 +247,10 @@ pub(crate) mod visitor; /// CloseResponse::None => SubscriptionCloseResponse::None, /// // Send a close response as an ordinary subscription notification /// // when the subscription is terminated. -/// CloseResponse::Failed => SubscriptionCloseResponse::Notif("failed".into()), +/// CloseResponse::Failed => { +/// let err = to_json_raw_value(&"Failed").unwrap(); +/// SubscriptionCloseResponse::Notif(err.into()) +/// } /// } /// } /// } @@ -332,7 +335,8 @@ pub(crate) mod visitor; /// // as subscription responses. /// async fn sub_override_notif_method(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { /// let mut sink = pending.accept().await?; -/// sink.send("Response_A".into()).await?; +/// let msg = to_json_raw_value(&"Response_A").unwrap(); +/// sink.send(msg).await?; /// Ok(()) /// } /// @@ -340,8 +344,8 @@ pub(crate) mod visitor; /// async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { /// let sink = pending.accept().await?; /// -/// let msg1 = SubscriptionMessage::from("Response_A"); -/// let msg2 = SubscriptionMessage::from("Response_B"); +/// let msg1 = to_json_raw_value(&"Response_A").unwrap(); +/// let msg2 = to_json_raw_value(&"Response_B").unwrap(); /// /// sink.send(msg1).await?; /// sink.send(msg2).await?; @@ -357,12 +361,13 @@ pub(crate) mod visitor; /// return CloseResponse::None; /// }; /// -/// if sink.send("Response_A".into()).await.is_ok() { +/// let msg = to_json_raw_value(&"Response_A").unwrap(); +/// +/// if sink.send(msg).await.is_ok() { /// CloseResponse::Failed /// } else { /// CloseResponse::None /// } -/// /// } /// } /// } diff --git a/proc-macros/tests/ui/correct/basic.rs b/proc-macros/tests/ui/correct/basic.rs index dc5180b86b..194f12583c 100644 --- a/proc-macros/tests/ui/correct/basic.rs +++ b/proc-macros/tests/ui/correct/basic.rs @@ -4,9 +4,8 @@ use std::net::SocketAddr; use jsonrpsee::core::client::ClientT; use jsonrpsee::core::params::ArrayParams; -use jsonrpsee::core::{RpcResult, SubscriptionResult, async_trait}; +use jsonrpsee::core::{RpcResult, SubscriptionResult, async_trait, to_json_raw_value}; use jsonrpsee::proc_macros::rpc; -use jsonrpsee::server::SubscriptionMessage; use jsonrpsee::types::ErrorObject; use jsonrpsee::ws_client::*; use jsonrpsee::{Extensions, PendingSubscriptionSink, rpc_params}; @@ -97,15 +96,18 @@ impl RpcServer for RpcServerImpl { async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { let sink = pending.accept().await?; - sink.send("Response_A".into()).await?; - sink.send("Response_B".into()).await?; + let msg1 = to_json_raw_value(&"Response_A").unwrap(); + let msg2 = to_json_raw_value(&"Response_B").unwrap(); + + sink.send(msg1).await?; + sink.send(msg2).await?; Ok(()) } async fn sub_with_params(&self, pending: PendingSubscriptionSink, val: u32) -> SubscriptionResult { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&val)?; + let msg = serde_json::value::to_raw_value(&val).unwrap(); sink.send(msg.clone()).await?; sink.send(msg).await?; @@ -116,7 +118,7 @@ impl RpcServer for RpcServerImpl { async fn sub_with_override_notif_method(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&1)?; + let msg = serde_json::value::to_raw_value(&1).unwrap(); sink.send(msg).await?; Ok(()) @@ -128,7 +130,8 @@ impl RpcServer for RpcServerImpl { .get::() .cloned() .ok_or_else(|| ErrorObject::owned(0, "No connection details found", None::<()>))?; - sink.send(SubscriptionMessage::from_json(&conn_id).unwrap()).await?; + let json = serde_json::value::to_raw_value(&conn_id).unwrap(); + sink.send(json).await?; Ok(()) } } diff --git a/proc-macros/tests/ui/correct/only_server.rs b/proc-macros/tests/ui/correct/only_server.rs index b71571ad34..2ad73ebd90 100644 --- a/proc-macros/tests/ui/correct/only_server.rs +++ b/proc-macros/tests/ui/correct/only_server.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use jsonrpsee::core::{async_trait, RpcResult, SubscriptionResult}; +use jsonrpsee::core::{RpcResult, SubscriptionResult, async_trait, to_json_raw_value}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::server::{PendingSubscriptionSink, ServerBuilder}; @@ -31,8 +31,10 @@ impl RpcServer for RpcServerImpl { async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { let sink = pending.accept().await?; - sink.send("Response_A".into()).await?; - sink.send("Response_B".into()).await?; + let msg1 = to_json_raw_value(&"Response_A").unwrap(); + let msg2 = to_json_raw_value(&"Response_B").unwrap(); + sink.send(msg1).await?; + sink.send(msg2).await?; Ok(()) } diff --git a/server/src/server.rs b/server/src/server.rs index bd23c9a9a1..875e562a17 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -43,7 +43,6 @@ use crate::{Extensions, HttpBody, HttpRequest, HttpResponse, LOG_TARGET}; use futures_util::future::{self, Either, FutureExt}; use futures_util::io::{BufReader, BufWriter}; - use hyper::body::Bytes; use hyper_util::rt::{TokioExecutor, TokioIo}; use jsonrpsee_core::id_providers::RandomIntegerIdProvider; @@ -52,7 +51,6 @@ use jsonrpsee_core::server::helpers::prepare_error; use jsonrpsee_core::server::{BoundedSubscriptions, ConnectionId, MethodResponse, MethodSink, Methods}; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_core::{BoxError, JsonRawValue, TEN_MB_SIZE_BYTES}; - use jsonrpsee_types::error::{ BATCHES_NOT_SUPPORTED_CODE, BATCHES_NOT_SUPPORTED_MSG, ErrorCode, reject_too_big_batch_request, }; @@ -1022,7 +1020,7 @@ where let response = match server.receive_request(&request) { Ok(response) => { - let (tx, rx) = mpsc::channel::(this.server_cfg.message_buffer_capacity as usize); + let (tx, rx) = mpsc::channel(this.server_cfg.message_buffer_capacity as usize); let sink = MethodSink::new(tx); // On each method call the `pending_calls` is cloned diff --git a/server/src/tests/helpers.rs b/server/src/tests/helpers.rs index 11610dd20f..5195d4566a 100644 --- a/server/src/tests/helpers.rs +++ b/server/src/tests/helpers.rs @@ -9,7 +9,7 @@ use crate::{ use futures_util::FutureExt; use jsonrpsee_core::server::Methods; -use jsonrpsee_core::{DeserializeOwned, RpcResult, StringError}; +use jsonrpsee_core::{DeserializeOwned, RpcResult, SubscriptionError}; use jsonrpsee_test_utils::TimeoutFutureExt; use jsonrpsee_types::{ErrorObject, ErrorObjectOwned, Response, ResponseSuccess, error::ErrorCode}; use tokio::net::TcpListener; @@ -88,7 +88,7 @@ pub(crate) async fn server_with_handles() -> (SocketAddr, ServerHandle) { }) .unwrap(); module - .register_subscription::, _, _>( + .register_subscription::, _, _>( "subscribe_hello", "subscribe_hello", "unsubscribe_hello", diff --git a/server/src/tests/ws.rs b/server/src/tests/ws.rs index 919163824e..d30d23f161 100644 --- a/server/src/tests/ws.rs +++ b/server/src/tests/ws.rs @@ -31,7 +31,7 @@ use crate::tests::helpers::{Metrics, deser_call, init_logger, server_with_contex use crate::types::SubscriptionId; use crate::{BatchRequestConfig, RegisterMethodError, ServerConfig}; use crate::{RpcModule, ServerBuilder}; -use jsonrpsee_core::server::{SendTimeoutError, SubscriptionMessage}; +use jsonrpsee_core::server::SendTimeoutError; use jsonrpsee_core::traits::IdProvider; use jsonrpsee_test_utils::TimeoutFutureExt; use jsonrpsee_test_utils::helpers::*; @@ -729,8 +729,8 @@ async fn ws_server_backpressure_works() { "unsubscribe_with_backpressure_aggregation", move |_, pending, mut backpressure_tx, _| async move { let sink = pending.accept().await?; - let n = SubscriptionMessage::from_json(&1)?; - let bp = SubscriptionMessage::from_json(&2)?; + let n = serde_json::value::to_raw_value(&1).unwrap(); + let bp = serde_json::value::to_raw_value(&2).unwrap(); let mut msg = n.clone(); diff --git a/server/src/transport/ws.rs b/server/src/transport/ws.rs index ba55a807ab..016a9b6d42 100644 --- a/server/src/transport/ws.rs +++ b/server/src/transport/ws.rs @@ -16,9 +16,9 @@ use jsonrpsee_core::middleware::{RpcServiceBuilder, RpcServiceT}; use jsonrpsee_core::server::{BoundedSubscriptions, MethodResponse, MethodSink, Methods}; use jsonrpsee_types::Id; use jsonrpsee_types::error::{ErrorCode, reject_too_big_request}; +use serde_json::value::RawValue; use soketto::connection::Error as SokettoError; use soketto::data::ByteSlice125; - use tokio::sync::{mpsc, oneshot}; use tokio::time::{interval, interval_at}; use tokio_stream::wrappers::ReceiverStream; @@ -34,8 +34,8 @@ enum Incoming { Pong, } -pub(crate) async fn send_message(sender: &mut Sender, response: String) -> Result<(), SokettoError> { - sender.send_text_owned(response).await?; +pub(crate) async fn send_message(sender: &mut Sender, response: Box) -> Result<(), SokettoError> { + sender.send_text_owned(String::from(Box::::from(response))).await?; sender.flush().await } @@ -56,7 +56,7 @@ pub(crate) struct BackgroundTaskParams { pub(crate) ws_receiver: Receiver, pub(crate) rpc_service: S, pub(crate) sink: MethodSink, - pub(crate) rx: mpsc::Receiver, + pub(crate) rx: mpsc::Receiver>, pub(crate) pending_calls_completed: mpsc::Receiver<()>, pub(crate) on_session_close: Option, pub(crate) extensions: http::Extensions, @@ -167,7 +167,7 @@ where let (json, mut on_close, _) = rp.into_parts(); // The connection is closed, just quit. - if sink.send(String::from(Box::::from(json))).await.is_err() { + if sink.send(json).await.is_err() { return; } @@ -195,7 +195,7 @@ where /// A task that waits for new messages via the `rx channel` and sends them out on the `WebSocket`. async fn send_task( - rx: mpsc::Receiver, + rx: mpsc::Receiver>, mut ws_sender: Sender, ping_config: Option, stop: oneshot::Receiver<()>, @@ -429,7 +429,7 @@ where match server.receive_request(&req) { Ok(response) => { - let (tx, rx) = mpsc::channel::(server_cfg.message_buffer_capacity as usize); + let (tx, rx) = mpsc::channel(server_cfg.message_buffer_capacity as usize); let sink = MethodSink::new(tx); // On each method call the `pending_calls` is cloned diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 9c89343e9e..c3780a471a 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -7,6 +7,9 @@ edition.workspace = true license.workspace = true publish = false +[lints.clippy] +manual_async_fn = { level = "allow", priority = -1 } + [dev-dependencies] anyhow = { workspace = true } fast-socks5 = { workspace = true } diff --git a/tests/proc-macro-core/src/lib.rs b/tests/proc-macro-core/src/lib.rs index cb93b49140..0ffb2165ab 100644 --- a/tests/proc-macro-core/src/lib.rs +++ b/tests/proc-macro-core/src/lib.rs @@ -1,9 +1,9 @@ //! Test module for the proc-macro API to make sure that it compiles with the core features. -use jsonrpsee::core::{SubscriptionResult, async_trait}; +use jsonrpsee::PendingSubscriptionSink; +use jsonrpsee::core::{SubscriptionResult, async_trait, to_json_raw_value}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::ErrorObjectOwned; -use jsonrpsee::{PendingSubscriptionSink, SubscriptionMessage}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] @@ -52,7 +52,8 @@ impl ApiServer for () { async fn sub(&self, pending: PendingSubscriptionSink, _: PubSubKind, _: PubSubParams) -> SubscriptionResult { let sink = pending.accept().await?; - sink.send(SubscriptionMessage::from("msg")).await?; + let msg = to_json_raw_value("msg")?; + sink.send(msg).await?; Ok(()) } diff --git a/tests/tests/helpers.rs b/tests/tests/helpers.rs index 11a600b81a..8e5957a335 100644 --- a/tests/tests/helpers.rs +++ b/tests/tests/helpers.rs @@ -38,8 +38,8 @@ use futures::{SinkExt, Stream, StreamExt}; use jsonrpsee::core::middleware::{Batch, Notification, RpcServiceBuilder, RpcServiceT}; use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; use jsonrpsee::server::{ - ConnectionGuard, PendingSubscriptionSink, RpcModule, Server, ServerBuilder, ServerHandle, SubscriptionMessage, - TrySendError, serve_with_graceful_shutdown, stop_channel, + ConnectionGuard, PendingSubscriptionSink, RpcModule, Server, ServerBuilder, ServerHandle, TrySendError, + serve_with_graceful_shutdown, stop_channel, }; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; use jsonrpsee::{Methods, SubscriptionCloseResponse}; @@ -329,7 +329,7 @@ pub async fn pipe_from_stream_and_drop( Some(item) => item, None => return Err(anyhow::anyhow!("Subscription executed successful")), }; - let msg = SubscriptionMessage::from_json(&item)?; + let msg = serde_json::value::to_raw_value(&item)?; match sink.try_send(msg) { Ok(_) => (), diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 7c6d2741cc..fde43082ec 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -46,8 +46,7 @@ use hyper_util::rt::TokioExecutor; use jsonrpsee::core::client::SubscriptionCloseReason; use jsonrpsee::core::client::{ClientT, Error, IdKind, Subscription, SubscriptionClientT}; use jsonrpsee::core::params::{ArrayParams, BatchRequestBuilder}; -use jsonrpsee::core::server::SubscriptionMessage; -use jsonrpsee::core::{JsonValue, StringError}; +use jsonrpsee::core::{JsonValue, SubscriptionError}; use jsonrpsee::http_client::HttpClientBuilder; use jsonrpsee::server::middleware::http::HostFilterLayer; use jsonrpsee::server::{ConnectionGuard, ServerBuilder, ServerConfig, ServerHandle}; @@ -532,7 +531,7 @@ async fn ws_server_should_stop_subscription_after_client_drop() { "unsubscribe_hello", |_, pending, mut tx, _| async move { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&1)?; + let msg = serde_json::value::to_raw_value(&1)?; sink.send(msg).await?; sink.closed().await; let send_back = Arc::make_mut(&mut tx); @@ -1343,7 +1342,7 @@ async fn response_payload_async_api_works() { if let Some((sink, close)) = ctx.lock().await.take() { for idx in 0..3 { - let msg = SubscriptionMessage::from_json(&idx).unwrap(); + let msg = serde_json::value::to_raw_value(&idx).unwrap(); _ = sink.send(msg).await; } drop(sink); @@ -1356,7 +1355,7 @@ async fn response_payload_async_api_works() { .unwrap(); module - .register_subscription::, _, _>( + .register_subscription::, _, _>( "sub", "s", "unsub", diff --git a/tests/tests/proc_macros.rs b/tests/tests/proc_macros.rs index e746bc1ab7..8d5f8a47b9 100644 --- a/tests/tests/proc_macros.rs +++ b/tests/tests/proc_macros.rs @@ -43,9 +43,7 @@ use jsonrpsee::ws_client::*; use serde_json::json; mod rpc_impl { - use jsonrpsee::core::server::{ - IntoSubscriptionCloseResponse, PendingSubscriptionSink, SubscriptionCloseResponse, SubscriptionMessage, - }; + use jsonrpsee::core::server::{IntoSubscriptionCloseResponse, PendingSubscriptionSink, SubscriptionCloseResponse}; use jsonrpsee::core::{SubscriptionResult, async_trait}; use jsonrpsee::proc_macros::rpc; use jsonrpsee::types::{ErrorObject, ErrorObjectOwned}; @@ -160,24 +158,27 @@ mod rpc_impl { async fn sub(&self, pending: PendingSubscriptionSink) -> SubscriptionResult { let sink = pending.accept().await?; - sink.send("Response_A".into()).await?; - sink.send("Response_B".into()).await?; + let msg1 = serde_json::value::to_raw_value(&"Response_A").unwrap(); + let msg2 = serde_json::value::to_raw_value(&"Response_B").unwrap(); + sink.send(msg1).await?; + sink.send(msg2).await?; Ok(()) } async fn sub_with_params(&self, pending: PendingSubscriptionSink, val: u32) -> SubscriptionResult { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&val)?; - sink.send(msg.clone()).await?; - sink.send(msg).await?; + let json = serde_json::value::to_raw_value(&val).unwrap(); + sink.send(json.clone()).await?; + sink.send(json).await?; Ok(()) } async fn sub_not_result(&self, pending: PendingSubscriptionSink) { let sink = pending.accept().await.unwrap(); - sink.send("lo".into()).await.unwrap(); + let msg = serde_json::value::to_raw_value("lo").unwrap(); + sink.send(msg).await.unwrap(); } async fn sub_custom_ret(&self, _pending: PendingSubscriptionSink, _x: usize) -> CustomSubscriptionRet { @@ -187,7 +188,8 @@ mod rpc_impl { fn sync_sub(&self, pending: PendingSubscriptionSink) { tokio::spawn(async move { let sink = pending.accept().await.unwrap(); - sink.send("hello".into()).await.unwrap(); + let msg = serde_json::value::to_raw_value("hello").unwrap(); + sink.send(msg).await.unwrap(); }); } diff --git a/tests/tests/rpc_module.rs b/tests/tests/rpc_module.rs index ccbc8fb1cb..e5e09f3822 100644 --- a/tests/tests/rpc_module.rs +++ b/tests/tests/rpc_module.rs @@ -33,11 +33,12 @@ use std::time::Duration; use futures::StreamExt; use helpers::{init_logger, pipe_from_stream_and_drop}; -use jsonrpsee::core::EmptyServerParams; +use jsonrpsee::core::{EmptyServerParams, SubscriptionError}; use jsonrpsee::core::{RpcResult, server::*}; use jsonrpsee::types::error::{ErrorCode, ErrorObject, INVALID_PARAMS_MSG, PARSE_ERROR_CODE}; use jsonrpsee::types::{ErrorObjectOwned, Response, ResponsePayload}; use serde::{Deserialize, Serialize}; +use serde_json::value::RawValue; use tokio::sync::mpsc; use tokio::time::interval; use tokio_stream::wrappers::IntervalStream; @@ -245,7 +246,7 @@ async fn subscribing_without_server() { while let Some(letter) = stream_data.pop() { tracing::debug!("This is your friendly subscription sending data."); - let msg = SubscriptionMessage::from_json(&letter).unwrap(); + let msg = serde_json::value::to_raw_value(&letter).unwrap(); sink.send(msg).await.unwrap(); tokio::time::sleep(std::time::Duration::from_millis(500)).await; } @@ -273,7 +274,7 @@ async fn close_test_subscribing_without_server() { module .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _, _| async move { let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&"lo")?; + let msg = serde_json::value::to_raw_value("lo")?; // make sure to only send one item sink.send(msg.clone()).await?; @@ -324,7 +325,7 @@ async fn subscribing_without_server_bad_params() { }; let sink = pending.accept().await?; - let msg = SubscriptionMessage::from_json(&p)?; + let msg = serde_json::value::to_raw_value(&p)?; sink.send(msg).await?; Ok(()) @@ -348,7 +349,7 @@ async fn subscribing_without_server_indicates_close() { let sink = pending.accept().await?; for m in 0..5 { - let msg = SubscriptionMessage::from_json(&m)?; + let msg = serde_json::value::to_raw_value(&m)?; sink.send(msg).await?; } @@ -452,7 +453,7 @@ async fn bounded_subscription_works() { let mut buf = VecDeque::new(); while let Some(n) = stream.next().await { - let msg = SubscriptionMessage::from_json(&n).expect("usize infallible; qed"); + let msg = serde_json::value::to_raw_value(&n).expect("usize serialize infallible; qed"); match sink.try_send(msg) { Err(TrySendError::Closed(_)) => panic!("This is a bug"), @@ -499,8 +500,8 @@ async fn bounded_subscription_works() { } #[tokio::test] -async fn serialize_sub_error_adds_extra_string_quotes() { - #[derive(Serialize)] +async fn serialize_sub_error_json() { + #[derive(Serialize, Deserialize)] struct MyError { number: u32, address: String, @@ -513,28 +514,57 @@ async fn serialize_sub_error_adds_extra_string_quotes() { .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _, _| async move { let _ = pending.accept().await?; tokio::time::sleep(std::time::Duration::from_millis(100)).await; - - let err = serde_json::to_string(&MyError { number: 11, address: "State street 1337".into() }).unwrap(); - Err(err.into()) + let json = + serde_json::value::to_raw_value(&MyError { number: 11, address: "State street 1337".into() }).unwrap(); + Err(SubscriptionError::from_json(json)) }) .unwrap(); - let (rp, mut stream) = module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, 1).await.unwrap(); - let resp = serde_json::from_str::>(rp.get()).unwrap(); - let sub_resp = stream.recv().await.unwrap(); + let (sub_id, notif) = + run_subscription(r#"{"jsonrpc":"2.0","method":"my_sub","params":[true],"id":0}"#, &module).await; - let resp = match resp.payload { - ResponsePayload::Success(val) => val, - _ => panic!("Expected valid response"), - }; + assert_eq!( + format!( + r#"{{"jsonrpc":"2.0","method":"my_sub","params":{{"subscription":{},"error":{{"number":11,"address":"State street 1337"}}}}}}"#, + sub_id, + ), + notif.get() + ); + + assert!(serde_json::from_str::>(notif.get()).is_ok()); +} + +#[tokio::test] +async fn serialize_sub_error_str() { + #[derive(Serialize, Deserialize)] + struct MyError { + number: u32, + address: String, + } + + init_logger(); + + let mut module = RpcModule::new(()); + module + .register_subscription("my_sub", "my_sub", "my_unsub", |_, pending, _, _| async move { + let _ = pending.accept().await?; + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let s = serde_json::to_string(&MyError { number: 11, address: "State street 1337".into() }).unwrap(); + Err(s.into()) + }) + .unwrap(); + + let (sub_id, notif) = run_subscription(r#"{"jsonrpc":"2.0","method":"my_sub","id":0}"#, &module).await; assert_eq!( format!( - r#"{{"jsonrpc":"2.0","method":"my_sub","params":{{"subscription":{},"error":"{{"number":11,"address":"State street 1337"}}"}}}}"#, - resp, + r#"{{"jsonrpc":"2.0","method":"my_sub","params":{{"subscription":{},"error":"{{\"number\":11,\"address\":\"State street 1337\"}}"}}}}"#, + sub_id, ), - sub_resp + notif.get() ); + + assert!(serde_json::from_str::>(notif.get()).is_err()); } #[tokio::test] @@ -556,25 +586,20 @@ async fn subscription_close_response_works() { }; let _sink = pending.accept().await.unwrap(); + let msg = serde_json::value::to_raw_value(&x).unwrap(); - SubscriptionCloseResponse::Notif(SubscriptionMessage::from_json(&x).unwrap()) + SubscriptionCloseResponse::Notif(msg.into()) }) .unwrap(); // ensure subscription with raw_json_request works. { - let (rp, mut stream) = - module.raw_json_request(r#"{"jsonrpc":"2.0","method":"my_sub","params":[1],"id":0}"#, 1).await.unwrap(); - let resp = serde_json::from_str::>(rp.get()).unwrap(); - - let sub_id = match resp.payload { - ResponsePayload::Success(val) => val, - _ => panic!("Expected valid response"), - }; + let (sub_id, notif) = + run_subscription(r#"{"jsonrpc":"2.0","method":"my_sub","params":[1],"id":0}"#, &module).await; assert_eq!( format!(r#"{{"jsonrpc":"2.0","method":"my_sub","params":{{"subscription":{},"result":1}}}}"#, sub_id), - stream.recv().await.unwrap() + notif.get() ); } @@ -651,3 +676,16 @@ async fn conn_id_in_rpc_module_without_server() { matches!(module.call::<_, usize>("get_conn_id", EmptyServerParams::new()).await, Ok(conn_id) if conn_id == 0) ); } + +async fn run_subscription(req: &str, rpc: &RpcModule<()>) -> (u64, Box) { + let (rp, mut stream) = rpc.raw_json_request(req, 1).await.unwrap(); + let resp = serde_json::from_str::>(rp.get()).unwrap(); + let sub_resp = stream.recv().await.unwrap(); + + let sub_id = match resp.payload { + ResponsePayload::Success(val) => val, + _ => panic!("Expected valid response"), + }; + + (*sub_id, sub_resp) +} diff --git a/types/src/request.rs b/types/src/request.rs index ece555fc38..78432da789 100644 --- a/types/src/request.rs +++ b/types/src/request.rs @@ -236,7 +236,7 @@ mod test { fn serialize_call() { let method = "subtract"; let id = Id::Number(1); // It's enough to check one variant, since the type itself also has tests. - let params = Some(RawValue::from_string("[42,23]".into()).unwrap()); + let params = Some(serde_json::value::to_raw_value(&[42, 23]).unwrap()); let test_vector: &[(&'static str, Option<_>, Option<_>, &'static str)] = &[ // With all fields set.