From 1f65045e29164aea27b55cbb2b382a44da0c0c35 Mon Sep 17 00:00:00 2001 From: Shiyas Mohammed Date: Fri, 12 Sep 2025 14:29:56 +0530 Subject: [PATCH] node: pause subgraphs before reassigning in graphman --- Cargo.lock | 8 +++--- node/src/bin/manager.rs | 15 ++++++++++- .../manager/commands/deployment/reassign.rs | 27 ++++++++++++++++--- 3 files changed, 41 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ce8de699bbb..c3113f58701 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6904,7 +6904,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -6915,7 +6915,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -6924,7 +6924,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -7000,7 +7000,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index aba6595f1c9..25e52af7542 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -169,6 +169,14 @@ pub enum Command { deployment: DeploymentSearch, /// The name of the node that should index the deployment node: String, + /// Sleep for this many seconds between pausing and reassigning subgraphs + #[clap( + long, + short, + default_value = "20", + value_parser = parse_duration_in_secs + )] + sleep: Duration, }, /// Unassign a deployment Unassign { @@ -1230,7 +1238,11 @@ async fn main() -> anyhow::Result<()> { let deployment = make_deployment_selector(deployment); commands::deployment::unassign::run(primary_pool, notifications_sender, deployment) } - Reassign { deployment, node } => { + Reassign { + deployment, + node, + sleep, + } => { let notifications_sender = ctx.notification_sender(); let primary_pool = ctx.primary_pool(); let deployment = make_deployment_selector(deployment); @@ -1240,6 +1252,7 @@ async fn main() -> anyhow::Result<()> { notifications_sender, deployment, &node, + sleep, ) } Pause { deployment } => { diff --git a/node/src/manager/commands/deployment/reassign.rs b/node/src/manager/commands/deployment/reassign.rs index 80122fc90b1..92d8802b347 100644 --- a/node/src/manager/commands/deployment/reassign.rs +++ b/node/src/manager/commands/deployment/reassign.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use graph::prelude::NodeId; @@ -12,10 +13,22 @@ use graphman::deployment::DeploymentSelector; pub fn run( primary_pool: ConnectionPool, notification_sender: Arc, - deployment: DeploymentSelector, + deployment_selector: DeploymentSelector, node: &NodeId, + delay: Duration, ) -> Result<()> { - let deployment = load_deployment(primary_pool.clone(), &deployment)?; + super::pause::run( + primary_pool.clone(), + notification_sender.clone(), + deployment_selector.clone(), + )?; + + println!( + "Waiting {}s to make sure pausing was processed ...", + delay.as_secs() + ); + + let deployment = load_deployment(primary_pool.clone(), &deployment_selector)?; let curr_node = deployment.assigned_node(primary_pool.clone())?; let reassign_msg = match &curr_node { Some(curr_node) => format!( @@ -28,8 +41,8 @@ pub fn run( println!("{}", reassign_msg); let reassign_result = reassign_deployment( - primary_pool, - notification_sender, + primary_pool.clone(), + notification_sender.clone(), &deployment, node, curr_node, @@ -50,5 +63,11 @@ pub fn run( } } + super::resume::run( + primary_pool, + notification_sender, + deployment_selector.clone(), + )?; + Ok(()) }