Skip to content

Commit 9ec56ec

Browse files
leoyvensFilippo Costa
authored andcommitted
fix: Fix panic on first startup when mixing firehose and rpc (#4680)
1 parent 7417fee commit 9ec56ec

File tree

9 files changed

+118
-101
lines changed

9 files changed

+118
-101
lines changed

graph/src/blockchain/types.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,3 +280,13 @@ pub struct ChainIdentifier {
280280
pub net_version: String,
281281
pub genesis_block_hash: BlockHash,
282282
}
283+
284+
impl fmt::Display for ChainIdentifier {
285+
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
286+
write!(
287+
f,
288+
"net_version: {}, genesis_block_hash: {}",
289+
self.net_version, self.genesis_block_hash
290+
)
291+
}
292+
}

node/src/bin/manager.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use graph_store_postgres::{
3131
SubscriptionManager, PRIMARY_SHARD,
3232
};
3333
use lazy_static::lazy_static;
34+
use std::collections::BTreeMap;
3435
use std::{collections::HashMap, env, num::ParseIntError, sync::Arc, time::Duration};
3536
const VERSION_LABEL_KEY: &str = "version";
3637

@@ -882,7 +883,7 @@ impl Context {
882883
pools.clone(),
883884
subgraph_store,
884885
HashMap::default(),
885-
vec![],
886+
BTreeMap::new(),
886887
self.registry,
887888
);
888889

node/src/chain.rs

Lines changed: 45 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use graph::slog::{debug, error, info, o, Logger};
1414
use graph::url::Url;
1515
use graph::util::security::SafeDisplay;
1616
use graph_chain_ethereum::{self as ethereum, EthereumAdapterTrait, Transport};
17-
use std::collections::{BTreeMap, HashMap};
17+
use std::collections::{btree_map, BTreeMap};
1818
use std::sync::Arc;
1919
use std::time::Duration;
2020

@@ -220,7 +220,7 @@ pub fn create_firehose_networks(
220220
pub async fn connect_ethereum_networks(
221221
logger: &Logger,
222222
mut eth_networks: EthereumNetworks,
223-
) -> (EthereumNetworks, Vec<(String, Vec<ChainIdentifier>)>) {
223+
) -> Result<(EthereumNetworks, BTreeMap<String, ChainIdentifier>), anyhow::Error> {
224224
// This has one entry for each provider, and therefore multiple entries
225225
// for each network
226226
let statuses = join_all(
@@ -268,10 +268,10 @@ pub async fn connect_ethereum_networks(
268268
.await;
269269

270270
// Group identifiers by network name
271-
let idents: HashMap<String, Vec<ChainIdentifier>> =
271+
let idents: BTreeMap<String, ChainIdentifier> =
272272
statuses
273273
.into_iter()
274-
.fold(HashMap::new(), |mut networks, status| {
274+
.try_fold(BTreeMap::new(), |mut networks, status| {
275275
match status {
276276
ProviderNetworkStatus::Broken {
277277
chain_id: network,
@@ -280,12 +280,25 @@ pub async fn connect_ethereum_networks(
280280
ProviderNetworkStatus::Version {
281281
chain_id: network,
282282
ident,
283-
} => networks.entry(network).or_default().push(ident),
283+
} => match networks.entry(network.clone()) {
284+
btree_map::Entry::Vacant(entry) => {
285+
entry.insert(ident);
286+
}
287+
btree_map::Entry::Occupied(entry) => {
288+
if &ident != entry.get() {
289+
return Err(anyhow!(
290+
"conflicting network identifiers for chain {}: `{}` != `{}`",
291+
network,
292+
ident,
293+
entry.get()
294+
));
295+
}
296+
}
297+
},
284298
}
285-
networks
286-
});
287-
let idents: Vec<_> = idents.into_iter().collect();
288-
(eth_networks, idents)
299+
Ok(networks)
300+
})?;
301+
Ok((eth_networks, idents))
289302
}
290303

291304
/// Try to connect to all the providers in `firehose_networks` and get their net
@@ -299,7 +312,7 @@ pub async fn connect_ethereum_networks(
299312
pub async fn connect_firehose_networks<M>(
300313
logger: &Logger,
301314
mut firehose_networks: FirehoseNetworks,
302-
) -> (FirehoseNetworks, Vec<(String, Vec<ChainIdentifier>)>)
315+
) -> Result<(FirehoseNetworks, BTreeMap<String, ChainIdentifier>), Error>
303316
where
304317
M: prost::Message + BlockchainBlock + Default + 'static,
305318
{
@@ -341,6 +354,8 @@ where
341354
"genesis_block" => format_args!("{}", &ptr),
342355
);
343356

357+
// BUG: Firehose doesn't provide the net_version.
358+
// See also: firehose-no-net-version
344359
let ident = ChainIdentifier {
345360
net_version: "0".to_string(),
346361
genesis_block_hash: ptr.hash,
@@ -354,20 +369,34 @@ where
354369
.await;
355370

356371
// Group identifiers by chain id
357-
let idents: HashMap<String, Vec<ChainIdentifier>> =
372+
let idents: BTreeMap<String, ChainIdentifier> =
358373
statuses
359374
.into_iter()
360-
.fold(HashMap::new(), |mut networks, status| {
375+
.try_fold(BTreeMap::new(), |mut networks, status| {
361376
match status {
362377
ProviderNetworkStatus::Broken { chain_id, provider } => {
363378
firehose_networks.remove(&chain_id, &provider)
364379
}
365380
ProviderNetworkStatus::Version { chain_id, ident } => {
366-
networks.entry(chain_id).or_default().push(ident)
381+
match networks.entry(chain_id.clone()) {
382+
btree_map::Entry::Vacant(entry) => {
383+
entry.insert(ident);
384+
}
385+
btree_map::Entry::Occupied(entry) => {
386+
if &ident != entry.get() {
387+
return Err(anyhow!(
388+
"conflicting network identifiers for chain {}: `{}` != `{}`",
389+
chain_id,
390+
ident,
391+
entry.get()
392+
));
393+
}
394+
}
395+
}
367396
}
368397
}
369-
networks
370-
});
398+
Ok(networks)
399+
})?;
371400

372401
// Clean-up chains with 0 provider
373402
firehose_networks.networks.retain(|chain_id, endpoints| {
@@ -381,8 +410,7 @@ where
381410
endpoints.len() > 0
382411
});
383412

384-
let idents: Vec<_> = idents.into_iter().collect();
385-
(firehose_networks, idents)
413+
Ok((firehose_networks, idents))
386414
}
387415

388416
/// Parses all Ethereum connection strings and returns their network names and

node/src/main.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -305,17 +305,22 @@ async fn main() {
305305
// `blockchain_map`.
306306
let mut blockchain_map = BlockchainMap::new();
307307

308+
// Unwraps: `connect_ethereum_networks` and `connect_firehose_networks` only fail if
309+
// mismatching chain identifiers are returned for a same network, which indicates a serious
310+
// inconsistency between providers.
308311
let (arweave_networks, arweave_idents) = connect_firehose_networks::<ArweaveBlock>(
309312
&logger,
310313
firehose_networks_by_kind
311314
.remove(&BlockchainKind::Arweave)
312315
.unwrap_or_else(FirehoseNetworks::new),
313316
)
314-
.await;
317+
.await
318+
.unwrap();
315319

316320
// This only has idents for chains with rpc adapters.
317-
let (eth_networks, ethereum_idents) =
318-
connect_ethereum_networks(&logger, eth_networks).await;
321+
let (eth_networks, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks)
322+
.await
323+
.unwrap();
319324

320325
let (eth_firehose_only_networks, eth_firehose_only_idents) =
321326
connect_firehose_networks::<HeaderOnlyBlock>(
@@ -324,7 +329,8 @@ async fn main() {
324329
.remove(&BlockchainKind::Ethereum)
325330
.unwrap_or_else(FirehoseNetworks::new),
326331
)
327-
.await;
332+
.await
333+
.unwrap();
328334

329335
let (near_networks, near_idents) =
330336
connect_firehose_networks::<NearFirehoseHeaderOnlyBlock>(
@@ -333,23 +339,27 @@ async fn main() {
333339
.remove(&BlockchainKind::Near)
334340
.unwrap_or_else(FirehoseNetworks::new),
335341
)
336-
.await;
342+
.await
343+
.unwrap();
337344

338345
let (cosmos_networks, cosmos_idents) = connect_firehose_networks::<CosmosFirehoseBlock>(
339346
&logger,
340347
firehose_networks_by_kind
341348
.remove(&BlockchainKind::Cosmos)
342349
.unwrap_or_else(FirehoseNetworks::new),
343350
)
344-
.await;
345-
346-
let network_identifiers = ethereum_idents
347-
.into_iter()
348-
.chain(eth_firehose_only_idents)
349-
.chain(arweave_idents)
350-
.chain(near_idents)
351-
.chain(cosmos_idents)
352-
.collect();
351+
.await
352+
.unwrap();
353+
354+
// Note that both `eth_firehose_only_idents` and `ethereum_idents` contain Ethereum
355+
// networks. If the same network is configured in both RPC and Firehose, the RPC ident takes
356+
// precedence. This is necessary because Firehose endpoints currently have no `net_version`.
357+
// See also: firehose-no-net-version.
358+
let mut network_identifiers = eth_firehose_only_idents;
359+
network_identifiers.extend(ethereum_idents);
360+
network_identifiers.extend(arweave_idents);
361+
network_identifiers.extend(near_idents);
362+
network_identifiers.extend(cosmos_idents);
353363

354364
let network_store = store_builder.network_store(network_identifiers);
355365

node/src/manager/commands/run.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ pub async fn run(
111111

112112
let eth_adapters2 = eth_adapters.clone();
113113

114-
let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await;
114+
let (_, ethereum_idents) = connect_ethereum_networks(&logger, eth_networks).await?;
115115
// let (near_networks, near_idents) = connect_firehose_networks::<NearFirehoseHeaderOnlyBlock>(
116116
// &logger,
117117
// firehose_networks_by_kind

node/src/store_builder.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::BTreeMap;
12
use std::iter::FromIterator;
23
use std::{collections::HashMap, sync::Arc};
34

@@ -166,7 +167,7 @@ impl StoreBuilder {
166167
pools: HashMap<ShardName, ConnectionPool>,
167168
subgraph_store: Arc<SubgraphStore>,
168169
chains: HashMap<String, ShardName>,
169-
networks: Vec<(String, Vec<ChainIdentifier>)>,
170+
networks: BTreeMap<String, ChainIdentifier>,
170171
registry: Arc<MetricsRegistry>,
171172
) -> Arc<DieselStore> {
172173
let networks = networks
@@ -280,7 +281,7 @@ impl StoreBuilder {
280281

281282
/// Return a store that combines both a `Store` for subgraph data
282283
/// and a `BlockStore` for all chain related data
283-
pub fn network_store(self, networks: Vec<(String, Vec<ChainIdentifier>)>) -> Arc<DieselStore> {
284+
pub fn network_store(self, networks: BTreeMap<String, ChainIdentifier>) -> Arc<DieselStore> {
284285
Self::make_store(
285286
&self.logger,
286287
self.pools,

0 commit comments

Comments
 (0)