|
| 1 | +use core::future::ready; |
| 2 | + |
| 3 | +use bytes::Bytes; |
| 4 | +use frunk::{HCons, HNil}; |
| 5 | +use futures::FutureExt; |
| 6 | +use harpc_codec::encode::ErrorEncoder; |
| 7 | +use harpc_tower::{ |
| 8 | + body::{Body, controlled::Controlled, full::Full}, |
| 9 | + request::Request, |
| 10 | + response::{Parts, Response}, |
| 11 | +}; |
| 12 | +use harpc_types::{response_kind::ResponseKind, service::ServiceId, version::Version}; |
| 13 | +use tower::{Service, ServiceExt, util::Oneshot}; |
| 14 | + |
| 15 | +use crate::error::NotFound; |
| 16 | + |
| 17 | +#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)] |
| 18 | +pub struct Handler<S> { |
| 19 | + service: ServiceId, |
| 20 | + version: Version, |
| 21 | + |
| 22 | + inner: S, |
| 23 | +} |
| 24 | + |
| 25 | +impl<S> Handler<S> { |
| 26 | + pub(crate) const fn new<Svc>(inner: S) -> Self |
| 27 | + where |
| 28 | + Svc: harpc_service::Service, |
| 29 | + { |
| 30 | + Self { |
| 31 | + service: Svc::ID, |
| 32 | + version: Svc::VERSION, |
| 33 | + |
| 34 | + inner, |
| 35 | + } |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +/// Route requests to the appropriate handler based on the request's service and version. |
| 40 | +/// |
| 41 | +/// This is essentially a type-level linked list of handlers, it's boxed equivalent is [`Steer`], |
| 42 | +/// but unlike [`Steer`] it doesn't require the same type for all handlers and a separation of both |
| 43 | +/// the meta information (service id and version) and the actual handler. Unlike [`Steer`], it also |
| 44 | +/// doesn't require `&mut self` access, which allows for more granular cloning. |
| 45 | +/// |
| 46 | +/// # Design motivations |
| 47 | +/// |
| 48 | +/// The reason why we essentially require `Clone`/`Copy` for the handlers is that once the route is |
| 49 | +/// constructed it needs to be available for each request that happens, now, there are multiple ways |
| 50 | +/// to achieve this. |
| 51 | +/// |
| 52 | +/// One way would be to simply clone the entire [`Router`] during each request, but that has the |
| 53 | +/// downside of potentially cloning a lot of data that isn't actually required for each request, |
| 54 | +/// making the addition of new handlers slower and slower. |
| 55 | +/// The other solution instead would be to `Mutex` the entire [`Router`], but that would make the |
| 56 | +/// entire [`Router`] essentially single-threaded, which is not ideal. |
| 57 | +/// |
| 58 | +/// This takes a middle ground, which is similar in implementation to other tower-based frameworks, |
| 59 | +/// such as axum. The inner routes are stored in an `Arc<T>`, which is cheap to clone, but means we |
| 60 | +/// need to require `&self` during routing. Once a route was chosen, we simply clone the service |
| 61 | +/// (and oneshot) the service. This keeps the cloned data minimal and allows for multi-threading. |
| 62 | +/// |
| 63 | +/// The downside is that we're unable to keep any state in a service delegate that's persisted |
| 64 | +/// across invocations. To signal this, `ServiceDelegate` takes `self` and consumes it (even though |
| 65 | +/// it isn't strictly needed), as well as the use of sessions. To store any information across |
| 66 | +/// calls, one must make use of smart pointers, such as `Arc`. |
| 67 | +/// |
| 68 | +/// [`Router`]: crate::router::Router |
| 69 | +/// [`Steer`]: https://docs.rs/tower/latest/tower/steer/struct.Steer.html |
| 70 | +pub trait Route<ReqBody, C> { |
| 71 | + type ResponseBody: Body<Control: AsRef<ResponseKind>, Error = !>; |
| 72 | + type Future: Future<Output = Response<Self::ResponseBody>>; |
| 73 | + |
| 74 | + fn call(&self, request: Request<ReqBody>, codec: C) -> Self::Future |
| 75 | + where |
| 76 | + ReqBody: Body<Control = !, Error: Send + Sync> + Send + Sync, |
| 77 | + C: ErrorEncoder + Send + Sync; |
| 78 | +} |
| 79 | + |
| 80 | +// The clone requirement might seem odd here, but is the same as in axum's router implementation. |
| 81 | +// see: https://docs.rs/axum/latest/src/axum/routing/route.rs.html#45 |
| 82 | +impl<C, Svc, Tail, ReqBody, ResBody> Route<ReqBody, C> for HCons<Handler<Svc>, Tail> |
| 83 | +where |
| 84 | + Svc: Service<Request<ReqBody>, Response = Response<ResBody>, Error = !> + Clone, |
| 85 | + Tail: Route<ReqBody, C>, |
| 86 | + ResBody: Body<Control: AsRef<ResponseKind>, Error = !>, |
| 87 | +{ |
| 88 | + // cannot use `impl Future` here, as it would require additional constraints on the associated |
| 89 | + // type, that are already present on the `call` method. |
| 90 | + type Future = futures::future::Either< |
| 91 | + futures::future::Map< |
| 92 | + Oneshot<Svc, Request<ReqBody>>, |
| 93 | + fn(Result<Response<ResBody>, !>) -> Response<Self::ResponseBody>, |
| 94 | + >, |
| 95 | + futures::future::Map< |
| 96 | + Tail::Future, |
| 97 | + fn(Response<Tail::ResponseBody>) -> Response<Self::ResponseBody>, |
| 98 | + >, |
| 99 | + >; |
| 100 | + type ResponseBody = harpc_tower::either::Either<ResBody, Tail::ResponseBody>; |
| 101 | + |
| 102 | + fn call(&self, request: Request<ReqBody>, codec: C) -> Self::Future |
| 103 | + where |
| 104 | + ReqBody: Body<Control = !, Error: Send + Sync> + Send + Sync, |
| 105 | + C: ErrorEncoder + Send + Sync, |
| 106 | + { |
| 107 | + let requirement = self.head.version.into_requirement(); |
| 108 | + |
| 109 | + if self.head.service == request.service().id |
| 110 | + && requirement.compatible(request.service().version) |
| 111 | + { |
| 112 | + let service = self.head.inner.clone(); |
| 113 | + |
| 114 | + futures::future::Either::Left( |
| 115 | + service |
| 116 | + .oneshot(request) |
| 117 | + .map(|Ok(response)| response.map_body(harpc_tower::either::Either::Left)), |
| 118 | + ) |
| 119 | + } else { |
| 120 | + futures::future::Either::Right( |
| 121 | + self.tail |
| 122 | + .call(request, codec) |
| 123 | + .map(|response| response.map_body(harpc_tower::either::Either::Right)), |
| 124 | + ) |
| 125 | + } |
| 126 | + } |
| 127 | +} |
| 128 | + |
| 129 | +impl<C, ReqBody> Route<ReqBody, C> for HNil { |
| 130 | + type Future = core::future::Ready<Response<Self::ResponseBody>>; |
| 131 | + type ResponseBody = Controlled<ResponseKind, Full<Bytes>>; |
| 132 | + |
| 133 | + fn call(&self, request: Request<ReqBody>, codec: C) -> Self::Future |
| 134 | + where |
| 135 | + ReqBody: Body<Control = !, Error: Send + Sync> + Send + Sync, |
| 136 | + C: ErrorEncoder + Send + Sync, |
| 137 | + { |
| 138 | + let error = NotFound { |
| 139 | + service: request.service().id, |
| 140 | + version: request.service().version, |
| 141 | + }; |
| 142 | + |
| 143 | + let session = request.session(); |
| 144 | + |
| 145 | + let error = codec.encode_error(error); |
| 146 | + |
| 147 | + ready(Response::from_error(Parts::new(session), error)) |
| 148 | + } |
| 149 | +} |
0 commit comments