diff --git a/core/graphman/src/commands/deployment/mod.rs b/core/graphman/src/commands/deployment/mod.rs index 9c695a5f74a..4cac2277bbe 100644 --- a/core/graphman/src/commands/deployment/mod.rs +++ b/core/graphman/src/commands/deployment/mod.rs @@ -1,3 +1,5 @@ pub mod info; pub mod pause; +pub mod reassign; pub mod resume; +pub mod unassign; diff --git a/core/graphman/src/commands/deployment/reassign.rs b/core/graphman/src/commands/deployment/reassign.rs new file mode 100644 index 00000000000..2e5916a7aae --- /dev/null +++ b/core/graphman/src/commands/deployment/reassign.rs @@ -0,0 +1,117 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::DeploymentLocator; +use graph::components::store::StoreEvent; +use graph::prelude::EntityChange; +use graph::prelude::NodeId; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct Deployment { + locator: DeploymentLocator, + site: Site, +} + +impl Deployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + +#[derive(Debug, Error)] +pub enum ReassignDeploymentError { + #[error("deployment '{0}' is already assigned to '{1}'")] + AlreadyAssigned(String, String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +#[derive(Clone, Debug)] +pub enum ReassignResult { + EmptyResponse, + CompletedWithWarnings(Vec), +} + +pub fn load_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment_locator( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )?; + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + Ok(Deployment { locator, site }) +} + +pub fn reassign_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: &Deployment, + node: &NodeId, +) -> Result { + let primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes: Vec = match catalog_conn + .assigned_node(&deployment.site) + .map_err(GraphmanError::from)? + { + Some(curr) => { + if &curr == node { + vec![] + } else { + catalog_conn + .reassign_subgraph(&deployment.site, &node) + .map_err(GraphmanError::from)? + } + } + None => catalog_conn + .assign_subgraph(&deployment.site, &node) + .map_err(GraphmanError::from)?, + }; + + if changes.is_empty() { + return Err(ReassignDeploymentError::AlreadyAssigned( + deployment.locator.to_string(), + node.to_string(), + )); + } + + catalog_conn + .send_store_event(¬ification_sender, &StoreEvent::new(changes)) + .map_err(GraphmanError::from)?; + + let mirror = catalog::Mirror::primary_only(primary_pool); + let count = mirror + .assignments(&node) + .map_err(GraphmanError::from)? + .len(); + if count == 1 { + let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str()); + Ok(ReassignResult::CompletedWithWarnings(vec![warning_msg])) + } else { + Ok(ReassignResult::EmptyResponse) + } +} diff --git a/core/graphman/src/commands/deployment/unassign.rs b/core/graphman/src/commands/deployment/unassign.rs new file mode 100644 index 00000000000..5233e61ada1 --- /dev/null +++ b/core/graphman/src/commands/deployment/unassign.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use anyhow::anyhow; +use graph::components::store::DeploymentLocator; +use graph::components::store::StoreEvent; +use graph_store_postgres::command_support::catalog; +use graph_store_postgres::command_support::catalog::Site; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use thiserror::Error; + +use crate::deployment::DeploymentSelector; +use crate::deployment::DeploymentVersionSelector; +use crate::GraphmanError; + +pub struct AssignedDeployment { + locator: DeploymentLocator, + site: Site, +} + +impl AssignedDeployment { + pub fn locator(&self) -> &DeploymentLocator { + &self.locator + } +} + +#[derive(Debug, Error)] +pub enum UnassignDeploymentError { + #[error("deployment '{0}' is already unassigned")] + AlreadyUnassigned(String), + + #[error(transparent)] + Common(#[from] GraphmanError), +} + +pub fn load_assigned_deployment( + primary_pool: ConnectionPool, + deployment: &DeploymentSelector, +) -> Result { + let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?; + + let locator = crate::deployment::load_deployment_locator( + &mut primary_conn, + deployment, + &DeploymentVersionSelector::All, + )?; + + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let site = catalog_conn + .locate_site(locator.clone()) + .map_err(GraphmanError::from)? + .ok_or_else(|| { + GraphmanError::Store(anyhow!("deployment site not found for '{locator}'")) + })?; + + match catalog_conn + .assigned_node(&site) + .map_err(GraphmanError::from)? + { + Some(_) => Ok(AssignedDeployment { locator, site }), + None => Err(UnassignDeploymentError::AlreadyUnassigned( + locator.to_string(), + )), + } +} + +pub fn unassign_deployment( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: AssignedDeployment, +) -> Result<(), GraphmanError> { + let primary_conn = primary_pool.get()?; + let mut catalog_conn = catalog::Connection::new(primary_conn); + + let changes = catalog_conn.unassign_subgraph(&deployment.site)?; + catalog_conn.send_store_event(¬ification_sender, &StoreEvent::new(changes))?; + + Ok(()) +} diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index afc09403357..0b1c176468d 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -12,7 +12,7 @@ use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX}; use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry}; use graph::{ prelude::{ - anyhow::{self, Context as AnyhowContextTrait}, + anyhow::{self, anyhow, Context as AnyhowContextTrait}, info, tokio, Logger, NodeId, }, url::Url, @@ -1198,12 +1198,22 @@ async fn main() -> anyhow::Result<()> { Remove { name } => commands::remove::run(ctx.subgraph_store(), &name), Create { name } => commands::create::run(ctx.subgraph_store(), name), Unassign { deployment } => { - let sender = ctx.notification_sender(); - commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); + commands::deployment::unassign::run(primary_pool, notifications_sender, deployment) } Reassign { deployment, node } => { - let sender = ctx.notification_sender(); - commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node) + let notifications_sender = ctx.notification_sender(); + let primary_pool = ctx.primary_pool(); + let deployment = make_deployment_selector(deployment); + let node = NodeId::new(node).map_err(|node| anyhow!("invalid node id {:?}", node))?; + commands::deployment::reassign::run( + primary_pool, + notifications_sender, + deployment, + &node, + ) } Pause { deployment } => { let notifications_sender = ctx.notification_sender(); diff --git a/node/src/manager/commands/deployment/mod.rs b/node/src/manager/commands/deployment/mod.rs index 98910d7b4c4..8fd0237d3a7 100644 --- a/node/src/manager/commands/deployment/mod.rs +++ b/node/src/manager/commands/deployment/mod.rs @@ -1,4 +1,6 @@ pub mod info; pub mod pause; +pub mod reassign; pub mod restart; pub mod resume; +pub mod unassign; diff --git a/node/src/manager/commands/deployment/reassign.rs b/node/src/manager/commands/deployment/reassign.rs new file mode 100644 index 00000000000..60528f16206 --- /dev/null +++ b/node/src/manager/commands/deployment/reassign.rs @@ -0,0 +1,41 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph::prelude::NodeId; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::reassign::{ + load_deployment, reassign_deployment, ReassignResult, +}; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, + node: &NodeId, +) -> Result<()> { + let deployment = load_deployment(primary_pool.clone(), &deployment)?; + + println!("Reassigning deployment {}", deployment.locator()); + + let reassign_result = + reassign_deployment(primary_pool, notification_sender, &deployment, node)?; + + match reassign_result { + ReassignResult::EmptyResponse => { + println!( + "Deployment {} assigned to node {}", + deployment.locator(), + node + ); + } + ReassignResult::CompletedWithWarnings(warnings) => { + for msg in warnings { + println!("{}", msg); + } + } + } + + Ok(()) +} diff --git a/node/src/manager/commands/deployment/unassign.rs b/node/src/manager/commands/deployment/unassign.rs new file mode 100644 index 00000000000..45567e81f63 --- /dev/null +++ b/node/src/manager/commands/deployment/unassign.rs @@ -0,0 +1,22 @@ +use std::sync::Arc; + +use anyhow::Result; +use graph_store_postgres::connection_pool::ConnectionPool; +use graph_store_postgres::NotificationSender; +use graphman::commands::deployment::unassign::load_assigned_deployment; +use graphman::commands::deployment::unassign::unassign_deployment; +use graphman::deployment::DeploymentSelector; + +pub fn run( + primary_pool: ConnectionPool, + notification_sender: Arc, + deployment: DeploymentSelector, +) -> Result<()> { + let assigned_deployment = load_assigned_deployment(primary_pool.clone(), &deployment)?; + + println!("Unassigning deployment {}", assigned_deployment.locator()); + + unassign_deployment(primary_pool, notification_sender, assigned_deployment)?; + + Ok(()) +} diff --git a/server/graphman/src/entities/mod.rs b/server/graphman/src/entities/mod.rs index 8f4b2d8c018..c8d3330c9f7 100644 --- a/server/graphman/src/entities/mod.rs +++ b/server/graphman/src/entities/mod.rs @@ -10,6 +10,7 @@ mod empty_response; mod execution; mod execution_id; mod subgraph_health; +mod warning_response; pub use self::block_hash::BlockHash; pub use self::block_number::BlockNumber; @@ -23,3 +24,4 @@ pub use self::empty_response::EmptyResponse; pub use self::execution::Execution; pub use self::execution_id::ExecutionId; pub use self::subgraph_health::SubgraphHealth; +pub use self::warning_response::CompletedWithWarnings; diff --git a/server/graphman/src/entities/warning_response.rs b/server/graphman/src/entities/warning_response.rs new file mode 100644 index 00000000000..a99c4ce7583 --- /dev/null +++ b/server/graphman/src/entities/warning_response.rs @@ -0,0 +1,12 @@ +use async_graphql::SimpleObject; + +#[derive(Clone, Debug, SimpleObject)] +pub struct CompletedWithWarnings { + pub warnings: Vec, +} + +impl CompletedWithWarnings { + pub fn new(warnings: Vec) -> Self { + Self { warnings } + } +} diff --git a/server/graphman/src/resolvers/deployment_mutation.rs b/server/graphman/src/resolvers/deployment_mutation.rs index 983897391cf..8848578ce27 100644 --- a/server/graphman/src/resolvers/deployment_mutation.rs +++ b/server/graphman/src/resolvers/deployment_mutation.rs @@ -1,10 +1,15 @@ use std::sync::Arc; +use anyhow::anyhow; use async_graphql::Context; use async_graphql::Object; use async_graphql::Result; +use async_graphql::Union; +use graph::prelude::NodeId; use graph_store_postgres::graphman::GraphmanStore; +use graphman::commands::deployment::reassign::ReassignResult; +use crate::entities::CompletedWithWarnings; use crate::entities::DeploymentSelector; use crate::entities::EmptyResponse; use crate::entities::ExecutionId; @@ -12,12 +17,20 @@ use crate::resolvers::context::GraphmanContext; mod create; mod pause; +mod reassign; mod remove; mod restart; mod resume; +mod unassign; pub struct DeploymentMutation; +#[derive(Clone, Debug, Union)] +pub enum ReassignResponse { + EmptyResponse(EmptyResponse), + CompletedWithWarnings(CompletedWithWarnings), +} + /// Mutations related to one or multiple deployments. #[Object] impl DeploymentMutation { @@ -81,4 +94,39 @@ impl DeploymentMutation { remove::run(&ctx, &name)?; Ok(EmptyResponse::new()) } + + /// Unassign a deployment + pub async fn unassign( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + + unassign::run(&ctx, &deployment)?; + + Ok(EmptyResponse::new()) + } + + /// Assign or reassign a deployment + pub async fn reassign( + &self, + ctx: &Context<'_>, + deployment: DeploymentSelector, + node: String, + ) -> Result { + let ctx = GraphmanContext::new(ctx)?; + let deployment = deployment.try_into()?; + let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?; + let reassign_result = reassign::run(&ctx, &deployment, &node)?; + match reassign_result { + ReassignResult::CompletedWithWarnings(warnings) => Ok( + ReassignResponse::CompletedWithWarnings(CompletedWithWarnings::new(warnings)), + ), + ReassignResult::EmptyResponse => { + Ok(ReassignResponse::EmptyResponse(EmptyResponse::new())) + } + } + } } diff --git a/server/graphman/src/resolvers/deployment_mutation/reassign.rs b/server/graphman/src/resolvers/deployment_mutation/reassign.rs new file mode 100644 index 00000000000..3887d67032a --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/reassign.rs @@ -0,0 +1,24 @@ +use anyhow::Ok; +use async_graphql::Result; +use graph::prelude::NodeId; +use graphman::commands::deployment::reassign::load_deployment; +use graphman::commands::deployment::reassign::reassign_deployment; +use graphman::commands::deployment::reassign::ReassignResult; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run( + ctx: &GraphmanContext, + deployment: &DeploymentSelector, + node: &NodeId, +) -> Result { + let deployment = load_deployment(ctx.primary_pool.clone(), deployment)?; + let reassign_result = reassign_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + &deployment, + &node, + )?; + Ok(reassign_result) +} diff --git a/server/graphman/src/resolvers/deployment_mutation/unassign.rs b/server/graphman/src/resolvers/deployment_mutation/unassign.rs new file mode 100644 index 00000000000..4af620e8568 --- /dev/null +++ b/server/graphman/src/resolvers/deployment_mutation/unassign.rs @@ -0,0 +1,17 @@ +use async_graphql::Result; +use graphman::commands::deployment::unassign::load_assigned_deployment; +use graphman::commands::deployment::unassign::unassign_deployment; +use graphman::deployment::DeploymentSelector; + +use crate::resolvers::context::GraphmanContext; + +pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector) -> Result<()> { + let deployment = load_assigned_deployment(ctx.primary_pool.clone(), deployment)?; + unassign_deployment( + ctx.primary_pool.clone(), + ctx.notification_sender.clone(), + deployment, + )?; + + Ok(()) +} diff --git a/server/graphman/tests/deployment_mutation.rs b/server/graphman/tests/deployment_mutation.rs index 927cf5bc87a..04049caaf59 100644 --- a/server/graphman/tests/deployment_mutation.rs +++ b/server/graphman/tests/deployment_mutation.rs @@ -390,3 +390,197 @@ fn graphql_cannot_remove_subgraph_with_invalid_name() { assert_ne!(resp, success_resp); }); } + +#[test] +fn graphql_can_unassign_deployments() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let unassign_req = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "unassign": { + "success": true, + } + } + } + }); + + let subgraph_node_id = send_graphql_request( + json!({ + "query": r#"{ + deployment { + info(deployment: { hash: "subgraph_1" }) { + nodeId + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let is_node_null = subgraph_node_id["data"]["deployment"]["info"][0]["nodeId"].is_null(); + + assert_eq!(unassign_req, expected_resp); + assert_eq!(is_node_null, true); + }); +} + +#[test] +fn graphql_cannot_unassign_deployments_twice() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let unassign_again = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "unassign": { + "success": true, + } + } + } + }); + + assert_ne!(unassign_again, expected_resp); + }); +} + +#[test] +fn graphql_can_reassign_deployment() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let deployment_hash = DeploymentHash::new("subgraph_2").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + unassign(deployment: { hash: "subgraph_1" }){ + success + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let reassign = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + reassign(deployment: { hash: "subgraph_1" }, node: "test") { + ... on EmptyResponse { + success + } + ... on CompletedWithWarnings { + warnings + } + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "reassign": { + "success": true, + } + } + } + }); + + assert_eq!(reassign, expected_resp); + }); +} + +#[test] +fn graphql_warns_reassign_on_wrong_node_id() { + run_test(|| async { + let deployment_hash = DeploymentHash::new("subgraph_1").unwrap(); + create_test_subgraph(&deployment_hash, TEST_SUBGRAPH_SCHEMA).await; + + let reassign = send_graphql_request( + json!({ + "query": r#"mutation { + deployment { + reassign(deployment: { hash: "subgraph_1" }, node: "invalid_node") { + ... on EmptyResponse { + success + } + ... on CompletedWithWarnings { + warnings + } + } + } + }"# + }), + VALID_TOKEN, + ) + .await; + + let expected_resp = json!({ + "data": { + "deployment": { + "reassign": { + "warnings": ["This is the only deployment assigned to 'invalid_node'. Please make sure that the node ID is spelled correctly."], + } + } + } + }); + + assert_eq!(reassign, expected_resp); + }); +}