Add remote block cache #3106
base: chore/protobuf-api-adatper
Conversation
>,
> {
    let block_aggregator_config = config.rpc_config.clone();
    let sync_from = block_aggregator_config.sync_from.unwrap_or_default();
Bug: Incorrect default sync height for RPC server
The sync_from variable uses unwrap_or_default() which defaults to BlockHeight(0), but this is incorrect when the genesis block starts at a height other than zero. The genesis_block is available in the parent function init_sub_services at line 285, but it's not passed to init_rpc_server. This means the RPC server will incorrectly sync from height 0 instead of the actual genesis block height when sync_from is None. As noted in the PR discussion, it would be better to use genesis_block.header().height() as the default.
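A rough sketch of the suggested fix, assuming genesis_block is threaded from init_sub_services into init_rpc_server as an extra parameter (the plumbing and exact types are assumptions, not taken from the diff):

    // Sketch: fall back to the genesis height rather than BlockHeight(0).
    let genesis_height = *genesis_block.header().height();
    let sync_from = block_aggregator_config
        .sync_from
        .unwrap_or(genesis_height); // was: unwrap_or_default()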
        .rollback_to(target_block_height)?;
    }
}
Bug: Rollback loop exits without checking block aggregation storage
The rollback_to function computes block_aggregation_storage_rolled_back and performs rollback if needed, but the final break condition at lines 518-522 doesn't include a check for block_aggregation_storage_rolled_back. This means the loop can exit even when the block aggregation storage hasn't finished rolling back to the target height. When the rpc feature is enabled, the function should also verify that block_aggregation_storage_rolled_back is true before breaking out of the loop, similar to how it checks gas_price_rolled_back and compression_db_rolled_back.
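A sketch of the extra check, reusing the flag names quoted above; the exact shape of the existing break condition and the rpc feature gating are assumptions:

    // Sketch: only exit once the block aggregation storage has also caught up.
    #[cfg(feature = "rpc")]
    let aggregation_done = block_aggregation_storage_rolled_back;
    #[cfg(not(feature = "rpc"))]
    let aggregation_done = true;

    if gas_price_rolled_back && compression_db_rolled_back && aggregation_done {
        break;
    }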
Agree with cursor!
}
_ = self.sync_task.await_stop() => {
    Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly")))
}
Bug: Duplicate select branches in async block source
The tokio::select! macro in next_block method has duplicate branches. Lines 128-133 use _ pattern bindings for self.importer_task.await_stop() and self.sync_task.await_stop(), but lines 134-139 also poll the same futures with variable bindings. This creates a compile error as tokio::select! cannot have multiple branches polling the same future. The first four branches (lines 128-133) should be removed to keep only the branches that capture the error values.
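Roughly what the deduplicated select could look like, keeping only the branches that bind and report the stop reason (a sketch; the error wording and the availability of a Debug-printable stop state are assumptions):

    tokio::select! {
        // ... the branch that yields the next block stays unchanged ...
        reason = self.importer_task.await_stop() => {
            Err(Error::BlockSource(anyhow!("Importer task stopped unexpectedly: {reason:?}")))
        }
        reason = self.sync_task.await_stop() => {
            Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly: {reason:?}")))
        }
    }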
Some(Mode::Local(_)) => {
    bail!(
        "Database is configured in S3 mode, but Local storage method was requested. If you would like to run in S3 mode, then please use a clean DB"
    );
Bug: Backwards error message in S3 storage mode check
The error message at line 614 is backwards. When StorageMethod::S3 is requested but the database mode is Local, the error message says "Database is configured in S3 mode, but Local storage method was requested" when it should say the opposite: the database is in Local mode, but S3 storage method was requested. The error message text doesn't match the actual condition being checked.
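The corrected arm would read something like this (sketch; only the wording changes):

    Some(Mode::Local(_)) => {
        bail!(
            "Database is configured in Local mode, but S3 storage method was requested. \
             If you would like to run in S3 mode, then please use a clean DB"
        );
    }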
}
_ = self.sync_task.await_stop() => {
    Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly")))
}
Bug: Unreachable tokio::select! branches due to duplicate futures
The tokio::select! macro has duplicate branches for the same futures. Lines 128-133 match self.importer_task.await_stop() and self.sync_task.await_stop() by discarding the result with _. Lines 134-139 then try to match the same futures again to bind the error values. In tokio::select!, only the first matching branch executes, so lines 134-139 are unreachable dead code. The intent appears to be capturing error information, but the first branches will always match first.
Additional Locations (1)
}
_ = self.sync_task.await_stop() => {
    Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly")))
}
Bug: Duplicate tokio::select branches in next_block
The tokio::select! macro contains duplicate branches monitoring the same futures: self.importer_task.await_stop() appears twice (lines 128-130 and 134-136), and self.sync_task.await_stop() appears twice (lines 131-133 and 137-139). In tokio::select!, the first matching branch executes and the others are dropped, making one set of branches unreachable dead code. This appears to be leftover from editing and compromises error handling.
Additional Locations (1)
}
_ = self.sync_task.await_stop() => {
    Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly")))
}
Bug: Duplicate futures in tokio select branches
The tokio::select! macro contains duplicate branches that await the same futures. Lines 128-129 and 134-135 both await self.importer_task.await_stop(), while lines 131-132 and 137-138 both await self.sync_task.await_stop(). This creates ambiguous behavior where the same future is polled multiple times in the same select statement, and only one branch can ever be taken when a future completes, making the duplicate branches unreachable dead code.
crates/fuel-core/src/database.rs
Outdated
let remove_heights = tx
    .iter_all_keys::<Blocks>(Some(IterDirection::Reverse))
    .flatten()
    .take_while(|height| height <= &block_height)
Bug: Rollback removes wrong blocks - inverted condition
The rollback_to method has an inverted condition in the take_while clause. When iterating in reverse (highest to lowest) and rolling back to a target block_height, the code uses height <= &block_height which stops immediately when starting from a height above the target. The correct condition is height > &block_height to collect all blocks above the target that need to be removed. As written, this function will remove nothing (or remove the wrong blocks if the iterator somehow starts below the target), completely breaking the rollback functionality for the block aggregation storage.
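A standalone illustration (not fuel-core code) of why the predicate has to be `>` when walking heights in descending order:

    // Rolling back to height 5: everything above the target should be collected.
    fn heights_to_remove(heights_desc: &[u64], target: u64) -> Vec<u64> {
        heights_desc
            .iter()
            .copied()
            .take_while(|height| *height > target) // `<= target` would stop immediately
            .collect()
    }

    fn main() {
        let stored = [9, 8, 7, 6, 5, 4]; // reverse iteration order, as in the diff
        assert_eq!(heights_to_remove(&stored, 5), vec![9, 8, 7, 6]);
    }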
And cursor is right again!
res = router.serve(self.addr) => {
    if let Err(e) = res {
        tracing::error!("BlockAggregator tonic server error: {}", e);
        TaskNextAction::ErrorContinue(anyhow!(e))
Bug: Server cannot recover from errors - router consumed
When the tonic server encounters an error during serve(), the code returns TaskNextAction::ErrorContinue suggesting the task should retry. However, get_router() uses take() which moves the router out of the Option, leaving it as None. On the next run() call, get_router() fails because the router was consumed and never recreated. The error recovery path is broken - either the router needs to be recreated before returning ErrorContinue, or TaskNextAction::Stop should be returned instead since retry cannot succeed.
Additional Locations (1)
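A sketch of the simpler option, stopping the task rather than signalling a retry that cannot succeed (the alternative would be to rebuild the router before returning ErrorContinue):

    res = router.serve(self.addr) => {
        if let Err(e) = res {
            tracing::error!("BlockAggregator tonic server error: {}", e);
        }
        // get_router() took the router out of the Option, so it cannot be
        // served again; stop instead of pretending the task can continue.
        TaskNextAction::Stop
    }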
Agree with cursor! =D
crates/fuel-core/src/database.rs
Outdated
let remove_heights = tx
    .iter_all_keys::<Blocks>(Some(IterDirection::Reverse))
    .flatten()
    .take_while(|height| height <= &block_height)
Bug: Rollback removes wrong blocks due to inverted comparison
The rollback_to function for Database<BlockAggregatorDatabase> has an inverted comparison in the take_while predicate. When iterating in reverse (from highest to lowest), take_while(|height| height <= &block_height) will stop immediately because the first element (highest height) fails the condition when it's above the target. This results in an empty collection, so no blocks get removed during rollback. The predicate should be height > &block_height to collect all heights above the target for removal.
}
_ = self.sync_task.await_stop() => {
    Err(Error::BlockSource(anyhow!("Sync task stopped unexpectedly")))
}
Bug: Duplicate tokio::select branches for await_stop handlers
The next_block function contains duplicate tokio::select! branches for both self.importer_task.await_stop() and self.sync_task.await_stop(). Lines 128-133 were added but the existing handlers at lines 134-139 weren't removed. In tokio::select!, having duplicate futures means only one branch will ever match, making the second set of handlers unreachable dead code. This appears to be an incomplete refactoring where two versions of the error handling were accidentally left in.
        .rollback_to(target_block_height)?;
    }
}
Agree with cursor!
crates/fuel-core/src/database.rs
Outdated
let remove_heights = tx
    .iter_all_keys::<Blocks>(Some(IterDirection::Reverse))
    .flatten()
    .take_while(|height| height <= &block_height)
And cursor is right again!
pub fn new(url: String) -> Result<Self> {
    let (query_sender, query_receiver) = tokio::sync::mpsc::channel::<
        BlockAggregatorQuery<BlockRangeResponse, ProtoBlock>,
    >(100);
Looks like 100 should be customizable via CLI, because it defines how many concurrent users we can support via RPC.
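Hypothetical shape of such a flag, in the clap derive style used elsewhere in the repo; the flag name and default are made up for illustration:

    use clap::Parser;

    #[derive(Debug, Clone, Parser)]
    pub struct RpcArgs {
        /// Capacity of the block aggregator query channel, i.e. how many
        /// in-flight RPC queries can be buffered at once.
        #[clap(long = "rpc-query-channel-size", default_value = "100")]
        pub rpc_query_channel_size: usize,
    }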
const ARB_LITERAL_BLOCK_BUFFER_SIZE: usize = 100;
let req = request.into_inner();
let (response, receiver) = tokio::sync::oneshot::channel();
let query = BlockAggregatorQuery::GetBlockRange {
Looks like each query will go through the main run loop, so all read queries will become sequential instead of parallel. Looks like you could pass the database/S3 configuration here and fetch the data directly from the source, instead of sending a query to the service, which then fetches it from the source for you.
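Very roughly, the handler could hold its own read-only view of the data and serve each request directly, so reads no longer serialize behind the run loop (pure sketch; BlockReader and read_range are hypothetical stand-ins for a database snapshot or S3 client):

    #[async_trait::async_trait]
    pub trait BlockReader: Send + Sync + 'static {
        async fn read_range(&self, first: u32, last: u32) -> anyhow::Result<Vec<Vec<u8>>>;
    }

    #[derive(Clone)]
    pub struct BlockRangeService<R> {
        reader: R, // database view or S3-backed reader, cloned per server
    }

    impl<R: BlockReader> BlockRangeService<R> {
        pub async fn get_block_range(&self, first: u32, last: u32) -> anyhow::Result<Vec<Vec<u8>>> {
            // Fetch straight from the source; nothing is queued on the service's
            // main loop, so concurrent requests proceed in parallel.
            self.reader.read_range(first, last).await
        }
    }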
    .map_err(|e| Status::internal(format!("Failed to send query: {}", e)))?;

let (task_sender, task_receiver) = tokio::sync::mpsc::channel(ARB_CHANNEL_SIZE);
tokio::spawn(async move {
We can remove the tokio spawn now, I guess =D
res = router.serve(self.addr) => {
    if let Err(e) = res {
        tracing::error!("BlockAggregator tonic server error: {}", e);
        TaskNextAction::ErrorContinue(anyhow!(e))
Agree with cursor! =D
    first: BlockHeight,
    last: BlockHeight,
) -> crate::result::Result<Self::BlockRangeResponse> {
    // TODO: Check if it exists
Let's return an error in case we are not synced up to the requested height. But we don't need to check existence on S3; that would be too slow.
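A sketch of the check being asked for; highest_synced_height() and the error constructor are placeholders for whatever the source actually exposes:

    // Sketch: reject ranges beyond the highest synced height instead of probing
    // S3 for each object. `highest_synced_height()` and `Error::NotYetSynced`
    // are placeholders.
    let synced = self.highest_synced_height();
    if last > synced {
        return Err(Error::NotYetSynced { requested: last, synced });
    }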
I've implemented this. It revealed that if we error, we aren't sending an error to the user, just handling the error in the service. We need to do both. I'll work on adding this and a test showing that it gets included.
crates/fuel-core/src/database.rs
Outdated
let remove_heights = tx
    .iter_all_keys::<Blocks>(Some(IterDirection::Reverse))
    .flatten()
    .take_while(|height| height <= &block_height)
Bug: Rollback removes wrong blocks due to inverted condition
The rollback_to method for BlockAggregatorDatabase has an inverted condition when determining which blocks to remove. When iterating with IterDirection::Reverse (highest to lowest), the condition take_while(|height| height <= &block_height) will immediately stop at the first (highest) element since it's greater than the target. This means no blocks are collected or removed. The condition should be height > &block_height to collect blocks above the target height (the ones that need removal during rollback).
bail!(
    "Database is configured in S3 mode, but Local storage method was requested. If you would like to run in S3 mode, then please use a clean DB"
);
}
Bug: Incorrect error messages for S3 storage mode mismatch
The error messages in the StorageMethod::S3 and StorageMethod::S3NoPublish branches are copy-pasted incorrectly. When the database is configured in Local mode but S3 storage is requested, the error message incorrectly states "Database is configured in S3 mode, but Local storage method was requested." The message should say "Database is configured in Local mode, but S3 storage method was requested." This misleads users about the actual configuration conflict.
Additional Locations (1)
// This could be more complicated with increasing backoff times, etc.
async fn go_to_sleep_before_continuing(&self) {
    tokio::time::sleep(Duration::from_millis(10)).await;
async fn get_receipts(&self, tx_ids: &[TxId]) -> Result<Vec<Receipt>> {
Bug: Missing transaction silently returns empty vector instead of error
When get_txs encounters a transaction that is expected (it's in the block's tx_ids list) but isn't found in storage, it returns Ok(vec![]) instead of an error. This empty vector is then used to uncompress the block, resulting in a block with zero transactions even though transactions were expected. This is inconsistent with existing code in database/block.rs which properly returns a not_found!(Transactions) error in this case. The silent failure could lead to storing corrupted blocks with missing transactions without any indication of the data inconsistency.
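A sketch of the stricter behaviour, mirroring the not_found!(Transactions) pattern the comment points to in database/block.rs (storage API details are assumed):

    // Sketch: treat a missing expected transaction as an error rather than
    // silently returning an empty Vec.
    let mut txs = Vec::with_capacity(tx_ids.len());
    for tx_id in tx_ids {
        let tx = self
            .storage::<Transactions>()
            .get(tx_id)?
            .ok_or(not_found!(Transactions))?
            .into_owned();
        txs.push(tx);
    }
    Ok(txs)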
Linked Issues/PRs
Description
Have added integration with S3 buckets for remote storage.
In order to do integration tests with the new code, we needed to use LocalStack, a local version of AWS which includes S3.
For all of this to work together, we've introduced a separate CI action called RPC S3 Integration Tests (w/LocalStack). This is slightly flaky, since if the env-vars aren't set it won't fail. For this reason the tracing has been left in so the results of the test can be inspected. It might be possible to introduce some combination of feature flags and throw an error if the env-vars aren't set.
Checklist
Before requesting review