Skip to content

Commit a67d545

Browse files
Introduce multi_get_pairs operations and group another access to preprocessed_blocks. (#4827)
## Motivation There are more uses of `multi_get` that are possible to group. We also introduced some `multi_get_pairs` functions in order to avoid some `.zip` constructions. #4802 ## Proposal The `multi_get_pairs` is introduced for `*MapView` and `LogView`. For the `*CollectionView`, we introduce similar functions. The pattern in `state.rs` was grouped. When possible the `.zip` were eliminated. ## Test Plan The CI. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links None.
1 parent cc37f64 commit a67d545

File tree

6 files changed

+442
-20
lines changed

6 files changed

+442
-20
lines changed

linera-chain/src/chain.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -825,10 +825,12 @@ where
825825
}
826826

827827
let recipients = block_execution_tracker.recipients();
828-
let heights = previous_message_blocks_view.multi_get(&recipients).await?;
829828
let mut recipient_heights = Vec::new();
830829
let mut indices = Vec::new();
831-
for (height, recipient) in heights.into_iter().zip(recipients) {
830+
for (recipient, height) in previous_message_blocks_view
831+
.multi_get_pairs(recipients)
832+
.await?
833+
{
832834
if let Some(height) = height {
833835
let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
834836
indices.push(index);
@@ -845,10 +847,9 @@ where
845847
}
846848

847849
let streams = block_execution_tracker.event_streams();
848-
let heights = previous_event_blocks_view.multi_get(&streams).await?;
849850
let mut stream_heights = Vec::new();
850851
let mut indices = Vec::new();
851-
for (stream, height) in streams.into_iter().zip(heights) {
852+
for (stream, height) in previous_event_blocks_view.multi_get_pairs(streams).await? {
852853
if let Some(height) = height {
853854
let index = usize::try_from(height.0).map_err(|_| ArithmeticError::Overflow)?;
854855
indices.push(index);
@@ -1241,9 +1242,12 @@ where
12411242
}
12421243

12431244
let mut updated_streams = BTreeSet::new();
1244-
let next_indices = self.next_expected_events.multi_get(&stream_ids).await?;
1245-
for ((next_index, indices), stream_id) in
1246-
next_indices.into_iter().zip(list_indices).zip(stream_ids)
1245+
for ((stream_id, next_index), indices) in self
1246+
.next_expected_events
1247+
.multi_get_pairs(stream_ids)
1248+
.await?
1249+
.into_iter()
1250+
.zip(list_indices)
12471251
{
12481252
let initial_index = if stream_id == StreamId::system(EPOCH_STREAM_NAME) {
12491253
// we don't expect the epoch stream to contain event 0

linera-core/src/chain_worker/state.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -478,17 +478,21 @@ where
478478
})
479479
})
480480
.collect::<Result<Vec<_>, _>>()?;
481-
for height in heights.range(next_block_height..) {
482-
hashes.push(
483-
self.chain
484-
.preprocessed_blocks
485-
.get(height)
486-
.await?
487-
.ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
488-
height: *height,
489-
chain_id: self.chain_id(),
490-
})?,
491-
);
481+
let requested_heights: Vec<BlockHeight> = heights
482+
.range(next_block_height..)
483+
.copied()
484+
.collect::<Vec<BlockHeight>>();
485+
for (height, hash) in self
486+
.chain
487+
.preprocessed_blocks
488+
.multi_get_pairs(requested_heights)
489+
.await?
490+
{
491+
let hash = hash.ok_or_else(|| WorkerError::PreprocessedBlocksEntryNotFound {
492+
height,
493+
chain_id: self.chain_id(),
494+
})?;
495+
hashes.push(hash);
492496
}
493497
let certificates = self.storage.read_certificates(hashes.clone()).await?;
494498
let certificates = match ResultReadCertificates::new(certificates, hashes) {
@@ -497,7 +501,7 @@ where
497501
return Err(WorkerError::ReadCertificatesError(hashes))
498502
}
499503
};
500-
let certificates = heights
504+
let height_to_certificates = heights
501505
.into_iter()
502506
.zip(certificates)
503507
.collect::<HashMap<_, _>>();
@@ -506,7 +510,7 @@ where
506510
for (recipient, heights) in heights_by_recipient {
507511
let mut bundles = Vec::new();
508512
for height in heights {
509-
let cert = certificates
513+
let cert = height_to_certificates
510514
.get(&height)
511515
.ok_or_else(|| ChainError::InternalError("missing certificates".to_string()))?;
512516
bundles.extend(cert.message_bundles_for(recipient));

linera-views/src/views/collection_view.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,37 @@ impl<W: View> ByteCollectionView<W::Context, W> {
415415
Ok(results)
416416
}
417417

418+
/// Loads multiple entries for reading at once with their keys.
419+
/// ```rust
420+
/// # tokio_test::block_on(async {
421+
/// # use linera_views::context::MemoryContext;
422+
/// # use linera_views::collection_view::ByteCollectionView;
423+
/// # use linera_views::register_view::RegisterView;
424+
/// # use linera_views::views::View;
425+
/// # let context = MemoryContext::new_for_testing(());
426+
/// let mut view: ByteCollectionView<_, RegisterView<_, String>> =
427+
/// ByteCollectionView::load(context).await.unwrap();
428+
/// {
429+
/// let subview = view.load_entry_mut(&vec![0, 1]).await.unwrap();
430+
/// subview.set("Bonjour".into());
431+
/// }
432+
/// let short_keys = vec![vec![0, 1], vec![0, 2]];
433+
/// let pairs = view.try_load_entries_pairs(short_keys).await.unwrap();
434+
/// assert_eq!(pairs[0].0, vec![0, 1]);
435+
/// assert_eq!(pairs[1].0, vec![0, 2]);
436+
/// let value0 = pairs[0].1.as_ref().unwrap().get();
437+
/// assert_eq!(*value0, "Bonjour".to_string());
438+
/// assert!(pairs[1].1.is_none());
439+
/// # })
440+
/// ```
441+
pub async fn try_load_entries_pairs(
442+
&self,
443+
short_keys: Vec<Vec<u8>>,
444+
) -> Result<Vec<(Vec<u8>, Option<ReadGuardedView<W>>)>, ViewError> {
445+
let values = self.try_load_entries(short_keys.clone()).await?;
446+
Ok(short_keys.into_iter().zip(values).collect())
447+
}
448+
418449
/// Load all entries for reading at once.
419450
/// ```rust
420451
/// # tokio_test::block_on(async {
@@ -1048,6 +1079,39 @@ impl<I: Serialize, W: View> CollectionView<W::Context, I, W> {
10481079
self.collection.try_load_entries(short_keys).await
10491080
}
10501081

1082+
/// Loads multiple entries for reading at once with their keys.
1083+
/// The entries in indices have to be all distinct.
1084+
/// ```rust
1085+
/// # tokio_test::block_on(async {
1086+
/// # use linera_views::context::MemoryContext;
1087+
/// # use linera_views::collection_view::CollectionView;
1088+
/// # use linera_views::register_view::RegisterView;
1089+
/// # use linera_views::views::View;
1090+
/// # let context = MemoryContext::new_for_testing(());
1091+
/// let mut view: CollectionView<_, u64, RegisterView<_, String>> =
1092+
/// CollectionView::load(context).await.unwrap();
1093+
/// {
1094+
/// let _subview = view.load_entry_or_insert(&23).await.unwrap();
1095+
/// }
1096+
/// let indices = [23, 24];
1097+
/// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1098+
/// let value0 = subviews[0].1.as_ref().unwrap().get();
1099+
/// assert_eq!(*value0, String::default());
1100+
/// # })
1101+
/// ```
1102+
pub async fn try_load_entries_pairs<Q>(
1103+
&self,
1104+
indices: impl IntoIterator<Item = Q>,
1105+
) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
1106+
where
1107+
I: Borrow<Q>,
1108+
Q: Serialize + Clone,
1109+
{
1110+
let indices_vec: Vec<Q> = indices.into_iter().collect();
1111+
let values = self.try_load_entries(indices_vec.iter()).await?;
1112+
Ok(indices_vec.into_iter().zip(values).collect())
1113+
}
1114+
10511115
/// Load all entries for reading at once.
10521116
/// ```rust
10531117
/// # tokio_test::block_on(async {
@@ -1463,6 +1527,39 @@ impl<I: CustomSerialize, W: View> CustomCollectionView<W::Context, I, W> {
14631527
self.collection.try_load_entries(short_keys).await
14641528
}
14651529

1530+
/// Loads multiple entries for reading at once with their keys.
1531+
/// The entries in indices have to be all distinct.
1532+
/// ```rust
1533+
/// # tokio_test::block_on(async {
1534+
/// # use linera_views::context::MemoryContext;
1535+
/// # use linera_views::collection_view::CustomCollectionView;
1536+
/// # use linera_views::register_view::RegisterView;
1537+
/// # use linera_views::views::View;
1538+
/// # let context = MemoryContext::new_for_testing(());
1539+
/// let mut view: CustomCollectionView<_, u128, RegisterView<_, String>> =
1540+
/// CustomCollectionView::load(context).await.unwrap();
1541+
/// {
1542+
/// let _subview = view.load_entry_or_insert(&23).await.unwrap();
1543+
/// }
1544+
/// let indices = [23, 42];
1545+
/// let subviews = view.try_load_entries_pairs(indices).await.unwrap();
1546+
/// let value0 = subviews[0].1.as_ref().unwrap().get();
1547+
/// assert_eq!(*value0, String::default());
1548+
/// # })
1549+
/// ```
1550+
pub async fn try_load_entries_pairs<Q>(
1551+
&self,
1552+
indices: impl IntoIterator<Item = Q>,
1553+
) -> Result<Vec<(Q, Option<ReadGuardedView<W>>)>, ViewError>
1554+
where
1555+
I: Borrow<Q>,
1556+
Q: CustomSerialize + Clone,
1557+
{
1558+
let indices_vec: Vec<Q> = indices.into_iter().collect();
1559+
let values = self.try_load_entries(indices_vec.iter()).await?;
1560+
Ok(indices_vec.into_iter().zip(values).collect())
1561+
}
1562+
14661563
/// Load all entries for reading at once.
14671564
/// ```rust
14681565
/// # tokio_test::block_on(async {

linera-views/src/views/log_view.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,30 @@ where
280280
Ok(result)
281281
}
282282

283+
/// Reads the index-value pairs at the given positions.
284+
/// ```rust
285+
/// # tokio_test::block_on(async {
286+
/// # use linera_views::context::MemoryContext;
287+
/// # use linera_views::log_view::LogView;
288+
/// # use linera_views::views::View;
289+
/// # let context = MemoryContext::new_for_testing(());
290+
/// let mut log = LogView::load(context).await.unwrap();
291+
/// log.push(34);
292+
/// log.push(42);
293+
/// assert_eq!(
294+
/// log.multi_get_pairs(vec![0, 1, 5]).await.unwrap(),
295+
/// vec![(0, Some(34)), (1, Some(42)), (5, None)]
296+
/// );
297+
/// # })
298+
/// ```
299+
pub async fn multi_get_pairs(
300+
&self,
301+
indices: Vec<usize>,
302+
) -> Result<Vec<(usize, Option<T>)>, ViewError> {
303+
let values = self.multi_get(indices.clone()).await?;
304+
Ok(indices.into_iter().zip(values).collect())
305+
}
306+
283307
async fn read_context(&self, range: Range<usize>) -> Result<Vec<T>, ViewError> {
284308
let count = range.len();
285309
let mut keys = Vec::with_capacity(count);

linera-views/src/views/map_view.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,36 @@ where
377377
Ok(results)
378378
}
379379

380+
/// Reads the key-value pairs at the given positions, if any.
381+
/// ```rust
382+
/// # tokio_test::block_on(async {
383+
/// # use linera_views::context::MemoryContext;
384+
/// # use linera_views::map_view::ByteMapView;
385+
/// # use linera_views::views::View;
386+
/// # let context = MemoryContext::new_for_testing(());
387+
/// let mut map = ByteMapView::load(context).await.unwrap();
388+
/// map.insert(vec![0, 1], String::from("Hello"));
389+
/// let pairs = map
390+
/// .multi_get_pairs(vec![vec![0, 1], vec![0, 2]])
391+
/// .await
392+
/// .unwrap();
393+
/// assert_eq!(
394+
/// pairs,
395+
/// vec![
396+
/// (vec![0, 1], Some(String::from("Hello"))),
397+
/// (vec![0, 2], None)
398+
/// ]
399+
/// );
400+
/// # })
401+
/// ```
402+
pub async fn multi_get_pairs(
403+
&self,
404+
short_keys: Vec<Vec<u8>>,
405+
) -> Result<Vec<(Vec<u8>, Option<V>)>, ViewError> {
406+
let values = self.multi_get(short_keys.clone()).await?;
407+
Ok(short_keys.into_iter().zip(values).collect())
408+
}
409+
380410
/// Obtains a mutable reference to a value at a given position if available.
381411
/// ```rust
382412
/// # tokio_test::block_on(async {
@@ -1193,6 +1223,41 @@ where
11931223
self.map.multi_get(short_keys).await
11941224
}
11951225

1226+
/// Reads the index-value pairs at the given positions, if any.
1227+
/// ```rust
1228+
/// # tokio_test::block_on(async {
1229+
/// # use linera_views::context::MemoryContext;
1230+
/// # use linera_views::map_view::MapView;
1231+
/// # use linera_views::views::View;
1232+
/// # let context = MemoryContext::new_for_testing(());
1233+
/// let mut map: MapView<_, u32, _> = MapView::load(context).await.unwrap();
1234+
/// map.insert(&(37 as u32), String::from("Hello"));
1235+
/// map.insert(&(49 as u32), String::from("Bonjour"));
1236+
/// assert_eq!(
1237+
/// map.multi_get_pairs([37 as u32, 49 as u32, 64 as u32])
1238+
/// .await
1239+
/// .unwrap(),
1240+
/// vec![
1241+
/// (37 as u32, Some(String::from("Hello"))),
1242+
/// (49 as u32, Some(String::from("Bonjour"))),
1243+
/// (64 as u32, None)
1244+
/// ]
1245+
/// );
1246+
/// # })
1247+
/// ```
1248+
pub async fn multi_get_pairs<Q>(
1249+
&self,
1250+
indices: impl IntoIterator<Item = Q>,
1251+
) -> Result<Vec<(Q, Option<V>)>, ViewError>
1252+
where
1253+
I: Borrow<Q>,
1254+
Q: Serialize + Clone,
1255+
{
1256+
let indices_vec = indices.into_iter().collect::<Vec<Q>>();
1257+
let values = self.multi_get(indices_vec.iter()).await?;
1258+
Ok(indices_vec.into_iter().zip(values).collect())
1259+
}
1260+
11961261
/// Obtains a mutable reference to a value at a given position if available
11971262
/// ```rust
11981263
/// # tokio_test::block_on(async {
@@ -1708,6 +1773,42 @@ where
17081773
self.map.multi_get(short_keys).await
17091774
}
17101775

1776+
/// Read index-value pairs at several positions, if any.
1777+
/// ```rust
1778+
/// # tokio_test::block_on(async {
1779+
/// # use linera_views::context::MemoryContext;
1780+
/// # use linera_views::map_view::CustomMapView;
1781+
/// # use linera_views::views::View;
1782+
/// # let context = MemoryContext::new_for_testing(());
1783+
/// let mut map: CustomMapView<MemoryContext<()>, u128, String> =
1784+
/// CustomMapView::load(context).await.unwrap();
1785+
/// map.insert(&(34 as u128), String::from("Hello"));
1786+
/// map.insert(&(12 as u128), String::from("Hi"));
1787+
/// assert_eq!(
1788+
/// map.multi_get_pairs([34 as u128, 12 as u128, 89 as u128])
1789+
/// .await
1790+
/// .unwrap(),
1791+
/// vec![
1792+
/// (34 as u128, Some(String::from("Hello"))),
1793+
/// (12 as u128, Some(String::from("Hi"))),
1794+
/// (89 as u128, None)
1795+
/// ]
1796+
/// );
1797+
/// # })
1798+
/// ```
1799+
pub async fn multi_get_pairs<Q>(
1800+
&self,
1801+
indices: impl IntoIterator<Item = Q>,
1802+
) -> Result<Vec<(Q, Option<V>)>, ViewError>
1803+
where
1804+
I: Borrow<Q>,
1805+
Q: CustomSerialize + Clone,
1806+
{
1807+
let indices_vec = indices.into_iter().collect::<Vec<Q>>();
1808+
let values = self.multi_get(indices_vec.iter()).await?;
1809+
Ok(indices_vec.into_iter().zip(values).collect())
1810+
}
1811+
17111812
/// Obtains a mutable reference to a value at a given position if available
17121813
/// ```rust
17131814
/// # tokio_test::block_on(async {

0 commit comments

Comments
 (0)