Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion core-client/transports/src/transports/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,16 @@ mod tests {
}

fn io() -> IoHandler {
use jsonrpc_core::Result;

let mut io = IoHandler::default();
io.add_sync_method("hello", |params: Params| match params.parse::<(String,)>() {
Ok((msg,)) => Ok(Value::String(format!("hello {}", msg))),
_ => Ok(Value::String("world".into())),
});
io.add_sync_method("fail", |_: Params| Err(Error::new(ErrorCode::ServerError(-34))));
io.add_sync_method("fail", |_: Params| -> Result<i64> {
Err(Error::new(ErrorCode::ServerError(-34)))
});
io.add_notification("notify", |params: Params| {
let (value,) = params.parse::<(u64,)>().expect("expected one u64 as param");
assert_eq!(value, 12);
Expand Down
2 changes: 1 addition & 1 deletion core/examples/middlewares.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ impl Middleware<Meta> for MyMiddleware {
fn on_request<F, X>(&self, request: Request, meta: Meta, next: F) -> Either<Self::Future, X>
where
F: FnOnce(Request, Meta) -> X + Send,
X: Future<Output = Option<Response>> + Send + 'static,
X: Future<Output = Option<SerializedResponse>> + Send + 'static,
{
let start = Instant::now();
let request_number = self.0.fetch_add(1, atomic::Ordering::SeqCst);
Expand Down
58 changes: 42 additions & 16 deletions core/src/calls.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use crate::types::{Error, Params, Value};
use crate::types::{Error, Id, Params, SerializedOutput, Value, Version};
use crate::BoxFuture;
use futures_util::{self, future, FutureExt};
use serde::Serialize;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
Expand Down Expand Up @@ -30,23 +32,23 @@ impl<T, E> WrapFuture<T, E> for BoxFuture<Result<T, E>> {
}

/// A synchronous or asynchronous method.
pub trait RpcMethodSync: Send + Sync + 'static {
pub trait RpcMethodSync<R = Value>: Send + Sync + 'static {
/// Call method
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>>;
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>>;
}

/// Asynchronous Method
pub trait RpcMethodSimple: Send + Sync + 'static {
pub trait RpcMethodSimple<R = Value>: Send + Sync + 'static {
/// Output future
type Out: Future<Output = Result<Value, Error>> + Send;
type Out: Future<Output = Result<R, Error>> + Send;
/// Call method
fn call(&self, params: Params) -> Self::Out;
}

/// Asynchronous Method with Metadata
pub trait RpcMethod<T: Metadata>: Send + Sync + 'static {
pub trait RpcMethod<T: Metadata, R = Value>: Send + Sync + 'static {
/// Call method
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>>;
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>>;
}

/// Notification
Expand All @@ -61,11 +63,24 @@ pub trait RpcNotification<T: Metadata>: Send + Sync + 'static {
fn execute(&self, params: Params, meta: T);
}

pub trait RpcMethodWithSerializedOutput<T: Metadata>: Send + Sync + 'static {
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<SerializedOutput>>;
}

pub fn rpc_wrap<T: Metadata, R: Serialize + Send + 'static, F: RpcMethod<T, R>>(
f: F,
) -> Arc<dyn RpcMethodWithSerializedOutput<T>> {
Arc::new(move |params: Params, meta: T, jsonrpc: Option<Version>, id: Id| {
let result = f.call(params, meta);
result.then(move |r| future::ready(Some(SerializedOutput::from(r, id, jsonrpc))))
})
}

/// Possible Remote Procedures with Metadata
#[derive(Clone)]
pub enum RemoteProcedure<T: Metadata> {
/// A method call
Method(Arc<dyn RpcMethod<T>>),
Method(Arc<dyn RpcMethodWithSerializedOutput<T>>),
/// A notification
Notification(Arc<dyn RpcNotification<T>>),
/// An alias to other method,
Expand All @@ -83,23 +98,23 @@ impl<T: Metadata> fmt::Debug for RemoteProcedure<T> {
}
}

impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSimple for F
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSimple<R> for F
where
F: Fn(Params) -> X,
X: Future<Output = Result<Value, Error>>,
X: Future<Output = Result<R, Error>>,
{
type Out = X;
fn call(&self, params: Params) -> Self::Out {
self(params)
}
}

impl<F: Send + Sync + 'static, X: Send + 'static> RpcMethodSync for F
impl<F: Send + Sync + 'static, X: Send + 'static, R> RpcMethodSync<R> for F
where
F: Fn(Params) -> X,
X: WrapFuture<Value, Error>,
X: WrapFuture<R, Error>,
{
fn call(&self, params: Params) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params) -> BoxFuture<crate::Result<R>> {
self(params).into_future()
}
}
Expand All @@ -113,13 +128,13 @@ where
}
}

impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethod<T> for F
impl<F: Send + Sync + 'static, X: Send + 'static, T, R> RpcMethod<T, R> for F
where
T: Metadata,
F: Fn(Params, T) -> X,
X: Future<Output = Result<Value, Error>>,
X: Future<Output = Result<R, Error>>,
{
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, meta: T) -> BoxFuture<crate::Result<R>> {
Box::pin(self(params, meta))
}
}
Expand All @@ -133,3 +148,14 @@ where
self(params, meta)
}
}

impl<F: Send + Sync + 'static, X: Send + 'static, T> RpcMethodWithSerializedOutput<T> for F
where
T: Metadata,
F: Fn(Params, T, Option<Version>, Id) -> X,
X: Future<Output = Option<SerializedOutput>>,
{
fn call(&self, params: Params, meta: T, jsonrpc: Option<Version>, id: Id) -> BoxFuture<Option<SerializedOutput>> {
Box::pin(self(params, meta, jsonrpc, id))
}
}
31 changes: 17 additions & 14 deletions core/src/delegates.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,28 @@
//! Delegate rpc calls

use serde::Serialize;
use std::collections::HashMap;
use std::future::Future;
use std::sync::Arc;

use crate::calls::{Metadata, RemoteProcedure, RpcMethod, RpcNotification};
use crate::types::{Error, Params, Value};
use crate::calls::{rpc_wrap, Metadata, RemoteProcedure, RpcMethod, RpcNotification};
use crate::types::{Error, Params};
use crate::BoxFuture;

struct DelegateAsyncMethod<T, F> {
delegate: Arc<T>,
closure: F,
}

impl<T, M, F, I> RpcMethod<M> for DelegateAsyncMethod<T, F>
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateAsyncMethod<T, F>
where
M: Metadata,
F: Fn(&T, Params) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
T: Send + Sync + 'static,
F: Send + Sync + 'static,
{
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, _meta: M) -> BoxFuture<crate::Result<R>> {
let closure = &self.closure;
Box::pin(closure(&self.delegate, params))
}
Expand All @@ -32,15 +33,15 @@ struct DelegateMethodWithMeta<T, F> {
closure: F,
}

impl<T, M, F, I> RpcMethod<M> for DelegateMethodWithMeta<T, F>
impl<T, M, F, I, R> RpcMethod<M, R> for DelegateMethodWithMeta<T, F>
where
M: Metadata,
F: Fn(&T, Params, M) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
T: Send + Sync + 'static,
F: Send + Sync + 'static,
{
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<Value>> {
fn call(&self, params: Params, meta: M) -> BoxFuture<crate::Result<R>> {
let closure = &self.closure;
Box::pin(closure(&self.delegate, params, meta))
}
Expand Down Expand Up @@ -112,31 +113,33 @@ where
}

/// Adds async method to the delegate.
pub fn add_method<F, I>(&mut self, name: &str, method: F)
pub fn add_method<F, I, R>(&mut self, name: &str, method: F)
where
F: Fn(&T, Params) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
F: Send + Sync + 'static,
R: Serialize + Send + 'static,
{
self.methods.insert(
name.into(),
RemoteProcedure::Method(Arc::new(DelegateAsyncMethod {
RemoteProcedure::Method(rpc_wrap(DelegateAsyncMethod {
delegate: self.delegate.clone(),
closure: method,
})),
);
}

/// Adds async method with metadata to the delegate.
pub fn add_method_with_meta<F, I>(&mut self, name: &str, method: F)
pub fn add_method_with_meta<F, I, R>(&mut self, name: &str, method: F)
where
F: Fn(&T, Params, M) -> I,
I: Future<Output = Result<Value, Error>> + Send + 'static,
I: Future<Output = Result<R, Error>> + Send + 'static,
F: Send + Sync + 'static,
R: Serialize + Send + 'static,
{
self.methods.insert(
name.into(),
RemoteProcedure::Method(Arc::new(DelegateMethodWithMeta {
RemoteProcedure::Method(rpc_wrap(DelegateMethodWithMeta {
delegate: self.delegate.clone(),
closure: method,
})),
Expand Down
Loading