Custom retry policy with tower, but stuck! Relaxation of restrictions
#2392
Feature Request
Motivation
I am trying to write a custom retry policy with tower inside axum (version 0.7.x), but I am stuck. The same approach was successfully tested on an older version of axum (version 0.6.20).
use std::{convert::Infallible, net::SocketAddr, sync::Arc};
use axum::{
    async_trait,
    extract::{Request, FromRequest},
    response::Response,
    body::Body,
    debug_handler,
    routing,
    Router,
};
use http::StatusCode;
use tower::builder::ServiceBuilder;
use tower::util::{MapRequestLayer as TowerMapRequestLayer, MapRequest as TowerMapRequest};
use tower::retry::{Retry, RetryLayer};
#[derive(Clone, Debug)]
struct ArcReq<T>(pub Arc<T>);
#[async_trait]
impl<S> FromRequest<S> for ArcReq<Request>
where
    Request: FromRequest<S>,
    S: Send + Sync,
{
    type Rejection = Infallible;
    async fn from_request(req: Request, state: &S) -> Result<Self, Self::Rejection> {
        Ok(ArcReq(Arc::new(req)))
    }
}
use tower::retry::Policy;
use tower_http::classify::{
    ClassifyResponse, ClassifiedResponse, StatusInRangeAsFailures, StatusInRangeFailureClass
};
trait IsRetryable {
    fn is_retryable(&self) -> bool;
}
impl IsRetryable for StatusInRangeFailureClass {
    fn is_retryable(&self) -> bool {
        match self {
            Self::StatusCode(code) => code.as_u16() == 500,
            Self::Error(_) => false,
        }
    }
}
#[derive(Clone)]
struct RetryBasedOnClassification<C> {
    classifier: C,
    attempt_count: u8,
    // ...
}
#[allow(unused)]
impl<C> RetryBasedOnClassification<C>
where
    C: ClassifyResponse + Clone,
    C::FailureClass: IsRetryable,
{
    fn new(classifier: C, retry_count: u8) -> Self {
        Self {
            classifier,
            attempt_count: retry_count
        }
    }
}
impl<ReqB, ResB, E, C> Policy<Arc<http::Request<ReqB>>, http::Response<ResB>, E> for RetryBasedOnClassification<C>
where
    C: ClassifyResponse + Clone,
    C::FailureClass: IsRetryable,
    E: std::error::Error + 'static,
    ReqB: http_body::Body,
    ResB: http_body::Body,
{
    type Future = futures::future::Ready<Self>;
    fn retry(&self, req: &Arc<http::Request<ReqB>>, result: Result<&http::Response<ResB>, &E>) -> Option<Self::Future> {
        match result {
            Ok(res) => {
                if let ClassifiedResponse::Ready(class) = self.classifier.clone().classify_response(res) {
                    if class.err()?.is_retryable() && self.attempt_count > 0 {
                        println!("current retry-count: {}", self.attempt_count);
                        let mut self_clone = self.clone();
                        self_clone.attempt_count -= 1;
                        return Some(futures::future::ready(self_clone));
                    }
                };
                None
            },
            Err(err) => self
                .classifier
                .clone()
                .classify_error(err)
                .is_retryable()
                .then(|| futures::future::ready(self.clone())),
        }
    }
    fn clone_request(&self, req: &Arc<http::Request<ReqB>>) -> Option<Arc<http::Request<ReqB>>> {
        Some(req.clone())
    }
}
// Compiler output for the handler below:
// note: required because it appears within the type `UnsyncBoxBody<Bytes, Error>`
// label: `(dyn HttpBody<Data = bytes::Bytes, Error = axum::Error> + std::marker::Send + 'static)`
// cannot be shared between threads safely
#[debug_handler]
async fn share(request: ArcReq<Request>) -> Result<Response, Infallible> {
    Response::builder()
        .status(StatusCode::INTERNAL_SERVER_ERROR)
        .body(Body::empty())
        .map_err(|error| unreachable!())
}
let real_service = ServiceBuilder::new()
    .layer(TowerMapRequestLayer::new(|req: Request| Arc::new(req)))
    .layer(RetryLayer::new(RetryBasedOnClassification::new(
        StatusInRangeAsFailures::new_for_client_and_server_errors(),
        2
    )))
    .service_fn(share);
let app = Router::<()>::new()
    .route(
        "/",
        routing::get_service(real_service)
    );
axum::serve(
    super::create_tokio_tcp_listener(SocketAddr::from(([127,0,0,1], 3001))),
    app
)
    .await
    .unwrap();
Proposal
type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
pub struct Body(BoxBody);
Relaxation of restrictions, such as:
type BoxBody = http_body_util::combinators::BoxBody<Bytes, Error>;
pub struct Body(BoxBody);
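The difference between the two aliases is the `Sync` bound on the boxed trait object, which is what the compiler note above is pointing at. A minimal std-only sketch of that effect follows; `Chunks`, `UnsyncBoxed`, and `SyncBoxed` are hypothetical stand-ins, not axum's or http_body_util's real types.

```rust
use std::sync::Arc;

// Compile-time probes: each call only compiles if `T` has the named auto trait.
fn assert_send<T: Send>() {}
fn assert_sync<T: Sync>() {}

// Hypothetical stand-in for a body trait.
trait Chunks {}

// Like UnsyncBoxBody: the trait object is `Send` but not `Sync`.
type UnsyncBoxed = Box<dyn Chunks + Send>;
// Like BoxBody: the trait object is `Send + Sync`.
type SyncBoxed = Box<dyn Chunks + Send + Sync>;

pub fn check_bounds() -> bool {
    assert_send::<UnsyncBoxed>();
    assert_send::<SyncBoxed>();
    assert_sync::<SyncBoxed>();
    // `Arc<T>` is `Send` only when `T: Send + Sync`.
    assert_send::<Arc<SyncBoxed>>();
    // The next line would not compile, because `dyn Chunks + Send` is not `Sync`:
    // assert_send::<Arc<UnsyncBoxed>>();
    true
}
```

Under these assumptions, wrapping a `Send`-only body in an `Arc` yields a value that cannot cross threads, which is why the `Arc<Request>` approach is rejected.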
Answered by
davidpdrsn
Dec 3, 2023
Replies: 1 comment 7 replies
Please give some context that isn't just 100 lines of code. I don't follow what problem you're having.
That's intentional. You can write your own middleware if you need it to be async.
Arc<Stream> doesn't help you anyway, because actually consuming the body requires Pin<&mut Self>, which you cannot get through an Arc.
Yeah, I just said as much. It's an async stream.
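The point about Arc and Pin<&mut Self> can be illustrated with a small std-only sketch. `OneShot` is a hypothetical stand-in for a body-like stream, not a type from axum or http_body.

```rust
use std::pin::Pin;
use std::sync::Arc;

// Hypothetical one-shot stream: reading it needs exclusive, pinned access,
// mirroring how polling a body takes `Pin<&mut Self>`.
struct OneShot {
    chunk: Option<&'static str>,
}

impl OneShot {
    fn next_chunk(self: Pin<&mut Self>) -> Option<&'static str> {
        // `OneShot` is `Unpin`, so `get_mut` is allowed here.
        self.get_mut().chunk.take()
    }
}

// Tries to read the stream through the `Arc`.
fn try_read(shared: &mut Arc<OneShot>) -> Option<&'static str> {
    // `Arc` only hands out `&T`; `Arc::get_mut` yields `&mut T` solely when
    // no other handle to the same allocation exists.
    let inner = Arc::get_mut(shared)?;
    Pin::new(inner).next_chunk()
}

pub fn shared_then_unique() -> (Option<&'static str>, Option<&'static str>) {
    let mut a = Arc::new(OneShot { chunk: Some("hello") });
    // A second handle, as a retry policy's `clone_request` would create.
    let clone = Arc::clone(&a);
    let while_shared = try_read(&mut a); // no exclusive access while shared
    drop(clone);
    let when_unique = try_read(&mut a); // exclusive access is available again
    (while_shared, when_unique)
}
```

Even once the handle is unique, the read consumes the chunk, so a later attempt would find nothing left to replay.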
You need to implement retries inside your app and not in a middleware. I'd recommend that anyway. Have a look at https://docs.rs/tryhard/latest/tryhard/index.html
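As a rough illustration of retrying inside the app rather than in a middleware, here is a minimal synchronous sketch using only the standard library. `retry_in_app` and `flaky_demo` are hypothetical names; an async handler would apply the same loop with `.await`, or use the tryhard crate linked above.

```rust
// Hypothetical helper: run `op` until it succeeds, `max_attempts` is reached,
// or it returns an error that `is_retryable` rejects. `op` always runs at
// least once, and receives the 1-based attempt number.
fn retry_in_app<T, E>(
    max_attempts: u32,
    mut op: impl FnMut(u32) -> Result<T, E>,
    is_retryable: impl Fn(&E) -> bool,
) -> Result<T, E> {
    let mut attempt = 1;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Retry only while attempts remain and the error qualifies.
            Err(err) if attempt < max_attempts && is_retryable(&err) => attempt += 1,
            Err(err) => return Err(err),
        }
    }
}

// Example operation: fails with status 500 on the first two attempts,
// then succeeds on the third.
pub fn flaky_demo() -> Result<&'static str, u16> {
    retry_in_app(
        3,
        |attempt| if attempt < 3 { Err(500) } else { Ok("done") },
        |status| *status == 500,
    )
}
```

Because the operation is rebuilt on every attempt, nothing has to be cloned or shared across attempts, which sidesteps the body-cloning problem discussed in this thread.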