Skip to content

Commit 6f94385

Browse files
authored
Add new DownloadRawCertificatesByHeight endpoint and use it. (#4478)
## Motivation During our performance tests it was found that proxies' bottleneck is (de)serialization of certificates when responding to `DownloadCertificatesByHeights` request. ## Proposal It is a waste of CPU cycles to deserialize data (after loading from the DB) only to serialize it for transporting over gRPC protocol. This PR: - adds a new method on the storage `read_certificates_raw` which returns raw bytes of the requested certificates - adds a new endpoint to `rpc.proto/ValidatorNode` – `DownloadRawCertificatesByHeight` that responds with (bcs) bytes of the requested certificates - updates proxy code to serve the new endpoint - the gRPC _client_ does NOT get a new method, instead the old `download_certificates_by_height` is modified to use the new endpoint. **This is safe:** b/c we will release this as a new SDK version meaning old clients still use old code paths while next version uses new. ## Test Plan CI ## Release Plan - These changes should be backported to the latest `devnet` branch, then - be released in a new SDK, - be released in a validator hotfix. - These changes should be backported to the latest `testnet` branch, then - be released in a new SDK, - be released in a validator hotfix. ## Links closes #4479 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 44e6312 commit 6f94385

File tree

8 files changed

+174
-19
lines changed

8 files changed

+174
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

linera-rpc/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ web-default = ["web"]
4444
[dependencies]
4545
anyhow.workspace = true
4646
async-trait.workspace = true
47+
bcs.workspace = true
4748
bincode.workspace = true
4849
bytes.workspace = true
4950
cfg-if.workspace = true

linera-rpc/proto/rpc.proto

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,12 @@ service ValidatorNode {
8686
// Download a batch of certificates.
8787
rpc DownloadCertificates(CertificatesBatchRequest) returns (CertificatesBatchResponse);
8888

89-
/// Download a batch of certificates by height range.
89+
/// Download a batch of certificates by heights.
9090
rpc DownloadCertificatesByHeights(DownloadCertificatesByHeightsRequest) returns (CertificatesBatchResponse);
9191

92+
// Download a batch of certificates, in serialized form, by their heights.
93+
rpc DownloadRawCertificatesByHeights(DownloadCertificatesByHeightsRequest) returns (RawCertificatesBatch);
94+
9295
// Return the hash of the `Certificate` that last used a blob.
9396
rpc BlobLastUsedBy(BlobId) returns (CryptoHash);
9497

@@ -100,6 +103,19 @@ service ValidatorNode {
100103
rpc MissingBlobIds(BlobIds) returns (BlobIds);
101104
}
102105

106+
// Batch of raw certificates.
107+
message RawCertificatesBatch {
108+
repeated RawCertificate certificates = 1;
109+
}
110+
111+
// A confirmed block certificate in a serialized form.
112+
message RawCertificate {
113+
// BCS-serialized bytes of lite certificate part.
114+
bytes lite_certificate = 1;
115+
// BCS-serialized bytes of confirmed block part.
116+
bytes confirmed_block = 2;
117+
}
118+
103119
// A request for downloading certificates by block heights.
104120
message DownloadCertificatesByHeightsRequest {
105121
ChainId chain_id = 1;

linera-rpc/src/grpc/client.rs

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use linera_base::{
1414
use linera_chain::{
1515
data_types::{self},
1616
types::{
17-
self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate, Timeout,
18-
ValidatedBlock,
17+
self, Certificate, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
18+
LiteCertificate, Timeout, ValidatedBlock,
1919
},
2020
};
2121
use linera_core::{
@@ -32,8 +32,8 @@ use super::{
3232
transport, GRPC_MAX_MESSAGE_SIZE,
3333
};
3434
use crate::{
35-
HandleConfirmedCertificateRequest, HandleLiteCertRequest, HandleTimeoutCertificateRequest,
36-
HandleValidatedCertificateRequest,
35+
grpc::api::RawCertificate, HandleConfirmedCertificateRequest, HandleLiteCertRequest,
36+
HandleTimeoutCertificateRequest, HandleValidatedCertificateRequest,
3737
};
3838

3939
#[derive(Clone)]
@@ -447,15 +447,26 @@ impl ValidatorNode for GrpcClient {
447447
chain_id,
448448
heights: missing.clone(),
449449
};
450-
let mut received: Vec<ConfirmedBlockCertificate> = Vec::<Certificate>::try_from(
451-
client_delegate!(self, download_certificates_by_heights, request)?,
452-
)?
453-
.into_iter()
454-
.map(|cert| {
455-
ConfirmedBlockCertificate::try_from(cert)
456-
.map_err(|_| NodeError::UnexpectedCertificateValue)
457-
})
458-
.collect::<Result<_, _>>()?;
450+
let mut received: Vec<ConfirmedBlockCertificate> =
451+
client_delegate!(self, download_raw_certificates_by_heights, request)?
452+
.certificates
453+
.into_iter()
454+
.map(
455+
|RawCertificate {
456+
lite_certificate,
457+
confirmed_block,
458+
}| {
459+
let cert = bcs::from_bytes::<LiteCertificate>(&lite_certificate)
460+
.map_err(|_| NodeError::UnexpectedCertificateValue)?;
461+
462+
let block = bcs::from_bytes::<ConfirmedBlock>(&confirmed_block)
463+
.map_err(|_| NodeError::UnexpectedCertificateValue)?;
464+
465+
cert.with_value(block)
466+
.ok_or(NodeError::UnexpectedCertificateValue)
467+
},
468+
)
469+
.collect::<Result<_, _>>()?;
459470

460471
if received.is_empty() {
461472
break;

linera-service/src/exporter/test_utils.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,13 @@ impl ValidatorNode for DummyValidator {
353353
unimplemented!()
354354
}
355355

356+
async fn download_raw_certificates_by_heights(
357+
&self,
358+
_request: Request<linera_rpc::grpc::api::DownloadCertificatesByHeightsRequest>,
359+
) -> Result<Response<linera_rpc::grpc::api::RawCertificatesBatch>, Status> {
360+
unimplemented!()
361+
}
362+
356363
async fn blob_last_used_by(
357364
&self,
358365
_request: Request<linera_rpc::grpc::api::BlobId>,

linera-service/src/proxy/grpc.rs

Lines changed: 82 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ use linera_rpc::{
3535
BlobContent, BlobId, BlobIds, BlockProposal, Certificate, CertificatesBatchRequest,
3636
CertificatesBatchResponse, ChainInfoResult, CryptoHash, HandlePendingBlobRequest,
3737
LiteCertificate, NetworkDescription, Notification, PendingBlobRequest,
38-
PendingBlobResult, SubscriptionRequest, VersionInfo,
38+
PendingBlobResult, RawCertificate, RawCertificatesBatch, SubscriptionRequest,
39+
VersionInfo,
3940
},
4041
pool::GrpcConnectionPool,
4142
GrpcProtoConversionError, GrpcProxyable, GRPC_CHUNKED_MESSAGE_FILL_LIMIT,
@@ -675,6 +676,75 @@ where
675676
.await
676677
}
677678

679+
#[instrument(skip_all, err(Display))]
680+
async fn download_raw_certificates_by_heights(
681+
&self,
682+
request: Request<api::DownloadCertificatesByHeightsRequest>,
683+
) -> Result<Response<api::RawCertificatesBatch>, Status> {
684+
let original_request: CertificatesByHeightRequest = request.into_inner().try_into()?;
685+
let chain_info_request = ChainInfoQuery::new(original_request.chain_id)
686+
.with_sent_certificate_hashes_by_heights(original_request.heights);
687+
// Use handle_chain_info_query to get the certificate hashes
688+
let chain_info_response = self
689+
.handle_chain_info_query(Request::new(chain_info_request.try_into()?))
690+
.await?;
691+
// Extract the ChainInfoResult from the response
692+
let chain_info_result = chain_info_response.into_inner();
693+
// Extract the certificate hashes from the ChainInfo
694+
let hashes = match chain_info_result.inner {
695+
Some(api::chain_info_result::Inner::ChainInfoResponse(response)) => {
696+
let chain_info: ChainInfo =
697+
bincode::deserialize(&response.chain_info).map_err(|e| {
698+
Status::internal(format!("Failed to deserialize ChainInfo: {}", e))
699+
})?;
700+
chain_info.requested_sent_certificate_hashes
701+
}
702+
Some(api::chain_info_result::Inner::Error(error)) => {
703+
return Err(Status::internal(format!(
704+
"Chain info query failed: {:?}",
705+
error
706+
)));
707+
}
708+
None => {
709+
return Err(Status::internal("Empty chain info result"));
710+
}
711+
};
712+
713+
// Use 70% of the max message size as a buffer capacity.
714+
// Leave 30% as overhead.
715+
let mut grpc_message_limiter: GrpcMessageLimiter<linera_chain::types::Certificate> =
716+
GrpcMessageLimiter::new(GRPC_CHUNKED_MESSAGE_FILL_LIMIT);
717+
718+
let mut returned_certificates = vec![];
719+
720+
'outer: for batch in hashes.chunks(100) {
721+
let certificates: Vec<(Vec<u8>, Vec<u8>)> = self
722+
.0
723+
.storage
724+
.read_certificates_raw(batch.to_vec())
725+
.await
726+
.map_err(Self::view_error_to_status)?
727+
.into_iter()
728+
.collect();
729+
for (lite_cert_bytes, confirmed_block_bytes) in certificates {
730+
if grpc_message_limiter
731+
.fits_raw(lite_cert_bytes.len() + confirmed_block_bytes.len())
732+
{
733+
returned_certificates.push(RawCertificate {
734+
lite_certificate: lite_cert_bytes,
735+
confirmed_block: confirmed_block_bytes,
736+
});
737+
} else {
738+
break 'outer;
739+
}
740+
}
741+
}
742+
743+
Ok(Response::new(RawCertificatesBatch {
744+
certificates: returned_certificates,
745+
}))
746+
}
747+
678748
#[instrument(skip_all, err(level = Level::WARN))]
679749
async fn blob_last_used_by(
680750
&self,
@@ -764,11 +834,18 @@ impl<T> GrpcMessageLimiter<T> {
764834
U: TryFrom<T, Error = GrpcProtoConversionError> + Message,
765835
{
766836
let required = U::try_from(el).map(|proto| proto.encoded_len())?;
767-
if required > self.remaining {
768-
return Ok(false);
837+
Ok(self.fits_raw(required))
838+
}
839+
840+
/// Adds the given number of bytes to the remaining capacity.
841+
///
842+
/// Returns whether we managed to fit the element.
843+
fn fits_raw(&mut self, bytes_len: usize) -> bool {
844+
if self.remaining < bytes_len {
845+
return false;
769846
}
770-
self.remaining -= required;
771-
Ok(true)
847+
self.remaining = self.remaining.saturating_sub(bytes_len);
848+
true
772849
}
773850
}
774851

linera-storage/src/db_storage.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -824,6 +824,37 @@ where
824824
Ok(certificates)
825825
}
826826

827+
/// Reads certificates by hashes.
828+
///
829+
/// Returns a vector of tuples where the first element is a lite certificate
830+
/// and the second element is confirmed block.
831+
///
832+
/// It does not check if all hashes all returned.
833+
async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
834+
&self,
835+
hashes: I,
836+
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError> {
837+
let hashes = hashes.into_iter().collect::<Vec<_>>();
838+
if hashes.is_empty() {
839+
return Ok(Vec::new());
840+
}
841+
let keys = Self::get_keys_for_certificates(&hashes)?;
842+
let store = self.database.open_shared(&[])?;
843+
let values = store.read_multi_values_bytes(keys).await?;
844+
#[cfg(with_metrics)]
845+
metrics::READ_CERTIFICATES_COUNTER
846+
.with_label_values(&[])
847+
.inc_by(hashes.len() as u64);
848+
Ok(values
849+
.chunks_exact(2)
850+
.filter_map(|chunk| {
851+
let lite_cert_bytes = chunk[0].as_ref()?;
852+
let confirmed_block_bytes = chunk[1].as_ref()?;
853+
Some((lite_cert_bytes.clone(), confirmed_block_bytes.clone()))
854+
})
855+
.collect())
856+
}
857+
827858
async fn read_event(&self, event_id: EventId) -> Result<Option<Vec<u8>>, ViewError> {
828859
let store = self.database.open_shared(&[])?;
829860
let event_key = bcs::to_bytes(&BaseKey::Event(event_id.clone()))?;

linera-storage/src/lib.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,17 @@ pub trait Storage: Sized {
140140
hashes: I,
141141
) -> Result<Vec<Option<ConfirmedBlockCertificate>>, ViewError>;
142142

143+
/// Reads certificates by hashes.
144+
///
145+
/// Returns a vector of tuples where the first element is a lite certificate
146+
/// and the second element is confirmed block.
147+
///
148+
/// It does not check if all hashes all returned.
149+
async fn read_certificates_raw<I: IntoIterator<Item = CryptoHash> + Send>(
150+
&self,
151+
hashes: I,
152+
) -> Result<Vec<(Vec<u8>, Vec<u8>)>, ViewError>;
153+
143154
/// Reads the event with the given ID.
144155
async fn read_event(&self, id: EventId) -> Result<Option<Vec<u8>>, ViewError>;
145156

0 commit comments

Comments
 (0)