Skip to content

Commit 77ffcb8

Browse files
Use BucketQueueView in the protocol. (#2807)
## Motivation The `BucketQueueView` allows to use queues more efficiently. ## Proposal It is known that using Queues on Key-Value stores is kind of an antipattern for KeyValue stores. Grouping entries into buckets can reduce this problem. The problem is that grouping increases the memory cost and exposes to possible Out Of Memory. Thus we did the following: * For `QueueView` with `BlockHeight` being the type, we use a bucket size of 1000 since BlockHeight is just 8 bytes. * For `QueueView` with `TimestampedBundleInInbox` we use a bucket size of 100 since this type is essentially 4 Cryptohashes, a few number and a channel name which can be long but not too long. * For `QueueView` with `MessageBundle` no change is made as this type can be very large. It is to be pointed out that `BucketQueueView` has two differences with `QueueView`: * Buckets indeed. * The `front` is no longer an async operation, which means the front is accessible just after the `load`. On the other hand the `delete_front` becomes an async operation. ## Test Plan (A) The CI is the recommended way. (B) More tests would be needed for correctness. (C) Runtime would be helpful to gauge the gain effectively obtained. ## Release Plan This PR changes the way that data is stored in the storage, therefore using it on existing TestNet / DevNet is impossible. ## Links None.
1 parent 68a922f commit 77ffcb8

File tree

6 files changed

+35
-23
lines changed

6 files changed

+35
-23
lines changed

linera-chain/src/chain.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ use linera_execution::{
3131
ResourceTracker, ServiceRuntimeEndpoint, TransactionTracker,
3232
};
3333
use linera_views::{
34+
bucket_queue_view::BucketQueueView,
3435
context::Context,
3536
log_view::LogView,
3637
map_view::MapView,
37-
queue_view::QueueView,
3838
reentrant_collection_view::ReentrantCollectionView,
3939
register_view::RegisterView,
4040
set_view::SetView,
@@ -205,6 +205,10 @@ impl BundleInInbox {
205205
}
206206
}
207207

208+
// The `TimestampedBundleInInbox` is a relatively small type, so a total
209+
// of 100 seems reasonable for the storing of the data.
210+
const TIMESTAMPBUNDLE_BUCKET_SIZE: usize = 100;
211+
208212
/// A view accessing the state of a chain.
209213
#[derive(Debug, RootView, ClonableView, SimpleObject)]
210214
#[graphql(cache_control(no_cache))]
@@ -239,7 +243,8 @@ where
239243
/// Mailboxes used to receive messages indexed by their origin.
240244
pub inboxes: ReentrantCollectionView<C, Origin, InboxStateView<C>>,
241245
/// A queue of unskippable bundles, with the timestamp when we added them to the inbox.
242-
pub unskippable_bundles: QueueView<C, TimestampedBundleInInbox>,
246+
pub unskippable_bundles:
247+
BucketQueueView<C, TimestampedBundleInInbox, TIMESTAMPBUNDLE_BUCKET_SIZE>,
243248
/// Unskippable bundles that have been removed but are still in the queue.
244249
pub removed_unskippable_bundles: SetView<C, BundleInInbox>,
245250
/// The heights of previous blocks that sent messages to the same recipients.
@@ -680,10 +685,10 @@ where
680685
}
681686
if !removed_unskippable.is_empty() {
682687
// Delete all removed bundles from the front of the unskippable queue.
683-
let maybe_front = self.unskippable_bundles.front().await?;
688+
let maybe_front = self.unskippable_bundles.front();
684689
if maybe_front.is_some_and(|ts_entry| removed_unskippable.remove(&ts_entry.entry)) {
685-
self.unskippable_bundles.delete_front();
686-
while let Some(ts_entry) = self.unskippable_bundles.front().await? {
690+
self.unskippable_bundles.delete_front().await?;
691+
while let Some(ts_entry) = self.unskippable_bundles.front() {
687692
if !removed_unskippable.remove(&ts_entry.entry) {
688693
if !self
689694
.removed_unskippable_bundles
@@ -694,7 +699,7 @@ where
694699
}
695700
self.removed_unskippable_bundles.remove(&ts_entry.entry)?;
696701
}
697-
self.unskippable_bundles.delete_front();
702+
self.unskippable_bundles.delete_front().await?;
698703
}
699704
}
700705
for entry in removed_unskippable {

linera-chain/src/outbox.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ use linera_base::data_types::{ArithmeticError, BlockHeight};
88
#[cfg(with_testing)]
99
use linera_views::context::MemoryContext;
1010
use linera_views::{
11+
bucket_queue_view::BucketQueueView,
1112
context::Context,
12-
queue_view::QueueView,
1313
register_view::RegisterView,
1414
views::{ClonableView, View, ViewError},
1515
};
@@ -34,6 +34,12 @@ static OUTBOX_SIZE: LazyLock<HistogramVec> = LazyLock::new(|| {
3434
)
3535
});
3636

37+
// The number of block heights in a bucket
38+
// The `BlockHeight` has just 8 bytes so the size is constant.
39+
// This means that by choosing a size of 1000, we have a
40+
// reasonable size that will not create any memory issues.
41+
const BLOCK_HEIGHT_BUCKET_SIZE: usize = 1000;
42+
3743
/// The state of an outbox
3844
/// * An outbox is used to send messages to another chain.
3945
/// * Internally, this is implemented as a FIFO queue of (increasing) block heights.
@@ -50,7 +56,7 @@ where
5056
pub next_height_to_schedule: RegisterView<C, BlockHeight>,
5157
/// Keep sending these certified blocks of ours until they are acknowledged by
5258
/// receivers.
53-
pub queue: QueueView<C, BlockHeight>,
59+
pub queue: BucketQueueView<C, BlockHeight, BLOCK_HEIGHT_BUCKET_SIZE>,
5460
}
5561

5662
impl<C> OutboxStateView<C>
@@ -82,11 +88,11 @@ where
8288
height: BlockHeight,
8389
) -> Result<Vec<BlockHeight>, ViewError> {
8490
let mut updates = Vec::new();
85-
while let Some(h) = self.queue.front().await? {
91+
while let Some(h) = self.queue.front().cloned() {
8692
if h > height {
8793
break;
8894
}
89-
self.queue.delete_front();
95+
self.queue.delete_front().await?;
9096
updates.push(h);
9197
}
9298
#[cfg(with_metrics)]

linera-core/src/chain_worker/state/attempted_changes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -570,7 +570,7 @@ where
570570
let chain = &mut self.state.chain;
571571
if let (Some(epoch), Some(entry)) = (
572572
chain.execution_state.system.epoch.get(),
573-
chain.unskippable_bundles.front().await?,
573+
chain.unskippable_bundles.front(),
574574
) {
575575
let elapsed = self
576576
.state

linera-core/src/chain_worker/state/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,8 @@ where
549549
let outboxes = self.chain.outboxes.try_load_entries(&targets).await?;
550550
for outbox in outboxes {
551551
let outbox = outbox.expect("Only existing outboxes should be referenced by `indices`");
552-
let front = outbox.queue.front().await?;
553-
if front.is_some_and(|key| key <= height) {
552+
let front = outbox.queue.front();
553+
if front.is_some_and(|key| *key <= height) {
554554
return Ok(false);
555555
}
556556
}

linera-service-graphql-client/gql/service_schema.graphql

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,14 @@ A block height to identify blocks in a chain
207207
scalar BlockHeight
208208

209209

210+
type BucketQueueView_BlockHeight_e824a938 {
211+
entries(count: Int): [BlockHeight!]!
212+
}
213+
214+
type BucketQueueView_TimestampedBundleInInbox_5a630c55 {
215+
entries(count: Int): [TimestampedBundleInInbox!]!
216+
}
217+
210218
"""
211219
An origin and cursor of a unskippable bundle that is no longer in our inbox.
212220
"""
@@ -335,7 +343,7 @@ type ChainStateExtendedView {
335343
"""
336344
A queue of unskippable bundles, with the timestamp when we added them to the inbox.
337345
"""
338-
unskippableBundles: QueueView_TimestampedBundleInInbox_5a630c55!
346+
unskippableBundles: BucketQueueView_TimestampedBundleInInbox_5a630c55!
339347
"""
340348
Unskippable bundles that have been removed but are still in the queue.
341349
"""
@@ -895,7 +903,7 @@ type OutboxStateView {
895903
Keep sending these certified blocks of ours until they are acknowledged by
896904
receivers.
897905
"""
898-
queue: QueueView_BlockHeight_e824a938!
906+
queue: BucketQueueView_BlockHeight_e824a938!
899907
}
900908

901909
"""
@@ -992,18 +1000,10 @@ type QueryRoot {
9921000
version: VersionInfo!
9931001
}
9941002

995-
type QueueView_BlockHeight_e824a938 {
996-
entries(count: Int): [BlockHeight!]!
997-
}
998-
9991003
type QueueView_MessageBundle_f4399f0b {
10001004
entries(count: Int): [MessageBundle!]!
10011005
}
10021006

1003-
type QueueView_TimestampedBundleInInbox_5a630c55 {
1004-
entries(count: Int): [TimestampedBundleInInbox!]!
1005-
}
1006-
10071007
"""
10081008
The recipient of a transfer
10091009
"""

linera-views/src/views/bucket_queue_view.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ fn stored_indices<T>(stored_data: &VecDeque<(usize, Bucket<T>)>, position: usize
126126
/// The size `N` has to be chosen by taking into account the size of the type `T`
127127
/// and the basic size of a block. For example a total size of 100 bytes to 10 KB
128128
/// seems adequate.
129+
#[derive(Debug)]
129130
pub struct BucketQueueView<C, T, const N: usize> {
130131
context: C,
131132
/// The buckets of stored data. If missing, then it has not been loaded. The first index is always loaded.

0 commit comments

Comments
 (0)