Skip to content

Commit 6839fa7

Browse files
committed
Remove zstd in anticipation for it being re-added through marble
1 parent f0e46f2 commit 6839fa7

File tree

11 files changed

+34
-144
lines changed

11 files changed

+34
-144
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
default.sled
2+
crash_*
23
*db
34
*conf
45
*snap.*

Cargo.toml

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,7 @@ default = []
3131
# test-only configurations that cause performance to drop significantly.
3232
# It will cause your tests to take much more time, and possibly time out etc...
3333
testing = ["event_log", "lock_free_delays", "light_testing"]
34-
light_testing = ["compression", "failpoints", "backtrace", "memshred"]
35-
compression = ["zstd"]
34+
light_testing = ["failpoints", "backtrace", "memshred"]
3635
lock_free_delays = []
3736
failpoints = []
3837
event_log = []
@@ -41,20 +40,20 @@ no_logs = ["log/max_level_off"]
4140
no_inline = []
4241
pretty_backtrace = ["color-backtrace"]
4342
docs = []
43+
no_zstd = []
4444
miri_optimizations = []
4545
mutex = []
4646
memshred = []
4747

4848
[dependencies]
4949
libc = "0.2.96"
50-
zstd = { version = "0.11.2", optional = true }
5150
crc32fast = "1.2.1"
5251
log = "0.4.14"
53-
parking_lot = "0.12.0"
52+
parking_lot = "0.12.1"
5453
color-backtrace = { version = "0.5.1", optional = true }
5554
num-format = { version = "0.4.0", optional = true }
5655
backtrace = { version = "0.3.60", optional = true }
57-
im = "15.0.0"
56+
im = "15.1.0"
5857

5958
[target.'cfg(any(target_os = "linux", target_os = "macos", target_os="windows"))'.dependencies]
6059
fs2 = "0.4.3"

src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -481,8 +481,8 @@ impl Config {
481481
);
482482
if self.use_compression {
483483
supported!(
484-
cfg!(feature = "compression"),
485-
"the 'compression' feature must be enabled"
484+
!cfg!(feature = "no_zstd"),
485+
"the 'no_zstd' feature is set, but Config.use_compression is also set to true"
486486
);
487487
}
488488
supported!(

src/doc/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@
99
//! * forward and reverse iterators
1010
//! * a monotonic ID generator capable of giving out 75-125+ million unique IDs
1111
//! per second, never double allocating even in the presence of crashes
12-
//! * [zstd](https://github.com/facebook/zstd) compression (use the zstd build
13-
//! feature)
12+
//! * [zstd](https://github.com/facebook/zstd) compression
1413
//! * cpu-scalable lock-free implementation
1514
//! * SSD-optimized log-structured storage
1615
//!

src/metrics.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -320,14 +320,10 @@ impl Metrics {
320320
ret.push_str(&format!("hit ratio: {}%\n", hit_ratio));
321321

322322
ret.push_str(&format!("{}\n", "-".repeat(134)));
323-
ret.push_str("serialization and compression:\n");
323+
ret.push_str("serialization:\n");
324324
ret.push_str(&p(vec![
325325
lat("serialize", &self.serialize),
326326
lat("deserialize", &self.deserialize),
327-
#[cfg(feature = "compression")]
328-
lat("compress", &self.compress),
329-
#[cfg(feature = "compression")]
330-
lat("decompress", &self.decompress),
331327
]));
332328

333329
ret.push_str(&format!("{}\n", "-".repeat(134)));

src/pagecache/heap.rs

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -234,18 +234,10 @@ impl Heap {
234234
}
235235
}
236236

237-
pub fn read(
238-
&self,
239-
heap_id: HeapId,
240-
use_compression: bool,
241-
) -> Result<(MessageKind, Vec<u8>)> {
237+
pub fn read(&self, heap_id: HeapId) -> Result<(MessageKind, Vec<u8>)> {
242238
log::trace!("Heap::read({:?})", heap_id);
243239
let (slab_id, slab_idx, original_lsn) = heap_id.decompose();
244-
self.slabs[slab_id as usize].read(
245-
slab_idx,
246-
original_lsn,
247-
use_compression,
248-
)
240+
self.slabs[slab_id as usize].read(slab_idx, original_lsn)
249241
}
250242

251243
pub fn free(&self, heap_id: HeapId) {
@@ -300,7 +292,6 @@ impl Slab {
300292
&self,
301293
slab_idx: SlabIdx,
302294
original_lsn: Lsn,
303-
use_compression: bool,
304295
) -> Result<(MessageKind, Vec<u8>)> {
305296
let bs = slab_id_to_size(self.slab_id);
306297
let offset = u64::from(slab_idx) * bs;
@@ -332,12 +323,7 @@ impl Slab {
332323
return Err(Error::corruption(None));
333324
}
334325
let buf = heap_buf[13..].to_vec();
335-
let buf2 = if use_compression {
336-
crate::pagecache::decompress(buf)
337-
} else {
338-
buf
339-
};
340-
Ok((MessageKind::from(heap_buf[0]), buf2))
326+
Ok((MessageKind::from(heap_buf[0]), buf))
341327
} else {
342328
log::debug!(
343329
"heap message CRC does not match contents. stored: {} actual: {}",

src/pagecache/logger.rs

Lines changed: 17 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
use std::fs::File;
22

33
use super::{
4-
arr_to_lsn, arr_to_u32, assert_usize, decompress, header, iobuf,
5-
lsn_to_arr, pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc,
6-
BasedBuf, DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn,
7-
MessageKind, Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID,
8-
COUNTER_PID, MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
4+
arr_to_lsn, arr_to_u32, assert_usize, header, iobuf, lsn_to_arr,
5+
pread_exact, pread_exact_or_eof, roll_iobuf, u32_to_arr, Arc, BasedBuf,
6+
DiskPtr, HeapId, IoBuf, IoBufs, LogKind, LogOffset, Lsn, MessageKind,
7+
Reservation, Serialize, Snapshot, BATCH_MANIFEST_PID, COUNTER_PID,
8+
MAX_MSG_HEADER_LEN, META_PID, SEG_HEADER_LEN,
99
};
1010

1111
use crate::*;
@@ -70,18 +70,16 @@ impl Log {
7070
// here because it might not still
7171
// exist in the inline log.
7272
let heap_id = ptr.heap_id().unwrap();
73-
self.config.heap.read(heap_id, self.config.use_compression).map(
74-
|(kind, buf)| {
75-
let header = MessageHeader {
76-
kind,
77-
pid,
78-
segment_number: expected_segment_number,
79-
crc32: 0,
80-
len: 0,
81-
};
82-
LogRead::Heap(header, buf, heap_id, 0)
83-
},
84-
)
73+
self.config.heap.read(heap_id).map(|(kind, buf)| {
74+
let header = MessageHeader {
75+
kind,
76+
pid,
77+
segment_number: expected_segment_number,
78+
crc32: 0,
79+
len: 0,
80+
};
81+
LogRead::Heap(header, buf, heap_id, 0)
82+
})
8583
}
8684
}
8785

@@ -129,43 +127,13 @@ impl Log {
129127
/// completed or aborted later. Useful for maintaining
130128
/// linearizability across CAS operations that may need to
131129
/// persist part of their operation.
132-
#[allow(unused)]
133130
pub fn reserve<T: Serialize + Debug>(
134131
&self,
135132
log_kind: LogKind,
136133
pid: PageId,
137134
item: &T,
138135
guard: &Guard,
139136
) -> Result<Reservation<'_>> {
140-
#[cfg(feature = "compression")]
141-
{
142-
if self.config.use_compression && pid != BATCH_MANIFEST_PID {
143-
use zstd::bulk::compress;
144-
145-
let buf = item.serialize();
146-
147-
#[cfg(feature = "metrics")]
148-
let _measure = Measure::new(&M.compress);
149-
150-
let compressed_buf =
151-
compress(&buf, self.config.compression_factor).unwrap();
152-
153-
let ret = self.reserve_inner(
154-
log_kind,
155-
pid,
156-
&IVec::from(compressed_buf),
157-
None,
158-
guard,
159-
);
160-
161-
if let Err(e) = &ret {
162-
self.iobufs.set_global_error(*e);
163-
}
164-
165-
return ret;
166-
}
167-
}
168-
169137
let ret = self.reserve_inner(log_kind, pid, item, None, guard);
170138

171139
if let Err(e) = &ret {
@@ -860,7 +828,7 @@ pub(crate) fn read_message<R: ReadAt>(
860828
assert_eq!(buf.len(), 16);
861829
let heap_id = HeapId::deserialize(&mut &buf[..]).unwrap();
862830

863-
match config.heap.read(heap_id, config.use_compression) {
831+
match config.heap.read(heap_id) {
864832
Ok((kind, buf2)) => {
865833
assert_eq!(header.kind, kind);
866834
trace!(
@@ -883,10 +851,7 @@ pub(crate) fn read_message<R: ReadAt>(
883851
| MessageKind::Free
884852
| MessageKind::Counter => {
885853
trace!("read a successful inline message");
886-
let buf2 =
887-
if config.use_compression { decompress(buf) } else { buf };
888-
889-
Ok(LogRead::Inline(header, buf2, inline_len))
854+
Ok(LogRead::Inline(header, buf, inline_len))
890855
}
891856
MessageKind::BatchManifest => {
892857
assert_eq!(buf.len(), std::mem::size_of::<Lsn>());

src/pagecache/mod.rs

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -230,32 +230,6 @@ pub(crate) const fn u32_to_arr(number: u32) -> [u8; 4] {
230230
number.to_le_bytes()
231231
}
232232

233-
#[allow(clippy::needless_pass_by_value)]
234-
#[allow(clippy::needless_return)]
235-
pub(in crate::pagecache) fn decompress(in_buf: Vec<u8>) -> Vec<u8> {
236-
#[cfg(feature = "compression")]
237-
{
238-
use zstd::stream::decode_all;
239-
240-
let scootable_in_buf = &mut &*in_buf;
241-
let raw: IVec = IVec::deserialize(scootable_in_buf)
242-
.expect("this had to be serialized with an extra length frame");
243-
#[cfg(feature = "metrics")]
244-
let _measure = Measure::new(&M.decompress);
245-
let out_buf = decode_all(&raw[..]).expect(
246-
"failed to decompress data. \
247-
This is not expected, please open an issue on \
248-
https://github.com/spacejam/sled so we can \
249-
fix this critical issue ASAP. Thank you :)",
250-
);
251-
252-
return out_buf;
253-
}
254-
255-
#[cfg(not(feature = "compression"))]
256-
in_buf
257-
}
258-
259233
#[derive(Debug, Clone, Copy)]
260234
pub struct NodeView<'g>(pub(crate) PageView<'g>);
261235

src/pagecache/reservation.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ impl<'a> Reservation<'a> {
5656

5757
/// Refills the reservation buffer with new data.
5858
/// Must supply a buffer of an identical length
59-
/// as the one initially provided. Don't use this
60-
/// on messages subject to compression etc...
59+
/// as the one initially provided.
6160
///
6261
/// # Panics
6362
///

src/pagecache/snapshot.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
#[cfg(feature = "zstd")]
2-
use zstd::bulk::{compress, decompress};
3-
41
use crate::*;
52

63
use super::{
@@ -530,22 +527,7 @@ fn read_snapshot(config: &RunningConfig) -> Result<Option<Snapshot>> {
530527
return Err(Error::corruption(None));
531528
}
532529

533-
#[cfg(feature = "zstd")]
534-
let bytes = if config.use_compression {
535-
use std::convert::TryInto;
536-
537-
let len_expected: u64 =
538-
u64::from_le_bytes(len_expected_bytes.as_ref().try_into().unwrap());
539-
540-
decompress(&*buf, usize::try_from(len_expected).unwrap()).unwrap()
541-
} else {
542-
buf
543-
};
544-
545-
#[cfg(not(feature = "zstd"))]
546-
let bytes = buf;
547-
548-
Snapshot::deserialize(&mut bytes.as_slice()).map(Some)
530+
Snapshot::deserialize(&mut buf.as_slice()).map(Some)
549531
}
550532

551533
pub(in crate::pagecache) fn write_snapshot(
@@ -554,21 +536,10 @@ pub(in crate::pagecache) fn write_snapshot(
554536
) -> Result<()> {
555537
trace!("writing snapshot {:?}", snapshot);
556538

557-
let raw_bytes = snapshot.serialize();
558-
let decompressed_len = raw_bytes.len();
559-
560-
#[cfg(feature = "zstd")]
561-
let bytes = if config.use_compression {
562-
compress(&*raw_bytes, config.compression_factor).unwrap()
563-
} else {
564-
raw_bytes
565-
};
566-
567-
#[cfg(not(feature = "zstd"))]
568-
let bytes = raw_bytes;
539+
let bytes = snapshot.serialize();
569540

570541
let crc32: [u8; 4] = u32_to_arr(crc32(&bytes));
571-
let len_bytes: [u8; 8] = u64_to_arr(decompressed_len as u64);
542+
let len_bytes: [u8; 8] = u64_to_arr(bytes.len() as u64);
572543

573544
let path_1_suffix =
574545
format!("snap.{:016X}.generating", snapshot.stable_lsn.unwrap_or(0));

0 commit comments

Comments
 (0)