Skip to content

Commit 7bc7fef

Browse files
Update the storage service reading. (#3011)
## Motivation The `read_entries` code of the storage-service performs a blocking loop, which is inefficient from the viewpoint of concurrency. ## Proposal The following was done: * A vector of handles is created. * The handles are processed and then merged. * On the server side, the code is changed so that it no longer assumes that the requests arrive in order. ## Test Plan The CI proved adequate. The `test_storage_service_big_raw_write` test did catch the problem with the ordering of the requests. ## Release Plan Not relevant, as we do not yet use the `storage-service` in TestNet / DevNet.
1 parent 8fd535f commit 7bc7fef

File tree

5 files changed

+51
-22
lines changed

5 files changed

+51
-22
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

examples/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

linera-storage-service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ anyhow.workspace = true
2626
async-lock.workspace = true
2727
bcs.workspace = true
2828
clap.workspace = true
29+
futures.workspace = true
2930
linera-base.workspace = true
3031
linera-version.workspace = true
3132
linera-views.workspace = true

linera-storage-service/src/client.rs

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
use std::{mem, sync::Arc};
55

66
use async_lock::{Semaphore, SemaphoreGuard};
7+
use futures::future::join_all;
78
use linera_base::ensure;
89
#[cfg(with_metrics)]
910
use linera_views::metering::MeteredStore;
@@ -96,7 +97,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
9697
if num_chunks == 0 {
9798
Ok(value)
9899
} else {
99-
Self::read_entries(&mut client, message_index, num_chunks).await
100+
self.read_entries(message_index, num_chunks).await
100101
}
101102
}
102103

@@ -161,7 +162,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
161162
let values = values.into_iter().map(|x| x.value).collect::<Vec<_>>();
162163
Ok(values)
163164
} else {
164-
Self::read_entries(&mut client, message_index, num_chunks).await
165+
self.read_entries(message_index, num_chunks).await
165166
}
166167
}
167168

@@ -192,7 +193,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
192193
if num_chunks == 0 {
193194
Ok(keys)
194195
} else {
195-
Self::read_entries(&mut client, message_index, num_chunks).await
196+
self.read_entries(message_index, num_chunks).await
196197
}
197198
}
198199

@@ -227,7 +228,7 @@ impl ReadableKeyValueStore for ServiceStoreClientInternal {
227228
.collect::<Vec<_>>();
228229
Ok(key_values)
229230
} else {
230-
Self::read_entries(&mut client, message_index, num_chunks).await
231+
self.read_entries(message_index, num_chunks).await
231232
}
232233
}
233234
}
@@ -351,21 +352,37 @@ impl ServiceStoreClientInternal {
351352
}
352353
}
353354

355+
async fn read_single_entry(
356+
&self,
357+
message_index: i64,
358+
index: i32,
359+
) -> Result<Vec<u8>, ServiceStoreError> {
360+
let channel = self.channel.clone();
361+
let query = RequestSpecificChunk {
362+
message_index,
363+
index,
364+
};
365+
let request = tonic::Request::new(query);
366+
let mut client = StoreProcessorClient::new(channel);
367+
let response = client.process_specific_chunk(request).await?;
368+
let response = response.into_inner();
369+
let ReplySpecificChunk { chunk } = response;
370+
Ok(chunk)
371+
}
372+
354373
async fn read_entries<S: DeserializeOwned>(
355-
client: &mut StoreProcessorClient<Channel>,
374+
&self,
356375
message_index: i64,
357376
num_chunks: i32,
358377
) -> Result<S, ServiceStoreError> {
359-
let mut value = Vec::new();
378+
let mut handles = Vec::new();
360379
for index in 0..num_chunks {
361-
let query = RequestSpecificChunk {
362-
message_index,
363-
index,
364-
};
365-
let request = tonic::Request::new(query);
366-
let response = client.process_specific_chunk(request).await?;
367-
let response = response.into_inner();
368-
let ReplySpecificChunk { chunk } = response;
380+
let handle = self.read_single_entry(message_index, index);
381+
handles.push(handle);
382+
}
383+
let mut value = Vec::new();
384+
for chunk in join_all(handles).await {
385+
let chunk = chunk?;
369386
value.extend(chunk);
370387
}
371388
Ok(bcs::from_bytes(&value)?)

linera-storage-service/src/server.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,16 @@ enum ServiceStoreServerInternal {
4545
RocksDb(RocksDbStore),
4646
}
4747

48+
#[derive(Default)]
49+
struct BigRead {
50+
num_processed_chunks: usize,
51+
chunks: Vec<Vec<u8>>,
52+
}
53+
4854
#[derive(Default)]
4955
struct PendingBigReads {
5056
index: i64,
51-
chunks_by_index: BTreeMap<i64, Vec<Vec<u8>>>,
57+
big_reads: BTreeMap<i64, BigRead>,
5258
}
5359

5460
struct ServiceStoreServer {
@@ -219,9 +225,11 @@ impl ServiceStoreServer {
219225
let mut pending_big_reads = self.pending_big_reads.write().await;
220226
let message_index = pending_big_reads.index;
221227
pending_big_reads.index += 1;
222-
pending_big_reads
223-
.chunks_by_index
224-
.insert(message_index, chunks);
228+
let big_read = BigRead {
229+
num_processed_chunks: 0,
230+
chunks,
231+
};
232+
pending_big_reads.big_reads.insert(message_index, big_read);
225233
(message_index, num_chunks)
226234
}
227235
}
@@ -457,13 +465,14 @@ impl StoreProcessor for ServiceStoreServer {
457465
index,
458466
} = request;
459467
let mut pending_big_reads = self.pending_big_reads.write().await;
460-
let Some(entry) = pending_big_reads.chunks_by_index.get(&message_index) else {
468+
let Some(entry) = pending_big_reads.big_reads.get_mut(&message_index) else {
461469
return Err(Status::not_found("process_specific_chunk"));
462470
};
463471
let index = index as usize;
464-
let chunk = entry[index].clone();
465-
if entry.len() == index + 1 {
466-
pending_big_reads.chunks_by_index.remove(&message_index);
472+
let chunk = entry.chunks[index].clone();
473+
entry.num_processed_chunks += 1;
474+
if entry.chunks.len() == entry.num_processed_chunks {
475+
pending_big_reads.big_reads.remove(&message_index);
467476
}
468477
let response = ReplySpecificChunk { chunk };
469478
Ok(Response::new(response))

0 commit comments

Comments
 (0)