Skip to content

Commit 9223fad

Browse files
committed
node: Accept a list of shards in the config
1 parent 3d161bb commit 9223fad

File tree

3 files changed

+65
-18
lines changed

3 files changed

+65
-18
lines changed

node/resources/tests/full_config.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ match = { name = "^custom/.*" }
3535
indexers = [ "index_custom_0" ]
3636

3737
[[deployment.rule]]
38-
shard = "shard_a"
38+
shards = [ "primary", "shard_a" ]
3939
indexers = [ "index_node_1_a",
4040
"index_node_2_a",
4141
"index_node_3_a" ]

node/src/config.rs

Lines changed: 63 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
use graph::{
2+
anyhow::Error,
23
blockchain::{block_ingestor::CLEANUP_BLOCKS, BlockchainKind},
34
prelude::{
45
anyhow::{anyhow, bail, Context, Result},
5-
info, serde_json, Logger, NodeId,
6+
info,
7+
serde::{
8+
de::{self, value, SeqAccess, Visitor},
9+
Deserialize, Deserializer, Serialize,
10+
},
11+
serde_json, Logger, NodeId, StoreError,
612
},
713
};
814
use graph_chain_ethereum::NodeCapabilities;
915
use graph_store_postgres::{DeploymentPlacer, Shard as ShardName, PRIMARY_SHARD};
1016

1117
use http::{HeaderMap, Uri};
1218
use regex::Regex;
13-
use serde::{Deserialize, Serialize};
1419
use std::fs::read_to_string;
1520
use std::{
1621
collections::{BTreeMap, BTreeSet},
@@ -109,12 +114,10 @@ impl Config {
109114

110115
// Check that deployment rules only reference existing stores and chains
111116
for (i, rule) in self.deployment.rules.iter().enumerate() {
112-
if !self.stores.contains_key(&rule.shard) {
113-
return Err(anyhow!(
114-
"unknown shard {} in deployment rule {}",
115-
rule.shard,
116-
i
117-
));
117+
for shard in &rule.shards {
118+
if !self.stores.contains_key(shard) {
119+
return Err(anyhow!("unknown shard {} in deployment rule {}", shard, i));
120+
}
118121
}
119122
if let Some(networks) = &rule.pred.network {
120123
for network in networks.to_vec() {
@@ -809,7 +812,7 @@ impl DeploymentPlacer for Deployment {
809812
// rather than crashing the node and burying the crash in the logs
810813
let placement = match self.rules.iter().find(|rule| rule.matches(name, network)) {
811814
Some(rule) => {
812-
let shard = ShardName::new(rule.shard.clone()).map_err(|e| e.to_string())?;
815+
let shards = rule.shard_names().map_err(|e| e.to_string())?;
813816
let indexers: Vec<_> = rule
814817
.indexers
815818
.iter()
@@ -818,7 +821,7 @@ impl DeploymentPlacer for Deployment {
818821
.map_err(|()| format!("{} is not a valid node name", idx))
819822
})
820823
.collect::<Result<Vec<_>, _>>()?;
821-
Some((vec![shard], indexers))
824+
Some((shards, indexers))
822825
}
823826
None => None,
824827
};
@@ -830,8 +833,13 @@ impl DeploymentPlacer for Deployment {
830833
struct Rule {
831834
#[serde(rename = "match", default)]
832835
pred: Predicate,
833-
#[serde(default = "primary_store")]
834-
shard: String,
836+
// For backwards compatibility, we also accept 'shard' for the shards
837+
#[serde(
838+
alias = "shard",
839+
default = "primary_store",
840+
deserialize_with = "string_or_vec"
841+
)]
842+
shards: Vec<String>,
835843
indexers: Vec<String>,
836844
}
837845

@@ -844,15 +852,22 @@ impl Rule {
844852
self.pred.matches(name, network)
845853
}
846854

855+
fn shard_names(&self) -> Result<Vec<ShardName>, StoreError> {
856+
self.shards
857+
.iter()
858+
.cloned()
859+
.map(ShardName::new)
860+
.collect::<Result<_, _>>()
861+
}
862+
847863
fn validate(&self) -> Result<()> {
848864
if self.indexers.is_empty() {
849865
return Err(anyhow!("useless rule without indexers"));
850866
}
851867
for indexer in &self.indexers {
852868
NodeId::new(indexer).map_err(|()| anyhow!("invalid node id {}", &indexer))?;
853869
}
854-
ShardName::new(self.shard.clone())
855-
.map_err(|e| anyhow!("illegal name for store shard `{}`: {}", &self.shard, e))?;
870+
self.shard_names().map_err(Error::from)?;
856871
Ok(())
857872
}
858873
}
@@ -943,14 +958,46 @@ fn no_name() -> Regex {
943958
Regex::new(NO_NAME).unwrap()
944959
}
945960

946-
fn primary_store() -> String {
947-
PRIMARY_SHARD.to_string()
961+
fn primary_store() -> Vec<String> {
962+
vec![PRIMARY_SHARD.to_string()]
948963
}
949964

950965
fn one() -> usize {
951966
1
952967
}
953968

969+
// From https://github.com/serde-rs/serde/issues/889#issuecomment-295988865
970+
fn string_or_vec<'de, D>(deserializer: D) -> Result<Vec<String>, D::Error>
971+
where
972+
D: Deserializer<'de>,
973+
{
974+
struct StringOrVec;
975+
976+
impl<'de> Visitor<'de> for StringOrVec {
977+
type Value = Vec<String>;
978+
979+
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
980+
formatter.write_str("string or list of strings")
981+
}
982+
983+
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
984+
where
985+
E: de::Error,
986+
{
987+
Ok(vec![s.to_owned()])
988+
}
989+
990+
fn visit_seq<S>(self, seq: S) -> Result<Self::Value, S::Error>
991+
where
992+
S: SeqAccess<'de>,
993+
{
994+
Deserialize::deserialize(value::SeqAccessDeserializer::new(seq))
995+
}
996+
}
997+
998+
deserializer.deserialize_any(StringOrVec)
999+
}
1000+
9541001
#[cfg(test)]
9551002
mod tests {
9561003

store/postgres/src/subgraph_store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl Shard {
8181
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || c == '_')
8282
{
8383
return Err(StoreError::InvalidIdentifier(format!(
84-
"shard names must only contain lowercase alphanumeric characters or '_'"
84+
"shard name `{}` is invalid: shard names must only contain lowercase alphanumeric characters or '_'", name
8585
)));
8686
}
8787
Ok(Shard(name))

0 commit comments

Comments
 (0)