Skip to content

Commit 3d161bb

Browse files
committed
node, store: Allow passing a list of shards when placing deployments
1 parent 45a29c2 commit 3d161bb

File tree

4 files changed

+40
-15
lines changed

4 files changed

+40
-15
lines changed

node/src/config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -798,7 +798,11 @@ impl Deployment {
798798
}
799799

800800
impl DeploymentPlacer for Deployment {
801-
fn place(&self, name: &str, network: &str) -> Result<Option<(ShardName, Vec<NodeId>)>, String> {
801+
fn place(
802+
&self,
803+
name: &str,
804+
network: &str,
805+
) -> Result<Option<(Vec<ShardName>, Vec<NodeId>)>, String> {
802806
// Errors here are really programming errors. We should have validated
803807
// everything already so that the various conversions can't fail. We
804808
// still return errors so that they bubble up to the deployment request
@@ -814,7 +818,7 @@ impl DeploymentPlacer for Deployment {
814818
.map_err(|()| format!("{} is not a valid node name", idx))
815819
})
816820
.collect::<Result<Vec<_>, _>>()?;
817-
Some((shard, indexers))
821+
Some((vec![shard], indexers))
818822
}
819823
None => None,
820824
};

node/src/manager/commands/config.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@ pub fn place(placer: &dyn DeploymentPlacer, name: &str, network: &str) -> Result
1515
"no matching placement rule; default placement from JSON RPC call would be used"
1616
);
1717
}
18-
Some((shard, nodes)) => {
18+
Some((shards, nodes)) => {
1919
let nodes: Vec<_> = nodes.into_iter().map(|n| n.to_string()).collect();
20+
let shards: Vec<_> = shards.into_iter().map(|s| s.to_string()).collect();
2021
println!("subgraph: {}", name);
2122
println!("network: {}", network);
22-
println!("shard: {}", shard);
23+
println!("shard: {}", shards.join(", "));
2324
println!("nodes: {}", nodes.join(", "));
2425
}
2526
}

store/postgres/src/subgraph_store.rs

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ impl ToSql<Text, Pg> for Shard {
117117
/// indexers that should index it. The deployment should then be assigned to
118118
/// one of the returned indexers.
119119
pub trait DeploymentPlacer {
120-
fn place(&self, name: &str, network: &str) -> Result<Option<(Shard, Vec<NodeId>)>, String>;
120+
fn place(&self, name: &str, network: &str)
121+
-> Result<Option<(Vec<Shard>, Vec<NodeId>)>, String>;
121122
}
122123

123124
/// Tools for managing unused deployments
@@ -382,6 +383,31 @@ impl SubgraphStoreInner {
382383
store.find_layout(site)
383384
}
384385

386+
fn place_on_node(
387+
&self,
388+
mut nodes: Vec<NodeId>,
389+
default_node: NodeId,
390+
) -> Result<NodeId, StoreError> {
391+
match nodes.len() {
392+
0 => {
393+
// This is really a configuration error
394+
Ok(default_node)
395+
}
396+
1 => Ok(nodes.pop().unwrap()),
397+
_ => {
398+
let conn = self.primary_conn()?;
399+
400+
// unwrap is fine since nodes is not empty
401+
let node = conn.least_assigned_node(&nodes)?.unwrap();
402+
Ok(node)
403+
}
404+
}
405+
}
406+
407+
fn place_in_shard(&self, mut shards: Vec<Shard>) -> Result<Shard, StoreError> {
408+
Ok(shards.pop().unwrap())
409+
}
410+
385411
fn place(
386412
&self,
387413
name: &SubgraphName,
@@ -402,16 +428,10 @@ impl SubgraphStoreInner {
402428

403429
match placement {
404430
None => Ok((PRIMARY_SHARD.clone(), default_node)),
405-
Some((_, nodes)) if nodes.is_empty() => {
406-
// This is really a configuration error
407-
Ok((PRIMARY_SHARD.clone(), default_node))
408-
}
409-
Some((shard, mut nodes)) if nodes.len() == 1 => Ok((shard, nodes.pop().unwrap())),
410-
Some((shard, nodes)) => {
411-
let conn = self.primary_conn()?;
431+
Some((shards, nodes)) => {
432+
let node = self.place_on_node(nodes, default_node)?;
433+
let shard = self.place_in_shard(shards)?;
412434

413-
// unwrap is fine since nodes is not empty
414-
let node = conn.least_assigned_node(&nodes)?.unwrap();
415435
Ok((shard, node))
416436
}
417437
}

store/test-store/src/store.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ pub fn remove_subgraphs() {
142142
.expect("deleting test entities succeeds");
143143
}
144144

145-
pub fn place(name: &str) -> Result<Option<(Shard, Vec<NodeId>)>, String> {
145+
pub fn place(name: &str) -> Result<Option<(Vec<Shard>, Vec<NodeId>)>, String> {
146146
CONFIG.deployment.place(name, NETWORK_NAME)
147147
}
148148

0 commit comments

Comments
 (0)