7
7
//!
8
8
//! This module is responsible for writing new records to MAS' database.
9
9
10
- use std:: { fmt:: Display , net:: IpAddr } ;
10
+ use std:: {
11
+ fmt:: Display ,
12
+ net:: IpAddr ,
13
+ sync:: {
14
+ atomic:: { AtomicU32 , Ordering } ,
15
+ Arc ,
16
+ } ,
17
+ } ;
11
18
12
19
use chrono:: { DateTime , Utc } ;
13
20
use futures_util:: { future:: BoxFuture , FutureExt , TryStreamExt } ;
@@ -44,6 +51,9 @@ pub enum Error {
44
51
#[ error( "inconsistent database: {0}" ) ]
45
52
Inconsistent ( String ) ,
46
53
54
+ #[ error( "bug in syn2mas: write buffers not finished" ) ]
55
+ WriteBuffersNotFinished ,
56
+
47
57
#[ error( "{0}" ) ]
48
58
Multiple ( MultipleErrors ) ,
49
59
}
@@ -185,12 +195,52 @@ impl WriterConnectionPool {
185
195
}
186
196
}
187
197
198
+ /// Small utility to make sure `finish()` is called on all write buffers
199
+ /// before committing to the database.
200
+ #[ derive( Default ) ]
201
+ struct FinishChecker {
202
+ counter : Arc < AtomicU32 > ,
203
+ }
204
+
205
+ struct FinishCheckerHandle {
206
+ counter : Arc < AtomicU32 > ,
207
+ }
208
+
209
+ impl FinishChecker {
210
+ /// Acquire a new handle, for a task that should declare when it has
211
+ /// finished.
212
+ pub fn handle ( & self ) -> FinishCheckerHandle {
213
+ self . counter . fetch_add ( 1 , Ordering :: SeqCst ) ;
214
+ FinishCheckerHandle {
215
+ counter : Arc :: clone ( & self . counter ) ,
216
+ }
217
+ }
218
+
219
+ /// Check that all handles have been declared as finished.
220
+ pub fn check_all_finished ( self ) -> Result < ( ) , Error > {
221
+ if self . counter . load ( Ordering :: SeqCst ) == 0 {
222
+ Ok ( ( ) )
223
+ } else {
224
+ Err ( Error :: WriteBuffersNotFinished )
225
+ }
226
+ }
227
+ }
228
+
229
+ impl FinishCheckerHandle {
230
+ /// Declare that the task this handle represents has been finished.
231
+ pub fn declare_finished ( self ) {
232
+ self . counter . fetch_sub ( 1 , Ordering :: SeqCst ) ;
233
+ }
234
+ }
235
+
188
236
pub struct MasWriter < ' c > {
189
237
conn : LockedMasDatabase < ' c > ,
190
238
writer_pool : WriterConnectionPool ,
191
239
192
240
indices_to_restore : Vec < IndexDescription > ,
193
241
constraints_to_restore : Vec < ConstraintDescription > ,
242
+
243
+ write_buffer_finish_checker : FinishChecker ,
194
244
}
195
245
196
246
pub struct MasNewUser {
@@ -449,6 +499,7 @@ impl<'conn> MasWriter<'conn> {
449
499
writer_pool : WriterConnectionPool :: new ( writer_connections) ,
450
500
indices_to_restore,
451
501
constraints_to_restore,
502
+ write_buffer_finish_checker : FinishChecker :: default ( ) ,
452
503
} )
453
504
}
454
505
@@ -515,6 +566,8 @@ impl<'conn> MasWriter<'conn> {
515
566
/// - If the database connection experiences an error.
516
567
#[ tracing:: instrument( skip_all) ]
517
568
pub async fn finish ( mut self ) -> Result < ( ) , Error > {
569
+ self . write_buffer_finish_checker . check_all_finished ( ) ?;
570
+
518
571
// Commit all writer transactions to the database.
519
572
self . writer_pool
520
573
. finish ( )
@@ -1027,28 +1080,24 @@ type WriteBufferFlusher<'conn, T> =
1027
1080
1028
1081
/// A buffer for writing rows to the MAS database.
1029
1082
/// Generic over the type of rows.
1030
- ///
1031
- /// # Panics
1032
- ///
1033
- /// Panics if dropped before `finish()` has been called.
1034
1083
pub struct MasWriteBuffer < ' conn , T > {
1035
1084
rows : Vec < T > ,
1036
1085
flusher : WriteBufferFlusher < ' conn , T > ,
1037
- finished : bool ,
1086
+ finish_checker_handle : FinishCheckerHandle ,
1038
1087
}
1039
1088
1040
1089
impl < ' conn , T > MasWriteBuffer < ' conn , T > {
1041
- pub fn new ( flusher : WriteBufferFlusher < ' conn , T > ) -> Self {
1090
+ pub fn new ( writer : & MasWriter , flusher : WriteBufferFlusher < ' conn , T > ) -> Self {
1042
1091
MasWriteBuffer {
1043
1092
rows : Vec :: with_capacity ( WRITE_BUFFER_BATCH_SIZE ) ,
1044
1093
flusher,
1045
- finished : false ,
1094
+ finish_checker_handle : writer . write_buffer_finish_checker . handle ( ) ,
1046
1095
}
1047
1096
}
1048
1097
1049
1098
pub async fn finish ( mut self , writer : & mut MasWriter < ' conn > ) -> Result < ( ) , Error > {
1050
- self . finished = true ;
1051
1099
self . flush ( writer) . await ?;
1100
+ self . finish_checker_handle . declare_finished ( ) ;
1052
1101
Ok ( ( ) )
1053
1102
}
1054
1103
@@ -1071,12 +1120,6 @@ impl<'conn, T> MasWriteBuffer<'conn, T> {
1071
1120
}
1072
1121
}
1073
1122
1074
- impl < T > Drop for MasWriteBuffer < ' _ , T > {
1075
- fn drop ( & mut self ) {
1076
- assert ! ( self . finished, "MasWriteBuffer dropped but not finished!" ) ;
1077
- }
1078
- }
1079
-
1080
1123
#[ cfg( test) ]
1081
1124
mod test {
1082
1125
use std:: collections:: { BTreeMap , BTreeSet } ;
0 commit comments