1+ use crate :: servers:: servers_model:: ServerTypeEnum ;
12use crate :: { appState:: DbPooledConnection , GLOBAL_APP_STATE } ;
23use crate :: schema:: mails:: dsl:: * ;
34use crate :: schema:: contacts:: dsl as contacts_dsl;
45use crate :: schema:: bounce_logs:: dsl as bounce_logs_dsl;
6+ use crate :: schema:: campaign_senders:: dsl as campaign_senders_dsl;
7+ use crate :: schema:: campaigns:: dsl as campaigns_dsl;
8+ use crate :: schema:: servers:: { dsl as servers_dsl, server_type} ;
59use diesel:: prelude:: * ;
6- use chrono:: { Utc , DateTime } ;
10+ use chrono:: { Utc , DateTime , Duration } ;
711use crate :: models:: mail:: {
812 Mail , MailWithDetails , NewMail , UpdateMailRequest
913} ;
1014use uuid:: Uuid ;
15+ use diesel:: dsl:: sql;
16+ use diesel:: sql_types:: { Nullable , Text } ;
1117use mockall:: { automock, predicate:: * } ;
1218use async_trait:: async_trait;
1319
@@ -28,7 +34,10 @@ pub trait MailRepository {
2834 async fn delete_mail ( & self , mail_id : String ) -> Result < Mail , diesel:: result:: Error > ;
2935 async fn increment_mail_clicks ( & self , mail_id : String ) -> Result < Mail , diesel:: result:: Error > ;
3036 async fn get_mails_by_contact ( & self , c_id : Uuid ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > ;
31- async fn get_queued_mails ( & self ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > ;
37+ async fn get_mails_by_status ( & self , mail_status : & str , is_ascending : bool ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > ;
38+ async fn get_stale_submitted_mails ( & self ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > ;
39+ async fn update_mail_attempts ( & self , mail_id : String ) -> Result < Mail , diesel:: result:: Error > ;
40+ async fn udpate_mail_last_try_error ( & self , mail_id : String , error : & str ) -> Result < Mail , diesel:: result:: Error > ;
3241}
3342
3443pub struct MailRepositoryImpl ;
@@ -65,6 +74,7 @@ impl MailRepository for MailRepositoryImpl {
6574 last_error,
6675 contacts_dsl:: email,
6776 bounce_logs_dsl:: reason. nullable ( ) ,
77+ sql :: < Nullable < Text > > ( "NULL" )
6878 ) )
6979 . into_boxed ( ) ;
7080
@@ -144,19 +154,58 @@ impl MailRepository for MailRepositoryImpl {
144154 attempts,
145155 last_error,
146156 contacts_dsl:: email,
147- bounce_logs_dsl:: reason. nullable ( )
157+ bounce_logs_dsl:: reason. nullable ( ) ,
158+ sql :: < Nullable < Text > > ( "NULL" )
148159 ) )
149160 . filter ( contact_id. eq ( c_id) )
150161 . order ( sent_at. desc ( ) )
151162 . load :: < MailWithDetails > ( & mut conn)
152163 }
153164
154- async fn get_queued_mails ( & self ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > {
165+ async fn get_mails_by_status ( & self , mail_status : & str , is_ascending : bool ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > {
155166 let mut conn = get_connection_pool ( ) . await ;
167+
168+ let mut query_results = mails
169+ . inner_join ( contacts_dsl:: contacts. on ( contact_id. eq ( contacts_dsl:: id) ) )
170+ . inner_join ( campaigns_dsl:: campaigns. on ( campaign_id. eq ( campaigns_dsl:: id. nullable ( ) ) ) )
171+ . inner_join ( campaign_senders_dsl:: campaign_senders. on ( campaigns_dsl:: campaign_senders. eq ( campaign_senders_dsl:: id. nullable ( ) ) ) )
172+ . left_outer_join ( bounce_logs_dsl:: bounce_logs. on ( id. eq ( bounce_logs_dsl:: mail_id) ) )
173+ . select ( (
174+ id,
175+ mail_message,
176+ template_id,
177+ campaign_id,
178+ server_id,
179+ sent_at,
180+ status,
181+ open,
182+ clicks,
183+ scheduled_at,
184+ attempts,
185+ last_error,
186+ contacts_dsl:: email,
187+ bounce_logs_dsl:: reason. nullable ( ) ,
188+ campaign_senders_dsl:: from_name. nullable ( ) ,
189+ ) )
190+ . filter ( status. eq ( mail_status) )
191+ . into_boxed ( ) ;
192+
193+ if is_ascending {
194+ query_results = query_results. order ( sent_at. asc ( ) ) ;
195+ } else {
196+ query_results = query_results. order ( sent_at. desc ( ) ) ;
197+ }
198+ query_results. load :: < MailWithDetails > ( & mut conn)
199+ }
200+
201+ async fn get_stale_submitted_mails ( & self ) -> Result < Vec < MailWithDetails > , diesel:: result:: Error > {
202+ let mut conn = get_connection_pool ( ) . await ;
203+ let stale_duration_minutes = 30 ; // only send retry those mails that are older than 30 minutes...
156204
157205 mails
158206 . inner_join ( contacts_dsl:: contacts. on ( contact_id. eq ( contacts_dsl:: id) ) )
159207 . left_outer_join ( bounce_logs_dsl:: bounce_logs. on ( id. eq ( bounce_logs_dsl:: mail_id) ) )
208+ . left_outer_join ( servers_dsl:: servers. on ( server_id. eq ( servers_dsl:: id. nullable ( ) ) ) )
160209 . select ( (
161210 id,
162211 mail_message,
@@ -172,9 +221,29 @@ impl MailRepository for MailRepositoryImpl {
172221 last_error,
173222 contacts_dsl:: email,
174223 bounce_logs_dsl:: reason. nullable ( ) ,
224+ sql :: < Nullable < Text > > ( "NULL" )
175225 ) )
176- . filter ( status. eq ( "queued" ) )
177- . order ( sent_at. asc ( ) ) // Order by sent_at ascending here...
226+ . filter ( status. eq ( "submitted" ) )
227+ . filter ( server_type. eq ( ServerTypeEnum :: AWS ) )
228+ . filter ( sent_at. lt ( Utc :: now ( ) . naive_utc ( ) - Duration :: minutes ( stale_duration_minutes) ) )
229+ . filter ( attempts. le ( 3 ) )
230+ . order ( sent_at. asc ( ) )
178231 . load :: < MailWithDetails > ( & mut conn)
179232 }
233+
234+ async fn update_mail_attempts ( & self , mail_id : String ) -> Result < Mail , diesel:: result:: Error > {
235+ let mut conn = get_connection_pool ( ) . await ;
236+
237+ diesel:: update ( mails. find ( mail_id) )
238+ . set ( attempts. eq ( attempts + 1 ) )
239+ . get_result ( & mut conn)
240+ }
241+
242+ async fn udpate_mail_last_try_error ( & self , mail_id : String , error : & str ) -> Result < Mail , diesel:: result:: Error > {
243+ let mut conn = get_connection_pool ( ) . await ;
244+
245+ diesel:: update ( mails. find ( mail_id) )
246+ . set ( last_error. eq ( error) )
247+ . get_result ( & mut conn)
248+ }
180249}
0 commit comments