Skip to content

Commit 57dd267

Browse files
goffrieConvex, Inc.
authored andcommitted
Replace IntervalMap with a BBST-based implementation (#39326)
This replaces the naive IntervalMap implementation (which queries each subscription against each index key separately) with a treap-based data structure that supports querying an index key in O((k+1) log n) time, where k is the number of _matching_ subscriptions and n is the total number of subscribed intervals. The net effect is that long-lived subscriptions get much cheaper. GitOrigin-RevId: f39ee382b2ca4441b71b64715c52d27a2556bcc5
1 parent 90e1af3 commit 57dd267

File tree

11 files changed

+706
-65
lines changed

11 files changed

+706
-65
lines changed

Cargo.lock

Lines changed: 13 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ encoding_rs = "0.8.32"
5959
enum-iterator = "2.1.0"
6060
event-listener = "5.3.1"
6161
fastrace = { git = "https://github.com/fast/fastrace", rev = "eacc377a8b3435e04f1d7a68085ce6eedb0d1d4a", version = "0.7", features = [ "enable" ] }
62+
fastrand = "2.3.0"
6263
flate2 = { version = "1", features = [ "zlib-ng" ] }
6364
flexbuffers = "25"
6465
float_next_after = "1.0.0"

crates/common/src/interval/bounds.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,12 +60,18 @@ impl End {
6060
}
6161
}
6262

63+
#[inline]
6364
pub const fn as_ref(&self) -> EndRef<'_> {
6465
match self {
6566
End::Excluded(binary_key) => EndRef::Excluded(binary_key.as_slice()),
6667
End::Unbounded => EndRef::Unbounded,
6768
}
6869
}
70+
71+
#[inline]
72+
pub fn greater_than(&self, key: &[u8]) -> bool {
73+
self.as_ref().greater_than(key)
74+
}
6975
}
7076

7177
impl HeapSize for End {
@@ -90,6 +96,14 @@ impl EndRef<'_> {
9096
EndRef::Unbounded => End::Unbounded,
9197
}
9298
}
99+
100+
#[inline]
101+
pub fn greater_than(&self, key: &[u8]) -> bool {
102+
match *self {
103+
EndRef::Excluded(end) => key < end,
104+
EndRef::Unbounded => true,
105+
}
106+
}
93107
}
94108

95109
#[cfg(test)]

crates/database/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ futures-async-stream = { workspace = true }
5050
governor = { workspace = true }
5151
imbl = { workspace = true }
5252
indexing = { path = "../indexing" }
53+
interval_map = { path = "../interval_map" }
5354
itertools = { workspace = true }
5455
keybroker = { path = "../keybroker" }
5556
maplit = { workspace = true }

crates/database/src/subscription.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ use futures::{
5151
FutureExt as _,
5252
StreamExt as _,
5353
};
54-
use indexing::interval::IntervalMap;
54+
use interval_map::IntervalMap;
5555
use parking_lot::Mutex;
5656
use prometheus::VMHistogram;
5757
use search::query::TextSearchSubscriptions;
@@ -390,7 +390,7 @@ impl SubscriptionManager {
390390
.subscriptions
391391
.indexed
392392
.iter()
393-
.map(|(key, (_fields, range_map))| (key, range_map.len()))
393+
.map(|(key, (_fields, range_map))| (key, range_map.subscriber_len()))
394394
.collect();
395395
let total_subscribers: usize = subscribers_by_index.values().sum();
396396
let search_len = self.subscriptions.search.filter_len();
@@ -471,10 +471,7 @@ impl SubscriptionManager {
471471
metrics::log_missing_index_key_subscriptions();
472472
continue;
473473
};
474-
475-
for subscriber_id in range_map.query(index_key) {
476-
notify(subscriber_id);
477-
}
474+
range_map.query(index_key, &mut *notify);
478475
}
479476
}
480477

@@ -623,7 +620,8 @@ impl Subscription {
623620

624621
/// Tracks every subscriber for a given read-set.
625622
struct SubscriptionMap {
626-
indexed: BTreeMap<TabletIndexName, (IndexedFields, IntervalMap<SubscriberId>)>,
623+
// TODO: remove nesting, merge all IntervalMaps into one big data structure
624+
indexed: BTreeMap<TabletIndexName, (IndexedFields, IntervalMap)>,
627625
search: TextSearchSubscriptions,
628626
}
629627

@@ -641,7 +639,9 @@ impl SubscriptionMap {
641639
.indexed
642640
.entry(index.clone())
643641
.or_insert_with(|| (index_reads.fields.clone(), IntervalMap::new()));
644-
interval_map.insert(id, index_reads.intervals.clone());
642+
interval_map
643+
.insert(id, index_reads.intervals.iter())
644+
.expect("stored more than u32::MAX intervals?");
645645
}
646646
for (index, reads) in reads.iter_search() {
647647
self.search.insert(id, index, reads);
@@ -654,7 +654,7 @@ impl SubscriptionMap {
654654
.indexed
655655
.get_mut(index)
656656
.unwrap_or_else(|| panic!("Missing index entry for {}", index));
657-
assert!(range_map.remove(id).is_some());
657+
range_map.remove(id);
658658
if range_map.is_empty() {
659659
self.indexed.remove(index);
660660
}

crates/database/src/system_query.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,7 @@ impl EqFields {
386386
}
387387

388388
impl From<EqFields> for Interval {
389+
#[inline]
389390
fn from(value: EqFields) -> Self {
390391
Interval::prefix(value.prefix.into())
391392
}

crates/indexing/src/interval.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

crates/indexing/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
pub mod backend_in_memory_indexes;
1010
pub mod index_registry;
11-
pub mod interval;
1211
mod metrics;
1312

1413
#[cfg(test)]

crates/interval_map/Cargo.toml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
[package]
2+
name = "interval_map"
3+
version = "0.1.0"
4+
edition = "2021"
5+
license = "LicenseRef-FSL-1.1-Apache-2.0"
6+
7+
[dependencies]
8+
common = { path = "../common" }
9+
fastrand = { workspace = true }
10+
slab = { workspace = true }
11+
12+
[dev-dependencies]
13+
cmd_util = { path = "../cmd_util" }
14+
common = { path = "../common", features = ["testing"] }
15+
proptest = { workspace = true }
16+
proptest-derive = { workspace = true }
17+
18+
[lints]
19+
workspace = true

0 commit comments

Comments
 (0)