@@ -3,6 +3,7 @@ use chrono::NaiveTime;
33use chrono:: TimeZone ;
44use chrono:: Utc ;
55use futures:: future:: join_all;
6+ use pony:: http:: requests:: ConnUpdateRequest ;
67use rand:: Rng ;
78use std:: collections:: HashMap ;
89use std:: time:: Duration ;
@@ -37,6 +38,7 @@ pub trait Tasks {
3738 async fn collect_conn_stat ( & self ) -> Result < ( ) > ;
3839 async fn cleanup_expired_connections ( & self , interval_sec : u64 , publisher : ZmqPublisher ) ;
3940 async fn cleanup_expired_subscriptions ( & self , interval_sec : u64 , publisher : ZmqPublisher ) ;
41+ async fn restore_subscriptions ( & self , interval_sec : u64 , publisher : ZmqPublisher ) ;
4042}
4143
4244#[ async_trait]
@@ -168,6 +170,77 @@ impl Tasks for Api<HashMap<String, Vec<Node>>, Connection, Subscription> {
168170 }
169171 }
170172
173+ async fn restore_subscriptions ( & self , interval_sec : u64 , publisher : ZmqPublisher ) {
174+ let mut interval = tokio:: time:: interval ( Duration :: from_secs ( interval_sec) ) ;
175+
176+ loop {
177+ interval. tick ( ) . await ;
178+ log:: debug!( "Run restore subscriptions task" ) ;
179+
180+ let expired_subs: Vec < uuid:: Uuid > = {
181+ let mem = self . sync . memory . read ( ) . await ;
182+ mem. subscriptions
183+ . iter ( )
184+ . filter_map ( |( id, sub) | if sub. is_active ( ) { Some ( * id) } else { None } )
185+ . collect ( )
186+ } ;
187+
188+ for sub_id in expired_subs {
189+ let conns_to_restore: Vec < ( uuid:: Uuid , Connection ) > = {
190+ let mem = self . sync . memory . read ( ) . await ;
191+ mem. connections
192+ . get_by_subscription_id ( & sub_id)
193+ . map ( |conns| {
194+ conns
195+ . iter ( )
196+ . filter ( |( _id, c) | c. get_deleted ( ) )
197+ . filter_map ( |( id, c) | Some ( ( * id, c. clone ( ) . into ( ) ) ) )
198+ . collect ( )
199+ } )
200+ . unwrap_or_default ( )
201+ } ;
202+
203+ for ( conn_id, conn) in conns_to_restore {
204+ let msg = conn. as_update_message ( & conn_id) ;
205+ if let Ok ( bytes) = rkyv:: to_bytes :: < _ , 1024 > ( & msg) {
206+ let key = conn
207+ . node_id
208+ . map ( |id| id. to_string ( ) )
209+ . unwrap_or_else ( || conn. get_env ( ) ) ;
210+ let _ = publisher. send_binary ( & key, bytes. as_ref ( ) ) . await ;
211+ }
212+
213+ let conn_upd = ConnUpdateRequest {
214+ env : Some ( conn. get_env ( ) ) ,
215+ is_deleted : Some ( false ) ,
216+ password : conn. get_password ( ) ,
217+ days : None ,
218+ } ;
219+
220+ match SyncOp :: update_conn ( & self . sync , & conn_id, conn_upd) . await {
221+ Ok ( StorageOperationStatus :: Updated ( _) ) => {
222+ log:: info!( "Expired connection {} restored" , conn_id) ;
223+ }
224+ Ok ( status) => {
225+ log:: warn!(
226+ "Connection {} could not be restored: {:?}" ,
227+ conn_id,
228+ status
229+ ) ;
230+ }
231+ Err ( e) => {
232+ log:: error!(
233+ "Failed to restore expired connection {}: {:?}" ,
234+ conn_id,
235+ e
236+ ) ;
237+ }
238+ }
239+ }
240+ }
241+ }
242+ }
243+
171244 async fn periodic_db_sync ( & self , interval_sec : u64 ) {
172245 let base = Duration :: from_secs ( interval_sec) ;
173246
0 commit comments