Skip to content

Commit 8ddb732

Browse files
authored
node/bin/graphman: New command index create
Adds a new subcommand tree `index / create` for graphman. Closes #3151
1 parent c20a630 commit 8ddb732

File tree

6 files changed

+191
-2
lines changed

6 files changed

+191
-2
lines changed

node/src/bin/manager.rs

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ pub enum Command {
179179
Chain(ChainCommand),
180180
/// Manipulate internal subgraph statistics
181181
Stats(StatsCommand),
182+
183+
/// Manage database indexes
184+
Index(IndexCommand),
182185
}
183186

184187
impl Command {
@@ -353,8 +356,37 @@ pub enum StatsCommand {
353356
Analyze {
354357
/// The id of the deployment
355358
id: String,
356-
/// The name of the Entity to ANALYZE
359+
/// The name of the Entity to ANALYZE, in camel case
360+
entity: String,
361+
},
362+
}
363+
364+
#[derive(Clone, Debug, StructOpt)]
365+
pub enum IndexCommand {
366+
/// Creates a new database index.
367+
///
368+
/// The new index will be created concurrenly for the provided entity and its fields. whose
369+
/// names must be declared the in camel case, following GraphQL conventions.
370+
///
371+
/// The index will have its validity checked after the operation and will be dropped if it is
372+
/// invalid.
373+
///
374+
/// This command may be time-consuming.
375+
Create {
376+
/// The id of the deployment
377+
id: String,
378+
/// The Entity name, in camel case.
379+
#[structopt(empty_values = false)]
357380
entity: String,
381+
/// The Field names, in camel case.
382+
#[structopt(min_values = 1, required = true)]
383+
fields: Vec<String>,
384+
/// The index method. Defaults to `btree`.
385+
#[structopt(
386+
short, long, default_value = "btree",
387+
possible_values = &["btree", "hash", "gist", "spgist", "gin", "brin"]
388+
)]
389+
method: String,
358390
},
359391
}
360392

@@ -709,6 +741,21 @@ async fn main() {
709741
}
710742
}
711743
}
744+
Index(cmd) => {
745+
use IndexCommand::*;
746+
match cmd {
747+
Create {
748+
id,
749+
entity,
750+
fields,
751+
method,
752+
} => {
753+
let store = ctx.store();
754+
let subgraph_store = store.subgraph_store();
755+
commands::index::create(subgraph_store, id, entity, fields, method).await
756+
}
757+
}
758+
}
712759
};
713760
if let Err(e) = result {
714761
die!("error: {}", e)

node/src/manager/commands/index.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use graph::{
2+
components::store::EntityType,
3+
prelude::{anyhow, DeploymentHash, StoreError},
4+
};
5+
use graph_store_postgres::SubgraphStore;
6+
use std::{collections::HashSet, sync::Arc};
7+
8+
fn validate_fields<T: AsRef<str>>(fields: &[T]) -> Result<(), anyhow::Error> {
9+
// Must be non-empty. Double checking, since [`StructOpt`] already checks this.
10+
if fields.is_empty() {
11+
anyhow::bail!("at least one field must be informed")
12+
}
13+
// All values must be unique
14+
let unique: HashSet<_> = fields.iter().map(AsRef::as_ref).collect();
15+
if fields.len() != unique.len() {
16+
anyhow::bail!("entity fields must be unique")
17+
}
18+
Ok(())
19+
}
20+
pub async fn create(
21+
store: Arc<SubgraphStore>,
22+
id: String,
23+
entity_name: String,
24+
field_names: Vec<String>,
25+
index_method: String,
26+
) -> Result<(), anyhow::Error> {
27+
validate_fields(&field_names)?;
28+
let deployment_hash = DeploymentHash::new(id)
29+
.map_err(|e| anyhow::anyhow!("Subgraph hash must be a valid IPFS hash: {}", e))?;
30+
let entity_type = EntityType::new(entity_name);
31+
println!("Index creation started. Please wait.");
32+
match store
33+
.create_manual_index(&deployment_hash, entity_type, field_names, index_method)
34+
.await
35+
{
36+
Ok(()) => Ok(()),
37+
Err(StoreError::Canceled) => {
38+
eprintln!("Index creation attempt faield. Please retry.");
39+
::std::process::exit(1);
40+
}
41+
Err(other) => Err(anyhow::anyhow!(other)),
42+
}
43+
}

node/src/manager/commands/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod chain;
33
pub mod config;
44
pub mod copy;
55
pub mod create;
6+
pub mod index;
67
pub mod info;
78
pub mod listen;
89
pub mod query;

store/postgres/src/catalog.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use diesel::sql_types::Integer;
1+
use diesel::sql_types::{Bool, Integer};
22
use diesel::{connection::SimpleConnection, prelude::RunQueryDsl, select};
33
use diesel::{insert_into, OptionalExtension};
44
use diesel::{pg::PgConnection, sql_query};
@@ -382,3 +382,35 @@ pub fn create_foreign_table(
382382
})?;
383383
Ok(query)
384384
}
385+
386+
/// Checks in the database if a given index is valid.
387+
pub(crate) fn check_index_is_valid(
388+
conn: &PgConnection,
389+
schema_name: &str,
390+
index_name: &str,
391+
) -> Result<bool, StoreError> {
392+
#[derive(Queryable, QueryableByName)]
393+
struct ManualIndexCheck {
394+
#[sql_type = "Bool"]
395+
is_valid: bool,
396+
}
397+
398+
let query = "
399+
select
400+
i.indisvalid as is_valid
401+
from
402+
pg_class c
403+
join pg_index i on i.indexrelid = c.oid
404+
join pg_namespace n on c.relnamespace = n.oid
405+
where
406+
n.nspname = $1
407+
and c.relname = $2";
408+
let result = sql_query(query)
409+
.bind::<Text, _>(schema_name)
410+
.bind::<Text, _>(index_name)
411+
.get_result::<ManualIndexCheck>(conn)
412+
.optional()
413+
.map_err::<StoreError, _>(Into::into)?
414+
.map(|check| check.is_valid);
415+
Ok(matches!(result, Some(true)))
416+
}

store/postgres/src/deployment_store.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -693,6 +693,59 @@ impl DeploymentStore {
693693
})
694694
.await
695695
}
696+
697+
/// Creates a new index in the specified Entity table if it doesn't already exist.
698+
///
699+
/// This is a potentially time-consuming operation.
700+
pub(crate) async fn create_manual_index(
701+
&self,
702+
site: Arc<Site>,
703+
entity_type: EntityType,
704+
field_names: Vec<String>,
705+
index_method: String,
706+
) -> Result<(), StoreError> {
707+
let store = self.clone();
708+
709+
self.with_conn(move |conn, _| {
710+
let schema_name = site.namespace.clone();
711+
let layout = store.layout(conn, site)?;
712+
let table = layout.table_for_entity(&entity_type)?;
713+
let table_name = &table.name;
714+
715+
// resolve column names
716+
let column_names = field_names
717+
.iter()
718+
.map(|f| table.column_for_field(f).map(|column| column.name.as_str()))
719+
.collect::<Result<Vec<_>, _>>()?;
720+
721+
let column_names_sep_by_underscores = column_names.join("_");
722+
let column_names_sep_by_commas = column_names.join(", ");
723+
let index_name = format!("manual_{table_name}_{column_names_sep_by_underscores}");
724+
725+
let sql = format!(
726+
"create index concurrently if not exists {index_name} \
727+
on {schema_name}.{table_name} using {index_method} \
728+
({column_names_sep_by_commas})"
729+
);
730+
// This might take a long time.
731+
conn.execute(&sql)?;
732+
733+
// check if the index creation was successfull
734+
let index_is_valid =
735+
catalog::check_index_is_valid(conn, schema_name.as_str(), &index_name)?;
736+
if index_is_valid {
737+
Ok(())
738+
} else {
739+
// Index creation falied. We should drop the index before returning.
740+
let drop_index_sql =
741+
format!("drop index concurrently if exists {schema_name}.{index_name}");
742+
conn.execute(&drop_index_sql)?;
743+
Err(StoreError::Canceled)
744+
}
745+
.map_err(Into::into)
746+
})
747+
.await
748+
}
696749
}
697750

698751
/// Methods that back the trait `graph::components::Store`, but have small

store/postgres/src/subgraph_store.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,19 @@ impl SubgraphStoreInner {
956956
let (store, site) = self.store(&id)?;
957957
store.analyze(site, entity_type).await
958958
}
959+
960+
pub async fn create_manual_index(
961+
&self,
962+
id: &DeploymentHash,
963+
entity_type: EntityType,
964+
field_names: Vec<String>,
965+
index_method: String,
966+
) -> Result<(), StoreError> {
967+
let (store, site) = self.store(&id)?;
968+
store
969+
.create_manual_index(site, entity_type, field_names, index_method)
970+
.await
971+
}
959972
}
960973

961974
struct EnsLookup {

0 commit comments

Comments
 (0)