-
Notifications
You must be signed in to change notification settings - Fork 2.9k
Use Proto Types in storage instead of bytes #3112
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use Proto Types in storage instead of bytes #3112
Conversation
| message BlockResponse { | ||
| oneof payload { | ||
| Block literal = 1; | ||
| string remote = 2; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think remote should be an object so it's more flexible going forward. My initial thought it that it would look like this:
message RemoteBlockResponse {
string path = 1;
oneof config {
RemoteBlockConfigS3 s3 = 2;
RemoteBlockConfigHTTP http = 3;
}
}
message RemoteBlockConfigS3 {
string region = 1;
string bucket = 2;
bool requester_pays = 3;
optional endpoint = 4;
}
message RemoteBlockConfigHTTP {
string endpoint = 1;
optional map<string, string> headers = 2;
}There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Cool. Makes sense to me.
| tokio::sync::mpsc::channel::<Result<BlockResponse, Status>>(16); | ||
| let (tx, rx) = tokio::sync::mpsc::channel::< | ||
| Result<ProtoBlockResponse, Status>, | ||
| >(16); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
16? is this arbitrary? if the client thats receiving the blocks is really slow, wouldn't that result in send failures from the spawned task?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll add better documentation.
if the client thats receiving the blocks is really slow, wouldn't that result in send failures from the spawned task?
I think the task will just block until the stream is cleared. From the okio::sync::mpsc docs:
/// # Examples
///
/// In the following example, each call to `send` will block until the
/// previously sent value was received.
///
/// ```rust
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(1);
///
/// tokio::spawn(async move {
/// for i in 0..10 {
/// if let Err(_) = tx.send(i).await {
/// println!("receiver dropped");
/// return;
/// }
/// }
/// });
///
/// while let Some(i) = rx.recv().await {
/// println!("got = {}", i);
/// }
/// }
/// ```
```
I'm not exactly sure what the Tonic stream is doing under the hood. It may be clearing the buffer into its own buffer for us 🤷
| ScriptTx script = 1; | ||
| // CreateTx create = 2; | ||
| // MintTx mint = 3; | ||
| // UpgradeTx upgrade = 4; | ||
| // UploadTx upload = 5; | ||
| // BlobTx blob = 6; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of these being ...Tx can we spell out Transaction? It keeps it cleaner & had no impact to the traffic on the wire.
xgreenx
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice to verify the deserilziation of the blocks
| message Transaction { | ||
| oneof variant { | ||
| ScriptTransaction script = 1; | ||
| // CreateTx create = 2; | ||
| // MintTx mint = 3; | ||
| // UpgradeTx upgrade = 4; | ||
| // UploadTx upload = 5; | ||
| // BlobTx blob = 6; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add a todo please? And create an issues to implement it. Or do you want to do that before we merge to the master?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Working on it here:
#3116
I can add an issue to be thorough.
| type OwnedKey = BlockHeight; | ||
| type Value = Self::OwnedValue; | ||
| type OwnedValue = Block; | ||
| type OwnedValue = ProtoBlock; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For ProtoBlock we want to use protobuf-based encoder and decoder, not Postcard
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done here:
#3106
| // TODO: Should this be owned to begin with? | ||
| let (header, txs) = block.clone().into_inner(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should work with reference, cloning block looks like an overkill.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we should work with reference in proto_header_from_header and in proto_tx_from_tx
| } | ||
| } | ||
| type GetBlockRangeStream = ReceiverStream<Result<BlockResponse, Status>>; | ||
| type GetBlockRangeStream = ReceiverStream<Result<ProtoBlockResponse, Status>>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| type GetBlockRangeStream = ReceiverStream<Result<ProtoBlockResponse, Status>>; | |
| type GetBlockRangeStream = BoxStream<Result<ProtoBlockResponse, Status>>; |
If you used BoxStream, then you could avoid usage of the tokio::spawn( below. You can just do inner.map({...}).into_boxed()
| // TODO: Should this be owned to begin with? | ||
| let (header, txs) = block.clone().into_inner(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we should work with reference in proto_header_from_header and in proto_tx_from_tx
## Linked Issues/PRs <!-- List of related issues/PRs --> ## Description <!-- List of detailed changes --> ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else?
4bb648d
into
chore/integrate-block-aggregator
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR is being reviewed by Cursor Bugbot
Details
Your team is on the Bugbot Free tier. On this plan, Bugbot will review limited PRs each billing cycle for each member of your team.
To receive Bugbot reviews on all of your PRs, visit the Cursor dashboard to activate Pro and start your 14-day free trial.
| where | ||
| Api: BlockAggregatorApi<BlockRangeResponse = BlockRangeResponse>, | ||
| DB: BlockAggregatorDB<BlockRangeResponse = BlockRangeResponse>, | ||
| DB: BlockAggregatorDB<Block = Blocks::Block, BlockRangeResponse = BlockRangeResponse>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Bug: API Trait Block Type Inconsistency
The BlockAggregatorApi trait bound is missing the Block associated type constraint. Line 21 specifies Api: BlockAggregatorApi<BlockRangeResponse = BlockRangeResponse> but doesn't constrain Api::Block to equal Blocks::Block, causing a type mismatch. The handle_query method expects BlockAggregatorQuery<BlockRangeResponse, Blocks::Block> but Api::await_query() returns BlockAggregatorQuery<Api::BlockRangeResponse, Api::Block>, which won't compile unless Api::Block = Blocks::Block.
## Linked Issues/PRs Complete feedback from: #3100 #3101 #3112 #3116 As they have all been merged into #3100 ### TODOs #### Completed - [x] Move proto types to new repo (or at least remove dep on protoc) FuelLabs/fuel-core-protobuf#1 - [x] Use ipv4 instead of ipv4 `let listener = TcpListener::bind("[::1]:0").unwrap();` - [x] make sure rpc is optional (may already be good with feature) - [x] "If we add a new database, we should include it in all places where we already interact with databases, like check_version, rollback_to, and so on(check where do we use databa liek relayer)" - [x] "The CombinedDatabase::check_version() method is missing a version check for the new block_aggregation database. This omission is inconsistent with other database checks and could lead to version compatibility issues." - [x] "I would like to mention the reason why this database doesn't force a monotonic increase in height." - [x] "Also, becuase we don't force monotonic height increase, we can't actually re-use rollback feature from historicat database. But we still need the ability to reset the state of the block aggregator to height X. So you can add another function, which we can call in rollback_to." - [x] "I know that ServiceRunner here is an overkill, but we have GraphQL that looks the same, and it was fiiiiine=D Maybe for consistency plus some logging we could reuse it here as well. It's just an internal service for your main service" - [x] `// TODO: Should this be owned to begin with?` Yeah, we should work with reference in proto_header_from_header and in proto_tx_from_tx - [x] maybe "If you used BoxStream, then you could avoid usage of the tokio::spawn( below. You can just do inner.map({...}).into_boxed()" in protobuf_adapater impl of protobuf trait: `type GetBlockRangeStream = BoxStream<Result<ProtoBlockResponse, Status>>;` - [x] `#[allow(unused)] fn arb_inputs()` - [x] in `serializer_adapter.rs`: "We should work with reference to everywhere in this file. And we should also avoid usage of unswap_or_default. All fields are always set, so it is strange why it can be None` - [x] `if let Some(value) = policies.get(PolicyType::Owner) { values[5] = value; }` - [x] in `serializer_adapter.rs`: "Why values for policies in an empty array? If the policy is not set, we shouldn't include them. If no policy is set, then it will be empty vector." - [x] in `crates/types/src/blockchain/header.rs`: "We don't need to expose the application header. You can find an example on how to create a header here: https://github.com/FuelLabs/fuel-rust-indexer/blob/main/crates/receipts_manager/adapters/graphql_event_adapter.rs#L250", "And the same for the block, you don't need manually calculate it, you can do that from the Block::new." - [x] in `types/src/test_helpers.rs`: "Block::new should be enough for you to create a header and a block. You don't need to implement all the logic by yourself.You can clean up getters and setters which you've added, after you udpated code here=) " - [x] Maybe remove `self.go_to_sleep_before_continuing().await;` #### Need Feedback - [ ] in `serializer_adapter.rs`: "If we use Rust definition to define variants inside of enums, then we can remove useless fields like data for MessageCoinPredicate and witness_index for MessageCoinPredicate" (@xgreenx I don't understand this. What "Rust definition"?) #### Noop - [ ] ~in `serializer_adapter.rs`: "I think instead of creating many procedure fucntions, we could implement TryFrom and From for types, plus, we could split them into its own folders by transactions and some common folder."~ **(Since this is in a separate repo now, the TryFrom/From impls wouldn't be valid due to the orphan rule)** - [ ] ~in `api.proto`: "I think all txs have these, maybe we could move them to the top-level Transaction?" DRY up chargeable txs?~**( I don't really care for this abstraction, but I could be convinced to DRY this up. Just doesn't seem like much gain)** - [ ] ~I think we can move fetching of the full blocks to be a part of default functionality provided by FuelClient along with old functionality (in `test-helpers/src/client_ext.rs`)~ **(I don't want to add new functionality in this work, we could do a followup)** - [ ] ~"[nit] https://github.com/FuelLabs/fuel-rust-indexer/blob/main/crates/receipts_manager/service.rs#L584 We can replace the whole service with stream of joined events(old + new)"~ **(WE could do this as a followup)** ## Description <!-- List of detailed changes --> ## Checklist - [ ] Breaking changes are clearly marked as such in the PR description and changelog - [ ] New behavior is reflected in tests - [ ] [The specification](https://github.com/FuelLabs/fuel-specs/) matches the implemented behavior (link update PR if changes are needed) ### Before requesting review - [ ] I have reviewed the code myself - [ ] I have created follow-up issues caused by this PR and linked them here ### After merging, notify other teams [Add or remove entries as needed] - [ ] [Rust SDK](https://github.com/FuelLabs/fuels-rs/) - [ ] [Sway compiler](https://github.com/FuelLabs/sway/) - [ ] [Platform documentation](https://github.com/FuelLabs/devrel-requests/issues/new?assignees=&labels=new+request&projects=&template=NEW-REQUEST.yml&title=%5BRequest%5D%3A+) (for out-of-organization contributors, the person merging the PR will do this) - [ ] Someone else?
Linked Issues/PRs
followup to feedback from #3100
Description
Checklist
Before requesting review
After merging, notify other teams
[Add or remove entries as needed]
Note
Replaces opaque byte blocks with strongly-typed Protobuf messages across API, storage, and streams, adds complete proto schema coverage, and introduces serializers for Fuel <-> Protobuf with tests.
proto/api.proto(versionedBlock,HeaderwithV1/V2, allTransaction/Input/Outputvariants, metadata, policies, etc.).SerializerAdapterto convert FuelBlock/TXs to/from Protobuf (ProtoBlock) with roundtrip support and proptests.BlockAggregatorApi/BlockSource/BlockAggregatorto carry aBlocktype; useProtoBlockthroughout.ProtoBlockliterals inGetBlockRangeandNewBlockSubscription.ProtoBlockinstorage_db(Blockstable,StorageStream); track contiguous height unchanged.ProtoBlockand new schema; add proptest regressions.generate_txns_rootpublic); enablefault-provingpaths; addproptestundertest-helpers.serdeon generated types; new feature flags and minor config updates.Written by Cursor Bugbot for commit c0a6540. This will update automatically on new commits. Configure here.