Skip to content

Commit 5e0c0f7

Browse files
authored
execute_plan: don't build temporary vec of rows (#2918)
# Description of Changes Avoid building a temporary `Vec` in `execute_plan` by exposing a list-building interface instead. - The old `fn encode_list` is rewritten in terms of this list-building interface. - The `BsatnRowList` and `BsatnRowListBuilder` types are split into two entirely separate types. The latter now tries to recognize the case where there isn't a known static layout, but where the BSATN lengths happen to be the same for all rows anyway. In those cases, the allocation of `RowSizeHint::RowOffsets` is avoided in favor of just storing the found length in bytes. This is particularly useful for small table updates since, statistically, the fewer rows there are, the greater the chance that their lengths are all equal. In the case of a single row, the chance is notably 100%. It is also good for the case where we don't have `RelValue::Row` or `Row::Ptr` but where the underlying table actually has a static layout. In the future, we might want to avoid these lists in incremental as well. # Benchmarks Benchmark numbers vs. master using `cargo bench --bench subscription -- --baseline subs` on i7-7700K, 64GB RAM: ``` footprint-scan time: [28.731 ms 28.924 ms 29.171 ms] change: [-49.728% -49.006% -48.388%] (p = 0.00 < 0.05) Performance has improved. ``` Performance goes from roughly 56.721 ms to 28.795 ms. # API and ABI breaking changes None # Expected complexity level and risk 2, fairly local change to just subscriptions. # Testing Covered by existing tests.
1 parent e107144 commit 5e0c0f7

File tree

16 files changed

+354
-265
lines changed

16 files changed

+354
-265
lines changed

Cargo.lock

Lines changed: 1 addition & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/client-api-messages/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,8 @@ spacetimedb-sats = { workspace = true, features = ["bytestring"] }
1313

1414
bytes.workspace = true
1515
bytestring.workspace = true
16-
brotli.workspace = true
1716
chrono = { workspace = true, features = ["serde"] }
1817
enum-as-inner.workspace = true
19-
flate2.workspace = true
2018
serde = { workspace = true, features = ["derive"] }
2119
serde_json.workspace = true
2220
serde_with.workspace = true

crates/client-api-messages/src/websocket.rs

Lines changed: 29 additions & 200 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,18 @@ use smallvec::SmallVec;
2626
use spacetimedb_lib::{ConnectionId, Identity, TimeDuration, Timestamp};
2727
use spacetimedb_primitives::TableId;
2828
use spacetimedb_sats::{
29-
bsatn::{self, ToBsatn},
3029
de::{Deserialize, Error},
3130
impl_deserialize, impl_serialize, impl_st,
32-
ser::{serde::SerializeWrapper, Serialize},
31+
ser::Serialize,
3332
AlgebraicType, SpacetimeType,
3433
};
35-
use std::{
36-
io::{self, Read as _, Write as _},
37-
sync::Arc,
38-
};
34+
use std::sync::Arc;
3935

4036
pub const TEXT_PROTOCOL: &str = "v1.json.spacetimedb";
4137
pub const BIN_PROTOCOL: &str = "v1.bsatn.spacetimedb";
4238

4339
pub trait RowListLen {
44-
/// Returns the length of the list.
40+
/// Returns the length, in number of rows, not bytes, of the row list.
4541
fn len(&self) -> usize;
4642
/// Returns whether the list is empty or not.
4743
fn is_empty(&self) -> bool {
@@ -86,16 +82,9 @@ pub trait WebsocketFormat: Sized {
8682
+ Clone
8783
+ Default;
8884

89-
/// Encodes the `elems` to a list in the format and also returns the length of the list.
90-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64);
91-
9285
/// The type used to encode query updates.
9386
/// This type exists so that some formats, e.g., BSATN, can compress an update.
9487
type QueryUpdate: SpacetimeType + for<'de> Deserialize<'de> + Serialize + Debug + Clone + Send;
95-
96-
/// Convert a `QueryUpdate` into `Self::QueryUpdate`.
97-
/// This allows some formats to e.g., compress the update.
98-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate;
9988
}
10089

10190
/// Messages sent from the client to the server.
@@ -666,22 +655,6 @@ pub enum CompressableQueryUpdate<F: WebsocketFormat> {
666655
Gzip(Bytes),
667656
}
668657

669-
impl CompressableQueryUpdate<BsatnFormat> {
670-
pub fn maybe_decompress(self) -> QueryUpdate<BsatnFormat> {
671-
match self {
672-
Self::Uncompressed(qu) => qu,
673-
Self::Brotli(bytes) => {
674-
let bytes = brotli_decompress(&bytes).unwrap();
675-
bsatn::from_slice(&bytes).unwrap()
676-
}
677-
Self::Gzip(bytes) => {
678-
let bytes = gzip_decompress(&bytes).unwrap();
679-
bsatn::from_slice(&bytes).unwrap()
680-
}
681-
}
682-
}
683-
}
684-
685658
#[derive(SpacetimeType, Debug, Clone)]
686659
#[sats(crate = spacetimedb_lib)]
687660
pub struct QueryUpdate<F: WebsocketFormat> {
@@ -756,23 +729,8 @@ pub struct JsonFormat;
756729

757730
impl WebsocketFormat for JsonFormat {
758731
type Single = ByteString;
759-
760732
type List = Vec<ByteString>;
761-
762-
fn encode_list<R: ToBsatn + Serialize>(elems: impl Iterator<Item = R>) -> (Self::List, u64) {
763-
let mut count = 0;
764-
let list = elems
765-
.map(|elem| serde_json::to_string(&SerializeWrapper::new(elem)).unwrap().into())
766-
.inspect(|_| count += 1)
767-
.collect();
768-
(list, count)
769-
}
770-
771733
type QueryUpdate = QueryUpdate<Self>;
772-
773-
fn into_query_update(qu: QueryUpdate<Self>, _: Compression) -> Self::QueryUpdate {
774-
qu
775-
}
776734
}
777735

778736
#[derive(Clone, Copy, Default, Debug, SpacetimeType)]
@@ -781,57 +739,8 @@ pub struct BsatnFormat;
781739

782740
impl WebsocketFormat for BsatnFormat {
783741
type Single = Box<[u8]>;
784-
785742
type List = BsatnRowList;
786-
787-
fn encode_list<R: ToBsatn + Serialize>(mut elems: impl Iterator<Item = R>) -> (Self::List, u64) {
788-
// For an empty list, the size of a row is unknown, so use `RowOffsets`.
789-
let Some(first) = elems.next() else {
790-
return (BsatnRowList::row_offsets(), 0);
791-
};
792-
// We have at least one row. Determine the static size from that, if available.
793-
let (mut list, mut scratch) = match first.static_bsatn_size() {
794-
Some(size) => (BsatnRowListBuilder::fixed(size), Vec::with_capacity(size as usize)),
795-
None => (BsatnRowListBuilder::row_offsets(), Vec::new()),
796-
};
797-
// Add the first element and then the rest.
798-
// We assume that the schema of rows yielded by `elems` stays the same,
799-
// so once the size is fixed, it will stay that way.
800-
let mut count = 0;
801-
let mut push = |elem: R| {
802-
elem.to_bsatn_extend(&mut scratch).unwrap();
803-
list.push(&scratch);
804-
scratch.clear();
805-
count += 1;
806-
};
807-
push(first);
808-
for elem in elems {
809-
push(elem);
810-
}
811-
(list.finish(), count)
812-
}
813-
814743
type QueryUpdate = CompressableQueryUpdate<Self>;
815-
816-
fn into_query_update(qu: QueryUpdate<Self>, compression: Compression) -> Self::QueryUpdate {
817-
let qu_len_would_have_been = bsatn::to_len(&qu).unwrap();
818-
819-
match decide_compression(qu_len_would_have_been, compression) {
820-
Compression::None => CompressableQueryUpdate::Uncompressed(qu),
821-
Compression::Brotli => {
822-
let bytes = bsatn::to_vec(&qu).unwrap();
823-
let mut out = Vec::new();
824-
brotli_compress(&bytes, &mut out);
825-
CompressableQueryUpdate::Brotli(out.into())
826-
}
827-
Compression::Gzip => {
828-
let bytes = bsatn::to_vec(&qu).unwrap();
829-
let mut out = Vec::new();
830-
gzip_compress(&bytes, &mut out);
831-
CompressableQueryUpdate::Gzip(out.into())
832-
}
833-
}
834-
}
835744
}
836745

837746
/// A specification of either a desired or decided compression algorithm.
@@ -846,69 +755,28 @@ pub enum Compression {
846755
Gzip,
847756
}
848757

849-
pub fn decide_compression(len: usize, compression: Compression) -> Compression {
850-
/// The threshold beyond which we start to compress messages.
851-
/// 1KiB was chosen without measurement.
852-
/// TODO(perf): measure!
853-
const COMPRESS_THRESHOLD: usize = 1024;
854-
855-
if len > COMPRESS_THRESHOLD {
856-
compression
857-
} else {
858-
Compression::None
859-
}
860-
}
861-
862-
pub fn brotli_compress(bytes: &[u8], out: &mut impl io::Write) {
863-
// We are optimizing for compression speed,
864-
// so we choose the lowest (fastest) level of compression.
865-
// Experiments on internal workloads have shown compression ratios between 7:1 and 10:1
866-
// for large `SubscriptionUpdate` messages at this level.
867-
const COMPRESSION_LEVEL: i32 = 1;
868-
869-
let params = brotli::enc::BrotliEncoderParams {
870-
quality: COMPRESSION_LEVEL,
871-
..<_>::default()
872-
};
873-
let reader = &mut &bytes[..];
874-
brotli::BrotliCompress(reader, out, &params).expect("should be able to BrotliCompress");
875-
}
876-
877-
pub fn brotli_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
878-
let mut decompressed = Vec::new();
879-
brotli::BrotliDecompress(&mut &bytes[..], &mut decompressed)?;
880-
Ok(decompressed)
881-
}
882-
883-
pub fn gzip_compress(bytes: &[u8], out: &mut impl io::Write) {
884-
let mut encoder = flate2::write::GzEncoder::new(out, flate2::Compression::fast());
885-
encoder.write_all(bytes).unwrap();
886-
encoder.finish().expect("should be able to gzip compress `bytes`");
887-
}
888-
889-
pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
890-
let mut decompressed = Vec::new();
891-
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
892-
Ok(decompressed)
893-
}
894-
895-
type RowSize = u16;
896-
type RowOffset = u64;
758+
pub type RowSize = u16;
759+
pub type RowOffset = u64;
897760

898761
/// A packed list of BSATN-encoded rows.
899-
#[derive(SpacetimeType, Debug, Clone)]
762+
#[derive(SpacetimeType, Debug, Clone, Default)]
900763
#[sats(crate = spacetimedb_lib)]
901-
pub struct BsatnRowList<B = Bytes, I = Arc<[RowOffset]>> {
764+
pub struct BsatnRowList {
902765
/// A size hint about `rows_data`
903766
/// intended to facilitate parallel decode purposes on large initial updates.
904-
size_hint: RowSizeHint<I>,
767+
size_hint: RowSizeHint,
905768
/// The flattened byte array for a list of rows.
906-
rows_data: B,
769+
rows_data: Bytes,
907770
}
908771

909-
impl Default for BsatnRowList {
910-
fn default() -> Self {
911-
Self::row_offsets()
772+
impl BsatnRowList {
773+
/// Returns a new row list where `rows_data` is the flattened byte array
774+
/// containing the BSATN of each row, without any markers for where a row begins and ends.
775+
///
776+
/// The `size_hint` encodes the boundaries of each row in `rows_data`.
777+
/// See [`RowSizeHint`] for more details on the encoding.
778+
pub fn new(size_hint: RowSizeHint, rows_data: Bytes) -> Self {
779+
Self { size_hint, rows_data }
912780
}
913781
}
914782

@@ -917,17 +785,23 @@ impl Default for BsatnRowList {
917785
/// The use-case for this is clients who are bandwidth limited and where every byte counts.
918786
#[derive(SpacetimeType, Debug, Clone)]
919787
#[sats(crate = spacetimedb_lib)]
920-
pub enum RowSizeHint<I> {
788+
pub enum RowSizeHint {
921789
/// Each row in `rows_data` is of the same fixed size as specified here.
922790
FixedSize(RowSize),
923791
/// The offsets into `rows_data` defining the boundaries of each row.
924792
/// Only stores the offset to the start of each row.
925793
/// The end of each row is inferred from the start of the next row, or `rows_data.len()`.
926794
/// The behavior of this is identical to that of `PackedStr`.
927-
RowOffsets(I),
795+
RowOffsets(Arc<[RowOffset]>),
796+
}
797+
798+
impl Default for RowSizeHint {
799+
fn default() -> Self {
800+
Self::RowOffsets([].into())
801+
}
928802
}
929803

930-
impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
804+
impl RowSizeHint {
931805
fn index_to_range(&self, index: usize, data_end: usize) -> Option<Range<usize>> {
932806
match self {
933807
Self::FixedSize(size) => {
@@ -952,37 +826,17 @@ impl<I: AsRef<[RowOffset]>> RowSizeHint<I> {
952826
}
953827
}
954828

955-
impl<B: Default, I> BsatnRowList<B, I> {
956-
pub fn fixed(row_size: RowSize) -> Self {
957-
Self {
958-
size_hint: RowSizeHint::FixedSize(row_size),
959-
rows_data: <_>::default(),
960-
}
961-
}
962-
963-
/// Returns a new empty list using indices
964-
pub fn row_offsets() -> Self
965-
where
966-
I: From<[RowOffset; 0]>,
967-
{
968-
Self {
969-
size_hint: RowSizeHint::RowOffsets([].into()),
970-
rows_data: <_>::default(),
971-
}
972-
}
973-
}
974-
975-
impl<B: AsRef<[u8]>, I: AsRef<[RowOffset]>> RowListLen for BsatnRowList<B, I> {
976-
/// Returns the length of the row list.
829+
impl RowListLen for BsatnRowList {
977830
fn len(&self) -> usize {
978831
match &self.size_hint {
832+
// `size != 0` is always the case for `FixedSize`.
979833
RowSizeHint::FixedSize(size) => self.rows_data.as_ref().len() / *size as usize,
980834
RowSizeHint::RowOffsets(offsets) => offsets.as_ref().len(),
981835
}
982836
}
983837
}
984838

985-
impl<B: AsRef<[u8]>, I> ByteListLen for BsatnRowList<B, I> {
839+
impl ByteListLen for BsatnRowList {
986840
/// Returns the uncompressed size of the list in bytes
987841
fn num_bytes(&self) -> usize {
988842
self.rows_data.as_ref().len()
@@ -1020,28 +874,3 @@ impl Iterator for BsatnRowListIter<'_> {
1020874
self.list.get(index)
1021875
}
1022876
}
1023-
1024-
/// A [`BsatnRowList`] that can be added to.
1025-
pub type BsatnRowListBuilder = BsatnRowList<Vec<u8>, Vec<RowOffset>>;
1026-
1027-
impl BsatnRowListBuilder {
1028-
/// Adds `row`, BSATN-encoded to this list.
1029-
#[inline]
1030-
pub fn push(&mut self, row: &[u8]) {
1031-
if let RowSizeHint::RowOffsets(offsets) = &mut self.size_hint {
1032-
offsets.push(self.rows_data.len() as u64);
1033-
}
1034-
self.rows_data.extend_from_slice(row);
1035-
}
1036-
1037-
/// Finish the in flight list, throwing away the capability to mutate.
1038-
pub fn finish(self) -> BsatnRowList {
1039-
let Self { size_hint, rows_data } = self;
1040-
let rows_data = rows_data.into();
1041-
let size_hint = match size_hint {
1042-
RowSizeHint::FixedSize(fs) => RowSizeHint::FixedSize(fs),
1043-
RowSizeHint::RowOffsets(ro) => RowSizeHint::RowOffsets(ro.into()),
1044-
};
1045-
BsatnRowList { size_hint, rows_data }
1046-
}
1047-
}

crates/core/src/client/messages.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use super::{ClientConfig, DataMessage, Protocol};
22
use crate::host::module_host::{EventStatus, ModuleEvent};
33
use crate::host::ArgsTuple;
44
use crate::messages::websocket as ws;
5+
use crate::subscription::websocket_building::{brotli_compress, decide_compression, gzip_compress};
56
use bytes::{BufMut, Bytes, BytesMut};
67
use bytestring::ByteString;
78
use derive_more::From;
@@ -148,10 +149,10 @@ pub fn serialize(
148149
});
149150

150151
// Conditionally compress the message.
151-
let (in_use, msg_bytes) = match ws::decide_compression(srv_msg.len(), config.compression) {
152+
let (in_use, msg_bytes) = match decide_compression(srv_msg.len(), config.compression) {
152153
Compression::None => buffer.uncompressed(),
153-
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, ws::brotli_compress),
154-
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, ws::gzip_compress),
154+
Compression::Brotli => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_BROTLI, brotli_compress),
155+
Compression::Gzip => buffer.compress_with_tag(SERVER_MSG_COMPRESSION_TAG_GZIP, gzip_compress),
155156
};
156157
(in_use, msg_bytes.into())
157158
}

0 commit comments

Comments
 (0)