11use std:: {
22 alloc:: { alloc, dealloc, Layout } ,
33 cell:: UnsafeCell ,
4- sync:: atomic:: { AtomicBool , AtomicPtr } ,
4+ sync:: atomic:: AtomicPtr ,
55} ;
66
77use crate :: { pagecache:: * , * } ;
88
9- // This is the most writers in a single IO buffer
10- // that we have space to accommodate in the counter
11- // for writers in the IO buffer header.
12- pub ( in crate :: pagecache) const MAX_WRITERS : Header = 127 ;
13-
14- pub ( in crate :: pagecache) type Header = u64 ;
15-
169macro_rules! io_fail {
1710 ( $self: expr, $e: expr) => {
1811 #[ cfg( feature = "failpoints" ) ]
@@ -69,8 +62,6 @@ pub(crate) struct IoBuf {
6962 pub offset : LogOffset ,
7063 pub lsn : Lsn ,
7164 pub capacity : usize ,
72- maxed : AtomicBool ,
73- linearizer : Mutex < ( ) > ,
7465 stored_max_stable_lsn : Lsn ,
7566}
7667
@@ -119,16 +110,6 @@ impl IoBuf {
119110 }
120111 }
121112
122- // use this for operations on an `IoBuf` that must be
123- // linearized together, and can't fit in the header!
124- pub ( crate ) fn linearized < F , B > ( & self , f : F ) -> B
125- where
126- F : FnOnce ( ) -> B ,
127- {
128- let _l = self . linearizer . lock ( ) ;
129- f ( )
130- }
131-
132113 // This is called upon the initialization of a fresh segment.
133114 // We write a new segment header to the beginning of the buffer
134115 // for assistance during recovery. The caller is responsible
@@ -159,22 +140,12 @@ impl IoBuf {
159140 }
160141
161142 // ensure writes to the buffer land after our header.
162- let last_salt = salt ( last) ;
163- let new_salt = bump_salt ( last_salt) ;
164- let bumped = bump_offset ( new_salt, SEG_HEADER_LEN ) ;
143+ let last_salt = header :: salt ( last) ;
144+ let new_salt = header :: bump_salt ( last_salt) ;
145+ let bumped = header :: bump_offset ( new_salt, SEG_HEADER_LEN ) ;
165146 self . set_header ( bumped) ;
166147 }
167148
168- pub ( crate ) fn set_maxed ( & self , maxed : bool ) {
169- debug_delay ( ) ;
170- self . maxed . store ( maxed, Release ) ;
171- }
172-
173- pub ( crate ) fn get_maxed ( & self ) -> bool {
174- debug_delay ( ) ;
175- self . maxed . load ( Acquire )
176- }
177-
178149 pub ( crate ) fn get_header ( & self ) -> Header {
179150 debug_delay ( ) ;
180151 self . header . load ( Acquire )
@@ -425,8 +396,6 @@ impl IoBufs {
425396 offset : next_lid,
426397 lsn : next_lsn,
427398 capacity : segment_size - base,
428- maxed : AtomicBool :: new ( false ) ,
429- linearizer : Mutex :: new ( ( ) ) ,
430399 stored_max_stable_lsn : -1 ,
431400 } ;
432401
@@ -617,9 +586,9 @@ impl IoBufs {
617586 "created reservation for uninitialized slot" ,
618587 ) ;
619588
620- assert ! ( is_sealed( header) ) ;
589+ assert ! ( header :: is_sealed( header) ) ;
621590
622- let bytes_to_write = offset ( header) ;
591+ let bytes_to_write = header :: offset ( header) ;
623592
624593 trace ! (
625594 "write_to_log log_offset {} lsn {} len {}" ,
@@ -628,7 +597,7 @@ impl IoBufs {
628597 bytes_to_write
629598 ) ;
630599
631- let maxed = iobuf . linearized ( || iobuf . get_maxed ( ) ) ;
600+ let maxed = header :: is_maxed ( header ) ;
632601 let unused_space = capacity - bytes_to_write;
633602 let should_pad = maxed && unused_space >= MAX_MSG_HEADER_LEN ;
634603
@@ -887,18 +856,18 @@ impl IoBufs {
887856pub ( crate ) fn roll_iobuf ( iobufs : & Arc < IoBufs > ) -> Result < usize > {
888857 let iobuf = iobufs. current_iobuf ( ) ;
889858 let header = iobuf. get_header ( ) ;
890- if is_sealed ( header) {
859+ if header :: is_sealed ( header) {
891860 trace ! ( "skipping roll_iobuf due to already-sealed header" ) ;
892861 return Ok ( 0 ) ;
893862 }
894- if offset ( header) == 0 {
863+ if header :: offset ( header) == 0 {
895864 trace ! ( "skipping roll_iobuf due to empty segment" ) ;
896865 } else {
897866 trace ! ( "sealing ioubuf from roll_iobuf" ) ;
898867 maybe_seal_and_write_iobuf ( iobufs, & iobuf, header, false ) ?;
899868 }
900869
901- Ok ( offset ( header) )
870+ Ok ( header :: offset ( header) )
902871}
903872
904873/// Blocks until the specified log sequence number has
@@ -957,7 +926,10 @@ pub(in crate::pagecache) fn make_stable_inner(
957926
958927 let iobuf = iobufs. current_iobuf ( ) ;
959928 let header = iobuf. get_header ( ) ;
960- if offset ( header) == 0 || is_sealed ( header) || iobuf. lsn > lsn {
929+ if header:: offset ( header) == 0
930+ || header:: is_sealed ( header)
931+ || iobuf. lsn > lsn
932+ {
961933 // nothing to write, don't bother sealing
962934 // current IO buffer.
963935 } else {
@@ -1051,7 +1023,7 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
10511023 header : Header ,
10521024 from_reserve : bool ,
10531025) -> Result < ( ) > {
1054- if is_sealed ( header) {
1026+ if header :: is_sealed ( header) {
10551027 // this buffer is already sealed. nothing to do here.
10561028 return Ok ( ( ) ) ;
10571029 }
@@ -1063,39 +1035,27 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
10631035 let capacity = iobuf. capacity ;
10641036 let segment_size = iobufs. config . segment_size ;
10651037
1066- if offset ( header) > capacity {
1038+ if header :: offset ( header) > capacity {
10671039 // a race happened, nothing we can do
10681040 return Ok ( ( ) ) ;
10691041 }
10701042
1071- let sealed = mk_sealed ( header) ;
1072- let res_len = offset ( sealed) ;
1073-
1043+ let res_len = header:: offset ( header) ;
10741044 let maxed = from_reserve || capacity - res_len < MAX_MSG_HEADER_LEN ;
1045+ let sealed = if maxed {
1046+ trace ! ( "setting maxed to true for iobuf with lsn {}" , lsn) ;
1047+ header:: mk_maxed ( header:: mk_sealed ( header) )
1048+ } else {
1049+ header:: mk_sealed ( header)
1050+ } ;
10751051
1076- let worked = iobuf. linearized ( || {
1077- if iobuf. cas_header ( header, sealed) . is_err ( ) {
1078- // cas failed, don't try to continue
1079- return false ;
1080- }
1081-
1082- trace ! ( "sealed iobuf with lsn {}" , lsn) ;
1083-
1084- if maxed {
1085- // NB we linearize this together with sealing
1086- // the header here to guarantee that in write_to_log,
1087- // which may be executing as soon as the seal is set
1088- // by another thread, the thread that calls
1089- // iobuf.get_maxed() is linearized with this one!
1090- trace ! ( "setting maxed to true for iobuf with lsn {}" , lsn) ;
1091- iobuf. set_maxed ( true ) ;
1092- }
1093- true
1094- } ) ;
1052+ let worked = iobuf. cas_header ( header, sealed) . is_ok ( ) ;
10951053 if !worked {
10961054 return Ok ( ( ) ) ;
10971055 }
10981056
1057+ trace ! ( "sealed iobuf with lsn {}" , lsn) ;
1058+
10991059 assert ! (
11001060 capacity + SEG_HEADER_LEN >= res_len,
11011061 "res_len of {} higher than buffer capacity {}" ,
@@ -1166,8 +1126,6 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11661126 offset : next_offset,
11671127 lsn : next_lsn,
11681128 capacity : segment_size,
1169- maxed : AtomicBool :: new ( false ) ,
1170- linearizer : Mutex :: new ( ( ) ) ,
11711129 stored_max_stable_lsn : -1 ,
11721130 } ;
11731131
@@ -1177,8 +1135,8 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11771135 } else {
11781136 let new_cap = capacity - res_len;
11791137 assert_ne ! ( new_cap, 0 ) ;
1180- let last_salt = salt ( sealed) ;
1181- let new_salt = bump_salt ( last_salt) ;
1138+ let last_salt = header :: salt ( sealed) ;
1139+ let new_salt = header :: bump_salt ( last_salt) ;
11821140
11831141 IoBuf {
11841142 // reuse the previous io buffer
@@ -1188,8 +1146,6 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11881146 offset : next_offset,
11891147 lsn : next_lsn,
11901148 capacity : new_cap,
1191- maxed : AtomicBool :: new ( false ) ,
1192- linearizer : Mutex :: new ( ( ) ) ,
11931149 stored_max_stable_lsn : -1 ,
11941150 }
11951151 } ;
@@ -1216,7 +1172,7 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
12161172 drop ( measure_assign_offset) ;
12171173
12181174 // if writers is 0, it's our responsibility to write the buffer.
1219- if n_writers ( sealed) == 0 {
1175+ if header :: n_writers ( sealed) == 0 {
12201176 iobufs. config . global_error ( ) ?;
12211177 trace ! (
12221178 "asynchronously writing iobuf with lsn {} to log from maybe_seal" ,
@@ -1273,53 +1229,9 @@ impl Debug for IoBuf {
12731229 "\n \t IoBuf {{ lid: {}, n_writers: {}, offset: \
12741230 {}, sealed: {} }}",
12751231 self . offset,
1276- n_writers( header) ,
1277- offset( header) ,
1278- is_sealed( header)
1232+ header :: n_writers( header) ,
1233+ header :: offset( header) ,
1234+ header :: is_sealed( header)
12791235 ) )
12801236 }
12811237}
1282-
1283- pub ( crate ) const fn is_sealed ( v : Header ) -> bool {
1284- v & 1 << 31 == 1 << 31
1285- }
1286-
1287- pub ( crate ) const fn mk_sealed ( v : Header ) -> Header {
1288- v | 1 << 31
1289- }
1290-
1291- pub ( crate ) const fn n_writers ( v : Header ) -> Header {
1292- v << 33 >> 57
1293- }
1294-
1295- #[ inline]
1296- pub ( crate ) fn incr_writers ( v : Header ) -> Header {
1297- assert_ne ! ( n_writers( v) , MAX_WRITERS ) ;
1298- v + ( 1 << 24 )
1299- }
1300-
1301- #[ inline]
1302- pub ( crate ) fn decr_writers ( v : Header ) -> Header {
1303- assert_ne ! ( n_writers( v) , 0 ) ;
1304- v - ( 1 << 24 )
1305- }
1306-
1307- #[ inline]
1308- pub ( crate ) fn offset ( v : Header ) -> usize {
1309- let ret = v << 40 >> 40 ;
1310- usize:: try_from ( ret) . unwrap ( )
1311- }
1312-
1313- #[ inline]
1314- pub ( crate ) fn bump_offset ( v : Header , by : usize ) -> Header {
1315- assert_eq ! ( by >> 24 , 0 ) ;
1316- v + ( by as Header )
1317- }
1318-
1319- pub ( crate ) const fn bump_salt ( v : Header ) -> Header {
1320- ( v + ( 1 << 32 ) ) & 0xFFFF_FFFF_0000_0000
1321- }
1322-
1323- pub ( crate ) const fn salt ( v : Header ) -> Header {
1324- v >> 32 << 32
1325- }
0 commit comments