-
Notifications
You must be signed in to change notification settings - Fork 2.2k
linera_core::client: batch downloading of missing blobs (#4755)
#4768
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -339,7 +339,7 @@ impl<Env: Environment> Client<Env> { | |
| } | ||
| }; | ||
| for certificate in certificates { | ||
| last_info = Some(self.handle_certificate(Box::new(certificate)).await?.info); | ||
| last_info = Some(self.handle_certificate(certificate).await?.info); | ||
| } | ||
| // Now download the rest in batches from the remote node. | ||
| while next_height < stop { | ||
|
|
@@ -361,6 +361,24 @@ impl<Env: Environment> Client<Env> { | |
| Ok(last_info) | ||
| } | ||
|
|
||
| async fn download_blobs( | ||
| &self, | ||
| remote_node: &RemoteNode<impl ValidatorNode>, | ||
| blob_ids: impl IntoIterator<Item = BlobId>, | ||
| ) -> Result<(), ChainClientError> { | ||
| self.local_node | ||
| .store_blobs( | ||
| &futures::stream::iter(blob_ids.into_iter().map(|blob_id| async move { | ||
| remote_node.try_download_blob(blob_id).await.unwrap() | ||
| })) | ||
| .buffer_unordered(self.options.max_joined_tasks) | ||
| .collect::<Vec<_>>() | ||
| .await, | ||
| ) | ||
| .await | ||
| .map_err(Into::into) | ||
| } | ||
|
|
||
| /// Tries to process all the certificates, requesting any missing blobs from the given node. | ||
| /// Returns the chain info of the last successfully processed certificate. | ||
| #[instrument(level = "trace", skip_all)] | ||
|
|
@@ -370,34 +388,47 @@ impl<Env: Environment> Client<Env> { | |
| certificates: Vec<ConfirmedBlockCertificate>, | ||
| ) -> Result<Option<Box<ChainInfo>>, ChainClientError> { | ||
| let mut info = None; | ||
| for certificate in certificates { | ||
| let certificate = Box::new(certificate); | ||
| let mut result = self.handle_certificate(certificate.clone()).await; | ||
| let required_blob_ids: Vec<_> = certificates | ||
| .iter() | ||
| .flat_map(|certificate| certificate.value().required_blob_ids()) | ||
| .collect(); | ||
|
|
||
| if let Err(LocalNodeError::BlobsNotFound(blob_ids)) = &result { | ||
| let blobs = | ||
| futures::stream::iter(blob_ids.iter().copied().map(|blob_id| async move { | ||
| remote_node.try_download_blob(blob_id).await.unwrap() | ||
| })) | ||
| .buffer_unordered(self.options.max_joined_tasks) | ||
| .collect::<Vec<_>>() | ||
| .await; | ||
| self.local_node.store_blobs(&blobs).await?; | ||
| result = self.handle_certificate(certificate.clone()).await; | ||
| match self | ||
| .local_node | ||
| .read_blob_states_from_storage(&required_blob_ids) | ||
| .await | ||
| { | ||
| Err(LocalNodeError::BlobsNotFound(blob_ids)) => { | ||
| self.download_blobs(remote_node, blob_ids).await?; | ||
| } | ||
| x => { | ||
| x?; | ||
| } | ||
| } | ||
|
|
||
| info = Some(result?.info); | ||
| for certificate in certificates { | ||
| info = Some( | ||
| match self.handle_certificate(certificate.clone()).await { | ||
| Err(LocalNodeError::BlobsNotFound(blob_ids)) => { | ||
| self.download_blobs(remote_node, blob_ids).await?; | ||
| self.handle_certificate(certificate).await? | ||
| } | ||
| x => x?, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (I'd prefer
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
| .info, | ||
| ); | ||
| } | ||
|
|
||
| // Done with all certificates. | ||
| Ok(info) | ||
| } | ||
|
|
||
| async fn handle_certificate<T: ProcessableCertificate>( | ||
| &self, | ||
| certificate: Box<GenericCertificate<T>>, | ||
| certificate: GenericCertificate<T>, | ||
| ) -> Result<ChainInfoResponse, LocalNodeError> { | ||
| self.local_node | ||
| .handle_certificate(*certificate, &self.notifier) | ||
| .handle_certificate(certificate, &self.notifier) | ||
| .await | ||
| } | ||
|
|
||
|
|
@@ -489,7 +520,7 @@ impl<Env: Environment> Client<Env> { | |
| &self, | ||
| certificate: Box<GenericCertificate<T>>, | ||
| ) -> Result<(), LocalNodeError> { | ||
| let info = self.handle_certificate(certificate).await?.info; | ||
| let info = self.handle_certificate(*certificate).await?.info; | ||
| self.update_from_info(&info); | ||
| Ok(()) | ||
| } | ||
|
|
@@ -707,8 +738,6 @@ impl<Env: Environment> Client<Env> { | |
| mode: ReceiveCertificateMode, | ||
| nodes: Option<Vec<RemoteNode<Env::ValidatorNode>>>, | ||
| ) -> Result<(), ChainClientError> { | ||
| let certificate = Box::new(certificate); | ||
|
|
||
| // Verify the certificate before doing any expensive networking. | ||
| let (max_epoch, committees) = self.admin_committees().await?; | ||
| if let ReceiveCertificateMode::NeedsCheck = mode { | ||
|
|
@@ -1145,7 +1174,7 @@ impl<Env: Environment> Client<Env> { | |
| }; | ||
|
|
||
| if let Some(timeout) = remote_info.manager.timeout { | ||
| self.handle_certificate(Box::new(*timeout)).await?; | ||
| self.handle_certificate(*timeout).await?; | ||
| } | ||
| let mut proposals = Vec::new(); | ||
| if let Some(proposal) = remote_info.manager.requested_signed_proposal { | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can we use
let .. =for complex expressions in argument position?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't love the added mental load of introducing unnecessary variables. It makes it harder to read the code from top to bottom since you don't know how the thing is going to be used until later, compared to having the top-level/motivating call (
store_blobshere) first.It's sometimes worth it if it's really unclear what the value of the expression is without a name (though that's a bit of a code smell for the names of the functions &c. involved in the expressions), but in this case I think it's pretty obvious that this is a set of blobs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(best of both worlds is obviously Haskell-esque
😄)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Our brains don't seem to work in the same way because for me this style hurts a lot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The intermediate variable (when named appropriately) acts as a summary of the complex expression.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Values are protocols between a constructor and a consumer. That means there are two information flows that you want to make clear at the point of seeing a value: one is upstream ‘what the value is’; the other is downstream ‘how the value is going to be used’. Usually ‘what the value is’ is evident from ‘how the value is constructed’; sometimes there are also non-obvious invariants that aren't clear from the way the value is constructed, and then IMO it's worth giving the expression a name via variable assignment, named function argument, et cetera, though a better solution (IMO) is to make the constructing variable clearer, e.g. by extracting it into a function whose name or (better) type expresses the invariants.
In your example the problem is that
tasksdoesn't really tell you what the tasks are for. Are we going to count them? Track their progress? That doesn't just make it harder to see what the function as a whole does, it also might have a bearing on what the tasks themselves actually are above and beyond what's captured in the name: if we're planning to run the tasks then it's implied that they're in a runnable state (not errored, exited, et cetera). In a small two-line example like this you can quickly look at the next line to see how the information is flowing, but as functions get bigger it's easy to lose track, because you've essentially decomposed a tree:into a (backwards!) list of edges:
(and usually with more noise between them!). Now you have to pay attention to and remember all the names in order to be able to mentally reconstruct the (tree-shaped) information flow of the data, rather than it being inherent in the syntax.
Concretely, in this case, I'd probably call the variable
blobs. But that defers the information about why we're interested in having a set of blobs (to store them)! You could call the variableblobs_to_pass_to_store_blobsbut that's a little silly to do for everything, and to get the same level of information that's given by having syntactic nesting that reflects the flow of information you need to do this transitively:In the case where the expression is very opaque, a quick fix can be to give it a descriptive name that describes its value:
This still obscures the information flow, but it's probably worth it if you don't want to refactor/rename the mysterious function into something more readable (a named function argument gives you the best of both worlds here). But in the case where the expression directly suggests what's being returned, e.g. when you're mapping a function called
try_download_blob, declaring that it returns ‘blobs’ is information-free.There's another argument in your comment about specifically the
?and.await, (I guess) implying that any expression ending with either of those should be assigned to a variable to make them ‘loud, explicit syntax’ as Stroustrup would say. I think this is somewhat a matter of opinion and culture, so I won't try to argue it from first principles, but Rust-the-language has made the decision that these operations are unsurprising enough today to be worthy of the terser syntax after extensive discussion (rather than, say,tryorawaitstatements/blocks) and I think idiomatic Rust shouldn't second-guess that.