Skip to content

Commit cc366fe

Browse files
committed
store: Place new deployment on least used eligible shard
1 parent 9223fad commit cc366fe

File tree

2 files changed

+45
-1
lines changed

2 files changed

+45
-1
lines changed

store/postgres/src/primary.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,6 +1241,40 @@ impl<'a> Connection<'a> {
12411241
})
12421242
}
12431243

1244+
/// Return the shard that has the fewest deployments out of the given
1245+
/// `shards`. If `shards` is empty, return `None`
1246+
///
1247+
/// Usage of a shard is taken to be the number of assigned deployments
1248+
/// that are stored in it. Unassigned deployments are ignored; in
1249+
/// particular, that ignores deployments that are going to be removed
1250+
/// soon.
1251+
pub fn least_used_shard(&self, shards: &[Shard]) -> Result<Option<Shard>, StoreError> {
1252+
use deployment_schemas as ds;
1253+
use subgraph_deployment_assignment as a;
1254+
1255+
let used = ds::table
1256+
.inner_join(a::table.on(a::id.eq(ds::id)))
1257+
.filter(ds::shard.eq(any(shards)))
1258+
.select((ds::shard, sql("count(*)")))
1259+
.group_by(ds::shard)
1260+
.order_by(sql::<i64>("count(*)"))
1261+
.load::<(String, i64)>(self.conn.as_ref())?;
1262+
1263+
let missing = shards
1264+
.into_iter()
1265+
.filter(|shard| !used.iter().any(|(s, _)| s == shard.as_str()))
1266+
.map(|shard| (shard.as_str(), 0));
1267+
1268+
used.iter()
1269+
.map(|(shard, count)| (shard.as_str(), *count))
1270+
.chain(missing)
1271+
.min_by(|(_, a), (_, b)| a.cmp(b))
1272+
.map(|(shard, _)| Shard::new(shard.to_string()))
1273+
.transpose()
1274+
// This can't really happen since we filtered by valid shards
1275+
.map_err(|e| constraint_violation!("database has illegal shard name: {}", e))
1276+
}
1277+
12441278
#[cfg(debug_assertions)]
12451279
pub fn versions_for_subgraph(
12461280
&self,

store/postgres/src/subgraph_store.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -405,7 +405,17 @@ impl SubgraphStoreInner {
405405
}
406406

407407
fn place_in_shard(&self, mut shards: Vec<Shard>) -> Result<Shard, StoreError> {
408-
Ok(shards.pop().unwrap())
408+
match shards.len() {
409+
0 => Ok(PRIMARY_SHARD.clone()),
410+
1 => Ok(shards.pop().unwrap()),
411+
_ => {
412+
let conn = self.primary_conn()?;
413+
414+
// unwrap is fine since shards is not empty
415+
let shard = conn.least_used_shard(&shards)?.unwrap();
416+
Ok(shard)
417+
}
418+
}
409419
}
410420

411421
fn place(

0 commit comments

Comments
 (0)