Skip to content

Commit 6c9229b

Browse files
Matthieu Vachonmaoueh
andauthored
Re-written FirehoseBlockStream using async-stream (#3078)
The implementation of the never ending Firehose stream of blocks have been re-implemented using `async-stream` library. This will makes the implementation much more readable and maintainable in the future. This should enable re-using `TriggersAdapter::triggers_in_block` directly instead of having to defer to `firehose_triggers_in_block` (subsequent PR to come). This also removes the `notify_block_consumed` callback on the `FirehoseBlockStream` and the latest active cursor to reconnect to is now saved inside the FirehoseBlockStream directly. This will make it easier to implement the `BufferedBlockStream` in the future. *Caveats* Formatting of code inside `try_stream!` macro doesn't work, unclear if it's a general limitation in macros in general or this very specific macro. Co-authored-by: Matthieu Vachon <[email protected]>
1 parent 171e430 commit 6c9229b

File tree

5 files changed

+91
-230
lines changed

5 files changed

+91
-230
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/src/subgraph/instance_manager.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -671,14 +671,6 @@ where
671671
};
672672
}
673673

674-
// Notify the BlockStream implementation that a block was succesfully consumed
675-
// and that its internal cursoring mechanism can be saved to memory.
676-
//
677-
// The first `get_mut` is to get the inner `Stream` out of `Cancelable` which
678-
// returns a `TryStreamExt::MapErr` struct and the second `get_mut` is to get
679-
// out the actual `dyn BlockStream` trait on which we can call our method.
680-
block_stream.get_mut().get_mut().notify_block_consumed();
681-
682674
if needs_restart {
683675
// Cancel the stream for real
684676
ctx.state

graph/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = "2018"
66
[dependencies]
77
anyhow = "1.0"
88
async-trait = "0.1.50"
9+
async-stream = "0.3"
910
atomic_refcell = "0.1.8"
1011
bigdecimal = { version = "0.1.0", features = ["serde"] }
1112
bytes = "1.0.1"

graph/src/blockchain/block_stream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use crate::{prelude::*, prometheus::labels};
1111
pub trait BlockStream<C: Blockchain>:
1212
Stream<Item = Result<BlockStreamEvent<C>, Error>> + Unpin
1313
{
14-
fn notify_block_consumed(&mut self) {}
1514
}
1615

1716
pub type FirehoseCursor = Option<String>;

0 commit comments

Comments
 (0)