2
2
use crate :: { Error , Result } ;
3
3
use futures:: sink:: SinkExt ;
4
4
use interprocess:: local_socket:: { tokio:: prelude:: * , GenericNamespaced } ;
5
- use sos_core:: { events:: changes_feed, Paths } ;
6
- use std:: { sync:: Arc , time:: Duration } ;
5
+ use sos_core:: {
6
+ events:: { changes_feed, LocalChangeEvent } ,
7
+ Paths ,
8
+ } ;
9
+ use std:: { path:: PathBuf , sync:: Arc , time:: Duration } ;
7
10
use tokio:: {
8
11
select,
9
12
sync:: { watch, Mutex } ,
@@ -59,41 +62,26 @@ impl ChangeProducer {
59
62
loop {
60
63
let paths = paths. clone ( ) ;
61
64
select ! {
65
+ // Explicit cancel notification
62
66
_ = cancel_rx. changed( ) => {
63
67
if * cancel_rx. borrow_and_update( ) {
64
68
break ;
65
69
}
66
70
}
71
+ // Periodically refresh the list of consumer sockets
72
+ // to dispatch change events to
67
73
_ = interval. tick( ) => {
68
74
let active = find_active_sockets( paths) . await ?;
69
75
let mut sockets = sockets. lock( ) . await ;
70
76
* sockets = active;
71
77
}
78
+ // Proxy the change events to the consumer sockets
72
79
event = rx. changed( ) => {
73
80
match event {
74
81
Ok ( _) => {
75
82
let event = rx. borrow_and_update( ) . clone( ) ;
76
83
let sockets = sockets. lock( ) . await ;
77
- for pid in & * sockets {
78
- let ps_name = pid. to_string( ) ;
79
- let name = ps_name. to_ns_name:: <GenericNamespaced >( ) ?;
80
- match LocalSocketStream :: connect( name) . await {
81
- Ok ( socket) => {
82
- let mut writer =
83
- LengthDelimitedCodec :: builder( )
84
- . native_endian( )
85
- . new_write( socket) ;
86
- let message = serde_json:: to_vec( & event) ?;
87
- writer. send( message. into( ) ) . await ?;
88
- }
89
- Err ( e) => {
90
- tracing:: warn!(
91
- pid = %pid,
92
- error = %e,
93
- "changes::producer::connect_error" ) ;
94
- }
95
- }
96
- }
84
+ dispatch_sockets( event, & * sockets) . await ?;
97
85
}
98
86
Err ( _) => { }
99
87
}
@@ -106,8 +94,44 @@ impl ChangeProducer {
106
94
}
107
95
}
108
96
97
+ async fn dispatch_sockets (
98
+ event : LocalChangeEvent ,
99
+ sockets : & [ ( u32 , PathBuf ) ] ,
100
+ ) -> Result < ( ) > {
101
+ for ( pid, file) in sockets {
102
+ let ps_name = pid. to_string ( ) ;
103
+ let name = ps_name. to_ns_name :: < GenericNamespaced > ( ) ?;
104
+ match LocalSocketStream :: connect ( name) . await {
105
+ Ok ( socket) => {
106
+ let mut writer = LengthDelimitedCodec :: builder ( )
107
+ . native_endian ( )
108
+ . new_write ( socket) ;
109
+ let message = serde_json:: to_vec ( & event) ?;
110
+ writer. send ( message. into ( ) ) . await ?;
111
+ }
112
+ Err ( e) => {
113
+ // If we can't connect to the socket
114
+ // then treat the file as stale and
115
+ // remove from disc.
116
+ //
117
+ // This could happen if the consumer
118
+ // process aborted abnormally and
119
+ // wasn't able to cleanly remove the file.
120
+ let _ = std:: fs:: remove_file ( file) ?;
121
+ tracing:: warn!(
122
+ pid = %pid,
123
+ error = %e,
124
+ "changes::producer::connect_error" ) ;
125
+ }
126
+ }
127
+ }
128
+ Ok ( ( ) )
129
+ }
130
+
109
131
/// Find active socket files for a producer.
110
- async fn find_active_sockets ( paths : Arc < Paths > ) -> Result < Vec < u32 > > {
132
+ async fn find_active_sockets (
133
+ paths : Arc < Paths > ,
134
+ ) -> Result < Vec < ( u32 , PathBuf ) > > {
111
135
use std:: fs:: read_dir;
112
136
let mut sockets = Vec :: new ( ) ;
113
137
let socks = paths. documents_dir ( ) . join ( crate :: SOCKS ) ;
@@ -126,7 +150,7 @@ async fn find_active_sockets(paths: Arc<Paths>) -> Result<Vec<u32>> {
126
150
sock_file_pid = %pid,
127
151
"changes::producer::find_active_sockets" ,
128
152
) ;
129
- sockets. push ( pid) ;
153
+ sockets. push ( ( pid, entry . path ( ) . to_owned ( ) ) ) ;
130
154
}
131
155
}
132
156
}
0 commit comments