1
1
use axum:: extract:: { Path , State } ;
2
- use chrono:: { DateTime , Utc } ;
2
+ use chrono:: { DateTime , Duration , Utc } ;
3
3
use schemars:: JsonSchema ;
4
4
use sea_orm:: { entity:: prelude:: * , QueryOrder , QuerySelect } ;
5
5
use serde:: { Deserialize , Serialize } ;
@@ -27,6 +27,7 @@ async fn bulk_recover_failed_messages(
27
27
app : application:: Model ,
28
28
endp : endpoint:: Model ,
29
29
since : DateTime < Utc > ,
30
+ until : DateTime < Utc > ,
30
31
) -> Result < ( ) > {
31
32
const RECOVER_LIMIT : u64 = 10_000 ;
32
33
const BATCH_SIZE : u64 = 100 ;
@@ -36,6 +37,7 @@ async fn bulk_recover_failed_messages(
36
37
loop {
37
38
let mut query = messagedestination:: Entity :: secure_find_by_endpoint ( endp. id . clone ( ) )
38
39
. filter ( messagedestination:: Column :: Id . gte ( MessageEndpointId :: start_id ( since) ) )
40
+ . filter ( messagedestination:: Column :: Id . lt ( MessageEndpointId :: start_id ( until) ) )
39
41
. filter ( messagedestination:: Column :: Status . eq ( MessageStatus :: Fail ) )
40
42
. order_by_asc ( messagedestination:: Column :: Id )
41
43
. limit ( RECOVER_LIMIT ) ;
@@ -99,16 +101,20 @@ pub(super) async fn recover_failed_webhooks(
99
101
} ) : State < AppState > ,
100
102
Path ( ApplicationEndpointPath { endpoint_id, .. } ) : Path < ApplicationEndpointPath > ,
101
103
permissions:: Application { app } : permissions:: Application ,
102
- ValidatedJson ( data ) : ValidatedJson < RecoverIn > ,
104
+ ValidatedJson ( RecoverIn { since , until } ) : ValidatedJson < RecoverIn > ,
103
105
) -> Result < JsonStatus < 202 , RecoverOut > > {
106
+ let until = until. unwrap_or_else ( Utc :: now) ;
107
+
104
108
// Add five minutes so that people can easily just do `now() - two_weeks` without having to worry about clock sync
105
- let timeframe = chrono:: Duration :: days ( 14 ) ;
106
- let timeframe = timeframe + chrono:: Duration :: minutes ( 5 ) ;
109
+ let max_timeframe = Duration :: days ( 14 ) + Duration :: minutes ( 5 ) ;
107
110
108
- if data . since < Utc :: now ( ) - timeframe {
111
+ if since < until - max_timeframe {
109
112
return Err ( HttpError :: unprocessable_entity ( vec ! [ ValidationErrorItem {
110
113
loc: vec![ "body" . to_owned( ) , "since" . to_owned( ) ] ,
111
- msg: "Cannot recover messages more than 14 days old." . to_owned( ) ,
114
+ msg: format!(
115
+ "Cannot recover more than {} days of messages" ,
116
+ max_timeframe. num_days( )
117
+ ) ,
112
118
ty: "value_error" . to_owned( ) ,
113
119
} ] )
114
120
. into ( ) ) ;
@@ -121,9 +127,9 @@ pub(super) async fn recover_failed_webhooks(
121
127
122
128
let db = db. clone ( ) ;
123
129
let queue_tx = queue_tx. clone ( ) ;
124
- tokio:: spawn (
125
- async move { bulk_recover_failed_messages ( db, queue_tx, app, endp, data . since ) . await } ,
126
- ) ;
130
+ tokio:: spawn ( async move {
131
+ bulk_recover_failed_messages ( db, queue_tx, app, endp, since, until ) . await
132
+ } ) ;
127
133
128
134
Ok ( JsonStatus ( RecoverOut {
129
135
id : QueueBackgroundTaskId :: new ( None , None ) ,
0 commit comments