Skip to content

Commit 60d7467

Browse files
author
Matthieu Vachon
authored
Renamed of the Firehose service gRPC ids and packages (#3054)
This will require a coordinate update for NEAR when Firehose service ids will change there, `graph-node` with this version will need to be deployed.
1 parent 9d24270 commit 60d7467

File tree

19 files changed

+387
-723
lines changed

19 files changed

+387
-723
lines changed

chain/ethereum/examples/firehose.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,8 @@
11
use anyhow::Error;
22
use graph::{
3-
firehose::{
4-
bstream::BlockResponseV2, bstream::BlocksRequestV2, bstream::ForkStep,
5-
endpoints::FirehoseEndpoint,
6-
},
73
log::logger,
84
prelude::{prost, tokio, tonic},
5+
{firehose, firehose::FirehoseEndpoint, firehose::ForkStep},
96
};
107
use graph_chain_ethereum::codec;
118
use prost::Message;
@@ -23,9 +20,9 @@ async fn main() -> Result<(), Error> {
2320

2421
loop {
2522
println!("connecting to the stream!");
26-
let mut stream: Streaming<BlockResponseV2> = match firehose
23+
let mut stream: Streaming<firehose::Response> = match firehose
2724
.clone()
28-
.stream_blocks(BlocksRequestV2 {
25+
.stream_blocks(firehose::Request {
2926
start_block_num: 7000000,
3027
start_cursor: match &cursor {
3128
Some(c) => c.clone(),

chain/ethereum/proto/codec.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
syntax = "proto3";
22

3-
package dfuse.ethereum.codec.v1;
3+
package sf.ethereum.codec.v1;
4+
5+
option go_package = "github.com/streamingfast/sf-ethereum/pb/sf/ethereum/codec/v1;pbcodec";
46

57
import "google/protobuf/timestamp.proto";
68

chain/ethereum/src/chain.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use anyhow::{Context, Error};
22
use graph::blockchain::BlockchainKind;
33
use graph::data::subgraph::UnifiedMappingApiVersion;
4-
use graph::firehose::endpoints::FirehoseNetworkEndpoints;
4+
use graph::firehose::FirehoseNetworkEndpoints;
55
use graph::prelude::{
66
EthereumBlock, EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, StopwatchMetrics,
77
};
@@ -19,7 +19,7 @@ use graph::{
1919
},
2020
cheap_clone::CheapClone,
2121
components::store::DeploymentLocator,
22-
firehose::bstream,
22+
firehose,
2323
log::factory::{ComponentLoggerConfig, ElasticComponentLoggerConfig},
2424
prelude::{
2525
async_trait, error, lazy_static, o, serde_json as json, web3::types::H256, BlockNumber,
@@ -562,11 +562,13 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
562562
async fn to_block_stream_event(
563563
&self,
564564
logger: &Logger,
565-
response: &bstream::BlockResponseV2,
565+
response: &firehose::Response,
566566
adapter: &TriggersAdapter,
567567
filter: &TriggerFilter,
568568
) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
569-
let step = bstream::ForkStep::from_i32(response.step).unwrap_or_else(|| {
569+
use firehose::ForkStep;
570+
571+
let step = ForkStep::from_i32(response.step).unwrap_or_else(|| {
570572
panic!(
571573
"unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?",
572574
response.step
@@ -579,15 +581,16 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
579581

580582
// Right now, this is done in all cases but in reality, with how the BlockStreamEvent::Revert
581583
// is defined right now, only block hash and block number is necessary. However, this information
582-
// is not part of the actual bstream::BlockResponseV2 payload. As such, we need to decode the full
584+
// is not part of the actual firehose::Response payload. As such, we need to decode the full
583585
// block which is useless.
584586
//
585-
// Check about adding basic information about the block in the bstream::BlockResponseV2 or maybe
587+
// Check about adding basic information about the block in the firehose::Response or maybe
586588
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
587589
let block = codec::Block::decode(any_block.value.as_ref())?;
588590

591+
use firehose::ForkStep::*;
589592
match step {
590-
bstream::ForkStep::StepNew => {
593+
StepNew => {
591594
let ethereum_block: EthereumBlockWithCalls = (&block).into();
592595
let block_with_triggers = adapter
593596
.triggers_in_block(logger, BlockFinality::NonFinal(ethereum_block), filter)
@@ -599,7 +602,7 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
599602
))
600603
}
601604

602-
bstream::ForkStep::StepUndo => Ok(BlockStreamEvent::Revert(
605+
StepUndo => Ok(BlockStreamEvent::Revert(
603606
BlockPtr {
604607
hash: BlockHash::from(block.hash),
605608
number: block.number as i32,
@@ -611,11 +614,11 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
611614
}),
612615
)),
613616

614-
bstream::ForkStep::StepIrreversible => {
617+
StepIrreversible => {
615618
unreachable!("irreversible step is not handled and should not be requested in the Firehose request")
616619
}
617620

618-
bstream::ForkStep::StepUnknown => {
621+
StepUnknown => {
619622
unreachable!("unknown step should not happen in the Firehose response")
620623
}
621624
}

chain/ethereum/src/codec.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#[path = "protobuf/dfuse.ethereum.codec.v1.rs"]
1+
#[path = "protobuf/sf.ethereum.codec.v1.rs"]
22
mod pbcodec;
33

44
use graph::{

chain/near/src/chain.rs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use graph::blockchain::BlockchainKind;
22
use graph::cheap_clone::CheapClone;
33
use graph::data::subgraph::UnifiedMappingApiVersion;
4-
use graph::firehose::endpoints::FirehoseNetworkEndpoints;
4+
use graph::firehose::FirehoseNetworkEndpoints;
55
use graph::prelude::StopwatchMetrics;
66
use graph::{
77
anyhow,
@@ -14,7 +14,7 @@ use graph::{
1414
BlockHash, BlockPtr, Blockchain, IngestorAdapter as IngestorAdapterTrait, IngestorError,
1515
},
1616
components::store::DeploymentLocator,
17-
firehose::bstream,
17+
firehose::{self as firehose, ForkStep},
1818
log::factory::{ComponentLoggerConfig, ElasticComponentLoggerConfig},
1919
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
2020
};
@@ -288,11 +288,11 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
288288
async fn to_block_stream_event(
289289
&self,
290290
logger: &Logger,
291-
response: &bstream::BlockResponseV2,
291+
response: &firehose::Response,
292292
adapter: &TriggersAdapter,
293293
filter: &TriggerFilter,
294294
) -> Result<BlockStreamEvent<Chain>, FirehoseError> {
295-
let step = bstream::ForkStep::from_i32(response.step).unwrap_or_else(|| {
295+
let step = ForkStep::from_i32(response.step).unwrap_or_else(|| {
296296
panic!(
297297
"unknown step i32 value {}, maybe you forgot update & re-regenerate the protobuf definitions?",
298298
response.step
@@ -312,13 +312,14 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
312312
// define a slimmed down stuct that would decode only a few fields and ignore all the rest.
313313
let block = codec::Block::decode(any_block.value.as_ref())?;
314314

315+
use ForkStep::*;
315316
match step {
316-
bstream::ForkStep::StepNew => Ok(BlockStreamEvent::ProcessBlock(
317+
StepNew => Ok(BlockStreamEvent::ProcessBlock(
317318
adapter.triggers_in_block(logger, block, filter).await?,
318319
Some(response.cursor.clone()),
319320
)),
320321

321-
bstream::ForkStep::StepUndo => {
322+
StepUndo => {
322323
let header = block.header();
323324

324325
Ok(BlockStreamEvent::Revert(
@@ -331,11 +332,11 @@ impl FirehoseMapperTrait<Chain> for FirehoseMapper {
331332
))
332333
}
333334

334-
bstream::ForkStep::StepIrreversible => {
335+
StepIrreversible => {
335336
panic!("irreversible step is not handled and should not be requested in the Firehose request")
336337
}
337338

338-
bstream::ForkStep::StepUnknown => {
339+
StepUnknown => {
339340
panic!("unknown step should not happen in the Firehose response")
340341
}
341342
}

graph/build.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,6 @@ fn main() {
33
tonic_build::configure()
44
.out_dir("src/firehose")
55
.format(true)
6-
.compile(&["proto/bstream.proto"], &["proto"])
6+
.compile(&["proto/firehose.proto"], &["proto"])
77
.expect("Failed to compile Firehose proto(s)");
88
}
Lines changed: 26 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -1,51 +1,17 @@
11
syntax = "proto3";
22

3-
package dfuse.bstream.v1;
3+
package sf.firehose.v1;
44

5-
import "google/protobuf/timestamp.proto";
65
import "google/protobuf/any.proto";
76

8-
option go_package = "github.com/streamingfast/pbgo/dfuse/bstream/v1;pbbstream";
7+
option go_package = "github.com/streamingfast/pbgo/sf/firehose/v1;pbfirehose";
98

10-
service BlockStream {
11-
rpc Blocks(BlockRequest) returns (stream Block);
12-
}
13-
14-
service BlockStreamV2 {
15-
rpc Blocks(BlocksRequestV2) returns (stream BlockResponseV2);
16-
}
17-
18-
// Version 1 request
19-
20-
message BlockRequest {
21-
// Number of blocks we want to get in burst upon connection, on a best effort basis.
22-
int64 burst = 1;
23-
24-
// Type of blocks we're after here, is it 'ethereum' data, 'eos', etc.. The server can fail early
25-
// if he doesn't match the data he serves (services mismatch, etc..)
26-
string content_type = 2;
27-
28-
// Whether we can assume the data will come ordered, unless there is a chain reorganization.
29-
// mindreaders output ordered data, whereas relayers can output unordered data.
30-
// The server can fail early if the assumption of the caller cannot be fulfilled.
31-
enum Order {
32-
UNSPECIFIED = 0;
33-
ORDERED = 1;
34-
UNORDERED = 2;
35-
}
36-
Order order = 3;
37-
38-
string requester = 4;
39-
}
40-
41-
// Version 2 request
42-
43-
message IrreversibleBlocksRequestV2 {
44-
int64 start_block_num = 1;
9+
service Stream {
10+
rpc Blocks(Request) returns (stream Response);
4511
}
4612

4713
// For historical segments, forks are not passed
48-
message BlocksRequestV2 {
14+
message Request {
4915
// Controls where the stream of blocks will start.
5016
//
5117
// The stream will start **inclusively** at the requested block num.
@@ -66,7 +32,7 @@ message BlocksRequestV2 {
6632
// Controls where the stream of blocks will start which will be immediately after
6733
// the Block pointed by this opaque cursor.
6834
//
69-
// Obtain this value from a previously received BlockResponseV2.cursor.
35+
// Obtain this value from a previously received from `Response.cursor`.
7036
//
7137
// This value takes precedence over `start_block_num`.
7238
string start_cursor = 13;
@@ -92,17 +58,30 @@ message BlocksRequestV2 {
9258

9359
// **Warning** Experimental API, controls how blocks are trimmed for extraneous information before
9460
// being sent back. The actual trimming is chain dependent.
95-
BlockDetails details = 15;
61+
//BlockDetails details = 15;
62+
reserved 15;
9663

9764
// controls how many confirmations will consider a given block as final (STEP_IRREVERSIBLE). Warning, if any reorg goes beyond that number of confirmations, the request will stall forever
98-
uint64 confirmations = 16;
65+
//uint64 confirmations = 16;
66+
reserved 16;
67+
9968

69+
//- EOS "handoffs:3"
70+
//- EOS "lib"
71+
//- EOS "confirms:3"
72+
//- ETH "confirms:200"
73+
//- ETH "confirms:7"
74+
//- SOL "commmitement:finalized"
75+
//- SOL "confirms:200"
76+
string irreversibility_condition = 17;
77+
78+
repeated google.protobuf.Any transforms = 18;
10079
}
10180

102-
message BlockResponseV2 {
81+
message Response {
10382
// Chain specific block payload, one of:
104-
// - dfuse.eosio.codec.v1.Block
105-
// - dfuse.ethereum.codec.v1.Block
83+
// - sf.eosio.codec.v1.Block
84+
// - sf.ethereum.codec.v1.Block
10685
// - sf.near.codec.v1.Block
10786
// - sf.solana.codec.v1.Block
10887
google.protobuf.Any block = 1;
@@ -124,41 +103,8 @@ enum ForkStep {
124103
reserved 5 ;
125104
}
126105

106+
// TODO: move to ethereum specific transforms
127107
enum BlockDetails {
128108
BLOCK_DETAILS_FULL = 0;
129109
BLOCK_DETAILS_LIGHT = 1;
130-
}
131-
132-
message Cursor {
133-
BlockRef block = 1;
134-
BlockRef head_block = 2;
135-
BlockRef lib = 3;
136-
ForkStep step = 4;
137-
}
138-
139-
// General response and structs
140-
141-
message Block {
142-
uint64 number = 1;
143-
string id = 2;
144-
string previous_id = 3;
145-
google.protobuf.Timestamp timestamp = 4;
146-
uint64 lib_num = 5;
147-
148-
Protocol payload_kind = 6;
149-
int32 payload_version = 7;
150-
bytes payload_buffer = 8;
151-
}
152-
153-
message BlockRef {
154-
uint64 num = 1;
155-
string id = 2;
156-
}
157-
158-
enum Protocol {
159-
UNKNOWN = 0;
160-
EOS = 1;
161-
ETH = 2;
162-
SOLANA = 3;
163-
NEAR = 4;
164-
}
110+
}

graph/src/blockchain/block_stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use thiserror::Error;
55

66
use super::{Block, BlockPtr, Blockchain};
77
use crate::components::store::BlockNumber;
8-
use crate::firehose::bstream;
8+
use crate::firehose;
99
use crate::{prelude::*, prometheus::labels};
1010

1111
pub trait BlockStream<C: Blockchain>:
@@ -82,7 +82,7 @@ pub trait FirehoseMapper<C: Blockchain>: Send + Sync {
8282
async fn to_block_stream_event(
8383
&self,
8484
logger: &Logger,
85-
response: &bstream::BlockResponseV2,
85+
response: &firehose::Response,
8686
adapter: &C::TriggersAdapter,
8787
filter: &C::TriggerFilter,
8888
) -> Result<BlockStreamEvent<C>, FirehoseError>;

0 commit comments

Comments
 (0)