Skip to content

Commit c7fcd51

Browse files
authored
Merge pull request #1190 from spacejam/tyler_inline_maxed_header
inline maxed header, remove unnecessarily complex linearizer
2 parents d513996 + 9cdbd09 commit c7fcd51

File tree

6 files changed

+122
-148
lines changed

6 files changed

+122
-148
lines changed

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,14 +44,14 @@ mutex = []
4444
crossbeam-epoch = "0.9.0"
4545
crossbeam-utils = "0.8.0"
4646
fxhash = "0.2.1"
47-
libc = "0.2.79"
47+
libc = "0.2.80"
4848
zstd = { version = "0.5.3", optional = true }
4949
crc32fast = "1.2.1"
5050
log = "0.4.11"
5151
parking_lot = "0.11.0"
5252
color-backtrace = { version = "0.4.2", optional = true }
5353
rio = { version = "0.9.4", optional = true }
54-
backtrace = { version = "0.3.53", optional = true }
54+
backtrace = { version = "0.3.54", optional = true }
5555

5656
[target.'cfg(any(target_os = "linux", target_os = "macos", target_os="windows"))'.dependencies]
5757
fs2 = "0.4.3"

src/pagecache/header.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use super::*;
2+
3+
// This is the most writers in a single IO buffer
4+
// that we have space to accommodate in the counter
5+
// for writers in the IO buffer header.
6+
pub(in crate::pagecache) const MAX_WRITERS: Header = 127;
7+
8+
pub(in crate::pagecache) type Header = u64;
9+
10+
// salt: 31 bits
11+
// maxed: 1 bit
12+
// seal: 1 bit
13+
// n_writers: 7 bits
14+
// offset: 24 bits
15+
16+
pub(crate) const fn is_maxed(v: Header) -> bool {
17+
v & (1 << 32) == 1 << 32
18+
}
19+
20+
pub(crate) const fn mk_maxed(v: Header) -> Header {
21+
v | (1 << 32)
22+
}
23+
24+
pub(crate) const fn is_sealed(v: Header) -> bool {
25+
v & (1 << 31) == 1 << 31
26+
}
27+
28+
pub(crate) const fn mk_sealed(v: Header) -> Header {
29+
v | (1 << 31)
30+
}
31+
32+
pub(crate) const fn n_writers(v: Header) -> Header {
33+
(v << 33) >> 57
34+
}
35+
36+
#[inline]
37+
pub(crate) fn incr_writers(v: Header) -> Header {
38+
assert_ne!(n_writers(v), MAX_WRITERS);
39+
v + (1 << 24)
40+
}
41+
42+
#[inline]
43+
pub(crate) fn decr_writers(v: Header) -> Header {
44+
assert_ne!(n_writers(v), 0);
45+
v - (1 << 24)
46+
}
47+
48+
#[inline]
49+
pub(crate) fn offset(v: Header) -> usize {
50+
let ret = (v << 40) >> 40;
51+
usize::try_from(ret).unwrap()
52+
}
53+
54+
#[inline]
55+
pub(crate) fn bump_offset(v: Header, by: usize) -> Header {
56+
assert_eq!(by >> 24, 0);
57+
v + (by as Header)
58+
}
59+
60+
pub(crate) const fn bump_salt(v: Header) -> Header {
61+
(v + (1 << 33)) & 0xFFFF_FFFD_0000_0000
62+
}
63+
64+
pub(crate) const fn salt(v: Header) -> Header {
65+
(v >> 33) << 33
66+
}

src/pagecache/iobuf.rs

Lines changed: 32 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
11
use std::{
22
alloc::{alloc, dealloc, Layout},
33
cell::UnsafeCell,
4-
sync::atomic::{AtomicBool, AtomicPtr},
4+
sync::atomic::AtomicPtr,
55
};
66

77
use crate::{pagecache::*, *};
88

9-
// This is the most writers in a single IO buffer
10-
// that we have space to accommodate in the counter
11-
// for writers in the IO buffer header.
12-
pub(in crate::pagecache) const MAX_WRITERS: Header = 127;
13-
14-
pub(in crate::pagecache) type Header = u64;
15-
169
macro_rules! io_fail {
1710
($self:expr, $e:expr) => {
1811
#[cfg(feature = "failpoints")]
@@ -69,8 +62,6 @@ pub(crate) struct IoBuf {
6962
pub offset: LogOffset,
7063
pub lsn: Lsn,
7164
pub capacity: usize,
72-
maxed: AtomicBool,
73-
linearizer: Mutex<()>,
7465
stored_max_stable_lsn: Lsn,
7566
}
7667

@@ -119,16 +110,6 @@ impl IoBuf {
119110
}
120111
}
121112

122-
// use this for operations on an `IoBuf` that must be
123-
// linearized together, and can't fit in the header!
124-
pub(crate) fn linearized<F, B>(&self, f: F) -> B
125-
where
126-
F: FnOnce() -> B,
127-
{
128-
let _l = self.linearizer.lock();
129-
f()
130-
}
131-
132113
// This is called upon the initialization of a fresh segment.
133114
// We write a new segment header to the beginning of the buffer
134115
// for assistance during recovery. The caller is responsible
@@ -159,22 +140,12 @@ impl IoBuf {
159140
}
160141

161142
// ensure writes to the buffer land after our header.
162-
let last_salt = salt(last);
163-
let new_salt = bump_salt(last_salt);
164-
let bumped = bump_offset(new_salt, SEG_HEADER_LEN);
143+
let last_salt = header::salt(last);
144+
let new_salt = header::bump_salt(last_salt);
145+
let bumped = header::bump_offset(new_salt, SEG_HEADER_LEN);
165146
self.set_header(bumped);
166147
}
167148

168-
pub(crate) fn set_maxed(&self, maxed: bool) {
169-
debug_delay();
170-
self.maxed.store(maxed, Release);
171-
}
172-
173-
pub(crate) fn get_maxed(&self) -> bool {
174-
debug_delay();
175-
self.maxed.load(Acquire)
176-
}
177-
178149
pub(crate) fn get_header(&self) -> Header {
179150
debug_delay();
180151
self.header.load(Acquire)
@@ -425,8 +396,6 @@ impl IoBufs {
425396
offset: next_lid,
426397
lsn: next_lsn,
427398
capacity: segment_size - base,
428-
maxed: AtomicBool::new(false),
429-
linearizer: Mutex::new(()),
430399
stored_max_stable_lsn: -1,
431400
};
432401

@@ -617,9 +586,9 @@ impl IoBufs {
617586
"created reservation for uninitialized slot",
618587
);
619588

620-
assert!(is_sealed(header));
589+
assert!(header::is_sealed(header));
621590

622-
let bytes_to_write = offset(header);
591+
let bytes_to_write = header::offset(header);
623592

624593
trace!(
625594
"write_to_log log_offset {} lsn {} len {}",
@@ -628,7 +597,7 @@ impl IoBufs {
628597
bytes_to_write
629598
);
630599

631-
let maxed = iobuf.linearized(|| iobuf.get_maxed());
600+
let maxed = header::is_maxed(header);
632601
let unused_space = capacity - bytes_to_write;
633602
let should_pad = maxed && unused_space >= MAX_MSG_HEADER_LEN;
634603

@@ -887,18 +856,18 @@ impl IoBufs {
887856
pub(crate) fn roll_iobuf(iobufs: &Arc<IoBufs>) -> Result<usize> {
888857
let iobuf = iobufs.current_iobuf();
889858
let header = iobuf.get_header();
890-
if is_sealed(header) {
859+
if header::is_sealed(header) {
891860
trace!("skipping roll_iobuf due to already-sealed header");
892861
return Ok(0);
893862
}
894-
if offset(header) == 0 {
863+
if header::offset(header) == 0 {
895864
trace!("skipping roll_iobuf due to empty segment");
896865
} else {
897866
trace!("sealing ioubuf from roll_iobuf");
898867
maybe_seal_and_write_iobuf(iobufs, &iobuf, header, false)?;
899868
}
900869

901-
Ok(offset(header))
870+
Ok(header::offset(header))
902871
}
903872

904873
/// Blocks until the specified log sequence number has
@@ -957,7 +926,10 @@ pub(in crate::pagecache) fn make_stable_inner(
957926

958927
let iobuf = iobufs.current_iobuf();
959928
let header = iobuf.get_header();
960-
if offset(header) == 0 || is_sealed(header) || iobuf.lsn > lsn {
929+
if header::offset(header) == 0
930+
|| header::is_sealed(header)
931+
|| iobuf.lsn > lsn
932+
{
961933
// nothing to write, don't bother sealing
962934
// current IO buffer.
963935
} else {
@@ -1051,7 +1023,7 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
10511023
header: Header,
10521024
from_reserve: bool,
10531025
) -> Result<()> {
1054-
if is_sealed(header) {
1026+
if header::is_sealed(header) {
10551027
// this buffer is already sealed. nothing to do here.
10561028
return Ok(());
10571029
}
@@ -1063,39 +1035,27 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
10631035
let capacity = iobuf.capacity;
10641036
let segment_size = iobufs.config.segment_size;
10651037

1066-
if offset(header) > capacity {
1038+
if header::offset(header) > capacity {
10671039
// a race happened, nothing we can do
10681040
return Ok(());
10691041
}
10701042

1071-
let sealed = mk_sealed(header);
1072-
let res_len = offset(sealed);
1073-
1043+
let res_len = header::offset(header);
10741044
let maxed = from_reserve || capacity - res_len < MAX_MSG_HEADER_LEN;
1045+
let sealed = if maxed {
1046+
trace!("setting maxed to true for iobuf with lsn {}", lsn);
1047+
header::mk_maxed(header::mk_sealed(header))
1048+
} else {
1049+
header::mk_sealed(header)
1050+
};
10751051

1076-
let worked = iobuf.linearized(|| {
1077-
if iobuf.cas_header(header, sealed).is_err() {
1078-
// cas failed, don't try to continue
1079-
return false;
1080-
}
1081-
1082-
trace!("sealed iobuf with lsn {}", lsn);
1083-
1084-
if maxed {
1085-
// NB we linearize this together with sealing
1086-
// the header here to guarantee that in write_to_log,
1087-
// which may be executing as soon as the seal is set
1088-
// by another thread, the thread that calls
1089-
// iobuf.get_maxed() is linearized with this one!
1090-
trace!("setting maxed to true for iobuf with lsn {}", lsn);
1091-
iobuf.set_maxed(true);
1092-
}
1093-
true
1094-
});
1052+
let worked = iobuf.cas_header(header, sealed).is_ok();
10951053
if !worked {
10961054
return Ok(());
10971055
}
10981056

1057+
trace!("sealed iobuf with lsn {}", lsn);
1058+
10991059
assert!(
11001060
capacity + SEG_HEADER_LEN >= res_len,
11011061
"res_len of {} higher than buffer capacity {}",
@@ -1166,8 +1126,6 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11661126
offset: next_offset,
11671127
lsn: next_lsn,
11681128
capacity: segment_size,
1169-
maxed: AtomicBool::new(false),
1170-
linearizer: Mutex::new(()),
11711129
stored_max_stable_lsn: -1,
11721130
};
11731131

@@ -1177,8 +1135,8 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11771135
} else {
11781136
let new_cap = capacity - res_len;
11791137
assert_ne!(new_cap, 0);
1180-
let last_salt = salt(sealed);
1181-
let new_salt = bump_salt(last_salt);
1138+
let last_salt = header::salt(sealed);
1139+
let new_salt = header::bump_salt(last_salt);
11821140

11831141
IoBuf {
11841142
// reuse the previous io buffer
@@ -1188,8 +1146,6 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
11881146
offset: next_offset,
11891147
lsn: next_lsn,
11901148
capacity: new_cap,
1191-
maxed: AtomicBool::new(false),
1192-
linearizer: Mutex::new(()),
11931149
stored_max_stable_lsn: -1,
11941150
}
11951151
};
@@ -1216,7 +1172,7 @@ pub(in crate::pagecache) fn maybe_seal_and_write_iobuf(
12161172
drop(measure_assign_offset);
12171173

12181174
// if writers is 0, it's our responsibility to write the buffer.
1219-
if n_writers(sealed) == 0 {
1175+
if header::n_writers(sealed) == 0 {
12201176
iobufs.config.global_error()?;
12211177
trace!(
12221178
"asynchronously writing iobuf with lsn {} to log from maybe_seal",
@@ -1273,53 +1229,9 @@ impl Debug for IoBuf {
12731229
"\n\tIoBuf {{ lid: {}, n_writers: {}, offset: \
12741230
{}, sealed: {} }}",
12751231
self.offset,
1276-
n_writers(header),
1277-
offset(header),
1278-
is_sealed(header)
1232+
header::n_writers(header),
1233+
header::offset(header),
1234+
header::is_sealed(header)
12791235
))
12801236
}
12811237
}
1282-
1283-
pub(crate) const fn is_sealed(v: Header) -> bool {
1284-
v & 1 << 31 == 1 << 31
1285-
}
1286-
1287-
pub(crate) const fn mk_sealed(v: Header) -> Header {
1288-
v | 1 << 31
1289-
}
1290-
1291-
pub(crate) const fn n_writers(v: Header) -> Header {
1292-
v << 33 >> 57
1293-
}
1294-
1295-
#[inline]
1296-
pub(crate) fn incr_writers(v: Header) -> Header {
1297-
assert_ne!(n_writers(v), MAX_WRITERS);
1298-
v + (1 << 24)
1299-
}
1300-
1301-
#[inline]
1302-
pub(crate) fn decr_writers(v: Header) -> Header {
1303-
assert_ne!(n_writers(v), 0);
1304-
v - (1 << 24)
1305-
}
1306-
1307-
#[inline]
1308-
pub(crate) fn offset(v: Header) -> usize {
1309-
let ret = v << 40 >> 40;
1310-
usize::try_from(ret).unwrap()
1311-
}
1312-
1313-
#[inline]
1314-
pub(crate) fn bump_offset(v: Header, by: usize) -> Header {
1315-
assert_eq!(by >> 24, 0);
1316-
v + (by as Header)
1317-
}
1318-
1319-
pub(crate) const fn bump_salt(v: Header) -> Header {
1320-
(v + (1 << 32)) & 0xFFFF_FFFF_0000_0000
1321-
}
1322-
1323-
pub(crate) const fn salt(v: Header) -> Header {
1324-
v >> 32 << 32
1325-
}

0 commit comments

Comments
 (0)