1
1
use std:: future:: Future ;
2
2
use std:: path:: Path ;
3
3
use std:: pin:: Pin ;
4
+ use std:: sync:: Arc ;
5
+ use std:: sync:: atomic:: AtomicU64 ;
4
6
use std:: time:: { Duration , Instant } ;
5
7
6
8
use bytes:: Bytes ;
@@ -22,6 +24,63 @@ async fn time<O>(fut: impl Future<Output = O>) -> (O, Duration) {
22
24
( out, before. elapsed ( ) )
23
25
}
24
26
27
+ struct SyncStats {
28
+ pub prefetched_bytes : AtomicU64 ,
29
+ pub prefetched_bytes_discarded_due_to_new_session : AtomicU64 ,
30
+ pub prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 ,
31
+ pub prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
32
+ pub synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 ,
33
+ pub prefetched_bytes_used : AtomicU64 ,
34
+ pub synced_bytes_used : AtomicU64 ,
35
+ pub snapshot_bytes : AtomicU64 ,
36
+ }
37
+
38
+ impl SyncStats {
39
+ fn new ( ) -> Self {
40
+ Self {
41
+ prefetched_bytes : AtomicU64 :: new ( 0 ) ,
42
+ prefetched_bytes_discarded_due_to_new_session : AtomicU64 :: new ( 0 ) ,
43
+ prefetched_bytes_discarded_due_to_consecutive_handshake : AtomicU64 :: new ( 0 ) ,
44
+ prefetched_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
45
+ synced_bytes_discarded_due_to_invalid_frame_header : AtomicU64 :: new ( 0 ) ,
46
+ prefetched_bytes_used : AtomicU64 :: new ( 0 ) ,
47
+ synced_bytes_used : AtomicU64 :: new ( 0 ) ,
48
+ snapshot_bytes : AtomicU64 :: new ( 0 ) ,
49
+ }
50
+ }
51
+
52
+ fn add_prefetched_bytes ( & self , bytes : u64 ) {
53
+ self . prefetched_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
54
+ }
55
+
56
+ fn add_prefetched_bytes_discarded_due_to_new_session ( & self , bytes : u64 ) {
57
+ self . prefetched_bytes_discarded_due_to_new_session . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
58
+ }
59
+
60
+ fn add_prefetched_bytes_discarded_due_to_consecutive_handshake ( & self , bytes : u64 ) {
61
+ self . prefetched_bytes_discarded_due_to_consecutive_handshake . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
62
+ }
63
+
64
+ fn add_prefetched_bytes_discarded_due_to_invalid_frame_header ( & self , bytes : u64 ) {
65
+ self . prefetched_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
66
+ }
67
+
68
+ fn add_synced_bytes_discarded_due_to_invalid_frame_headear ( & self , bytes : u64 ) {
69
+ self . synced_bytes_discarded_due_to_invalid_frame_header . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
70
+ }
71
+
72
+ fn add_prefetched_bytes_used ( & self , bytes : u64 ) {
73
+ self . prefetched_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
74
+ }
75
+
76
+ fn add_synced_bytes_used ( & self , bytes : u64 ) {
77
+ self . synced_bytes_used . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
78
+ }
79
+ fn add_snapshot_bytes ( & self , bytes : u64 ) {
80
+ self . snapshot_bytes . fetch_add ( bytes, std:: sync:: atomic:: Ordering :: SeqCst ) ;
81
+ }
82
+ }
83
+
25
84
/// A remote replicator client, that pulls frames over RPC
26
85
pub struct RemoteClient {
27
86
remote : super :: client:: Client ,
@@ -38,6 +97,7 @@ pub struct RemoteClient {
38
97
frames_latency_count : u128 ,
39
98
snapshot_latency_sum : Duration ,
40
99
snapshot_latency_count : u128 ,
100
+ sync_stats : Arc < SyncStats > ,
41
101
}
42
102
43
103
impl RemoteClient {
@@ -57,6 +117,7 @@ impl RemoteClient {
57
117
frames_latency_count : 0 ,
58
118
snapshot_latency_sum : Duration :: default ( ) ,
59
119
snapshot_latency_count : 0 ,
120
+ sync_stats : Arc :: new ( SyncStats :: new ( ) ) ,
60
121
} )
61
122
}
62
123
@@ -109,6 +170,11 @@ impl RemoteClient {
109
170
110
171
async fn do_handshake_with_prefetch ( & mut self ) -> ( Result < ( ) , Error > , Duration ) {
111
172
tracing:: info!( "Attempting to perform handshake with primary." ) ;
173
+ if let Some ( ( Ok ( frames) , _) ) = & self . prefetched_batch_log_entries {
174
+ // TODO: check if it's ok to just do 4096 * frames.len()
175
+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
176
+ self . sync_stats . add_prefetched_bytes_discarded_due_to_consecutive_handshake ( bytes) ;
177
+ }
112
178
if self . dirty {
113
179
self . prefetched_batch_log_entries = None ;
114
180
self . meta . reset ( ) ;
@@ -135,10 +201,19 @@ impl RemoteClient {
135
201
} else {
136
202
( hello_fut. await , None )
137
203
} ;
204
+ let mut prefetched_bytes = None ;
205
+ if let Some ( ( Ok ( frames) , _) ) = & frames {
206
+ let bytes = frames. get_ref ( ) . frames . iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
207
+ self . sync_stats . add_prefetched_bytes ( bytes) ;
208
+ prefetched_bytes = Some ( bytes) ;
209
+ }
138
210
self . prefetched_batch_log_entries = if let Ok ( true ) = hello. 0 {
139
211
tracing:: debug!(
140
212
"Frames prefetching failed because of new session token returned by handshake"
141
213
) ;
214
+ if let Some ( bytes) = prefetched_bytes {
215
+ self . sync_stats . add_prefetched_bytes_discarded_due_to_new_session ( bytes) ;
216
+ }
142
217
None
143
218
} else {
144
219
frames
@@ -150,15 +225,31 @@ impl RemoteClient {
150
225
async fn handle_next_frames_response (
151
226
& mut self ,
152
227
frames : Result < Response < Frames > , Status > ,
228
+ prefetched : bool ,
153
229
) -> Result < <Self as ReplicatorClient >:: FrameStream , Error > {
154
230
let frames = frames?. into_inner ( ) . frames ;
231
+ let bytes = frames. iter ( ) . map ( |f| f. data . len ( ) as u64 ) . sum ( ) ;
155
232
156
233
if let Some ( f) = frames. last ( ) {
157
- let header: FrameHeader = FrameHeader :: read_from_prefix ( & f. data )
234
+ let header_result = FrameHeader :: read_from_prefix ( & f. data ) ;
235
+ if header_result. is_none ( ) {
236
+ if prefetched {
237
+ self . sync_stats . add_prefetched_bytes_discarded_due_to_invalid_frame_header ( bytes) ;
238
+ } else {
239
+ self . sync_stats . add_synced_bytes_discarded_due_to_invalid_frame_headear ( bytes) ;
240
+ }
241
+ }
242
+ let header: FrameHeader = header_result
158
243
. ok_or_else ( || Error :: Internal ( "invalid frame header" . into ( ) ) ) ?;
159
244
self . last_received = Some ( header. frame_no . get ( ) ) ;
160
245
}
161
246
247
+ if prefetched {
248
+ self . sync_stats . add_prefetched_bytes_used ( bytes) ;
249
+ } else {
250
+ self . sync_stats . add_synced_bytes_used ( bytes) ;
251
+ }
252
+
162
253
let frames_iter = frames
163
254
. into_iter ( )
164
255
. map ( Ok ) ;
@@ -174,17 +265,18 @@ impl RemoteClient {
174
265
Result < <Self as ReplicatorClient >:: FrameStream , Error > ,
175
266
Duration ,
176
267
) {
177
- let ( frames, time) = match self . prefetched_batch_log_entries . take ( ) {
178
- Some ( ( result, time) ) => ( result, time) ,
268
+ let ( ( frames, time) , prefetched ) = match self . prefetched_batch_log_entries . take ( ) {
269
+ Some ( ( result, time) ) => ( ( result, time) , true ) ,
179
270
None => {
180
271
let req = self . make_request ( LogOffset {
181
272
next_offset : self . next_offset ( ) ,
182
273
wal_flavor : None ,
183
274
} ) ;
184
- time ( self . remote . replication . batch_log_entries ( req) ) . await
275
+ let result = time ( self . remote . replication . batch_log_entries ( req) ) . await ;
276
+ ( result, false )
185
277
}
186
278
} ;
187
- let res = self . handle_next_frames_response ( frames) . await ;
279
+ let res = self . handle_next_frames_response ( frames, prefetched ) . await ;
188
280
( res, time)
189
281
}
190
282
@@ -193,13 +285,18 @@ impl RemoteClient {
193
285
next_offset : self . next_offset ( ) ,
194
286
wal_flavor : None ,
195
287
} ) ;
288
+ let sync_stats = self . sync_stats . clone ( ) ;
196
289
let mut frames = self
197
290
. remote
198
291
. replication
199
292
. snapshot ( req)
200
293
. await ?
201
294
. into_inner ( )
202
295
. map_err ( |e| e. into ( ) )
296
+ . map_ok ( move |f| {
297
+ sync_stats. add_snapshot_bytes ( f. data . len ( ) as u64 ) ;
298
+ f
299
+ } )
203
300
. peekable ( ) ;
204
301
205
302
{
@@ -212,6 +309,7 @@ impl RemoteClient {
212
309
}
213
310
}
214
311
312
+
215
313
Ok ( Box :: pin ( frames) )
216
314
}
217
315
}
0 commit comments