Skip to content

Commit 09db0ce

Browse files
Refactor graphman rewind to use pause and resume logic (#5213)
* Refactor deployment pause and resume logic * Refactor pause_and_resume to use DeploymentLocator, Add cloning functionality to the Context struct, * Update node/src/manager/commands/rewind.rs Co-authored-by: Krishnanand V P <[email protected]> * Update node/src/bin/manager.rs Co-authored-by: Krishnanand V P <[email protected]> * Update node/src/bin/manager.rs Co-authored-by: Krishnanand V P <[email protected]> * Refactor cloning and pausing logic in manager.rs and rewind.rs * Refactor code to use mutable connection in assign.rs and remove unused constant in rewind.rs * removing cloning in manager.rs * minor refactoring * node: throw an error when a deployment search matches more than one in rewind --------- Co-authored-by: Krishnanand V P <[email protected]> Co-authored-by: incrypto32 <[email protected]>
1 parent ec176b4 commit 09db0ce

File tree

3 files changed

+46
-46
lines changed

3 files changed

+46
-46
lines changed

node/src/bin/manager.rs

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1176,15 +1176,24 @@ async fn main() -> anyhow::Result<()> {
11761176
}
11771177
Pause { deployment } => {
11781178
let sender = ctx.notification_sender();
1179-
commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, true)
1179+
let pool = ctx.primary_pool();
1180+
let locator = &deployment.locate_unique(&pool)?;
1181+
commands::assign::pause_or_resume(pool, &sender, locator, true)
11801182
}
1183+
11811184
Resume { deployment } => {
11821185
let sender = ctx.notification_sender();
1183-
commands::assign::pause_or_resume(ctx.primary_pool(), &sender, &deployment, false)
1186+
let pool = ctx.primary_pool();
1187+
let locator = &deployment.locate_unique(&pool).unwrap();
1188+
1189+
commands::assign::pause_or_resume(pool, &sender, locator, false)
11841190
}
11851191
Restart { deployment, sleep } => {
11861192
let sender = ctx.notification_sender();
1187-
commands::assign::restart(ctx.primary_pool(), &sender, &deployment, sleep)
1193+
let pool = ctx.primary_pool();
1194+
let locator = &deployment.locate_unique(&pool).unwrap();
1195+
1196+
commands::assign::restart(pool, &sender, locator, sleep)
11881197
}
11891198
Rewind {
11901199
force,
@@ -1194,13 +1203,16 @@ async fn main() -> anyhow::Result<()> {
11941203
deployments,
11951204
start_block,
11961205
} => {
1206+
let notification_sender = ctx.notification_sender();
11971207
let (store, primary) = ctx.store_and_primary();
1208+
11981209
commands::rewind::run(
11991210
primary,
12001211
store,
12011212
deployments,
12021213
block_hash,
12031214
block_number,
1215+
&notification_sender,
12041216
force,
12051217
sleep,
12061218
start_block,

node/src/manager/commands/assign.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use graph::components::store::DeploymentLocator;
12
use graph::prelude::{anyhow::anyhow, Error, NodeId, StoreEvent};
23
use graph_store_postgres::{
34
command_support::catalog, connection_pool::ConnectionPool, NotificationSender,
@@ -75,11 +76,9 @@ pub fn reassign(
7576
pub fn pause_or_resume(
7677
primary: ConnectionPool,
7778
sender: &NotificationSender,
78-
search: &DeploymentSearch,
79+
locator: &DeploymentLocator,
7980
should_pause: bool,
8081
) -> Result<(), Error> {
81-
let locator = search.locate_unique(&primary)?;
82-
8382
let pconn = primary.get()?;
8483
let mut conn = catalog::Connection::new(pconn);
8584

@@ -115,15 +114,15 @@ pub fn pause_or_resume(
115114
pub fn restart(
116115
primary: ConnectionPool,
117116
sender: &NotificationSender,
118-
search: &DeploymentSearch,
117+
locator: &DeploymentLocator,
119118
sleep: Duration,
120119
) -> Result<(), Error> {
121-
pause_or_resume(primary.clone(), sender, search, true)?;
120+
pause_or_resume(primary.clone(), sender, locator, true)?;
122121
println!(
123122
"Waiting {}s to make sure pausing was processed",
124123
sleep.as_secs()
125124
);
126125
thread::sleep(sleep);
127-
pause_or_resume(primary, sender, search, false)?;
126+
pause_or_resume(primary, sender, locator, false)?;
128127
Ok(())
129128
}

node/src/manager/commands/rewind.rs

Lines changed: 26 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ use std::{collections::HashSet, convert::TryFrom};
55

66
use graph::anyhow::bail;
77
use graph::components::store::{BlockStore as _, ChainStore as _};
8-
use graph::prelude::{anyhow, BlockNumber, BlockPtr, NodeId, SubgraphStore};
9-
use graph_store_postgres::BlockStore;
8+
use graph::prelude::{anyhow, BlockNumber, BlockPtr};
109
use graph_store_postgres::{connection_pool::ConnectionPool, Store};
10+
use graph_store_postgres::{BlockStore, NotificationSender};
1111

12+
use crate::manager::commands::assign::pause_or_resume;
1213
use crate::manager::deployment::{Deployment, DeploymentSearch};
1314

1415
async fn block_ptr(
@@ -61,12 +62,11 @@ pub async fn run(
6162
searches: Vec<DeploymentSearch>,
6263
block_hash: Option<String>,
6364
block_number: Option<BlockNumber>,
65+
sender: &NotificationSender,
6466
force: bool,
6567
sleep: Duration,
6668
start_block: bool,
6769
) -> Result<(), anyhow::Error> {
68-
const PAUSED: &str = "paused_";
69-
7070
// Sanity check
7171
if !start_block && (block_hash.is_none() || block_number.is_none()) {
7272
bail!("--block-hash and --block-number must be specified when --start-block is not set");
@@ -75,15 +75,20 @@ pub async fn run(
7575
let subgraph_store = store.subgraph_store();
7676
let block_store = store.block_store();
7777

78-
let deployments = searches
79-
.iter()
80-
.map(|search| search.lookup(&primary))
81-
.collect::<Result<Vec<_>, _>>()?
82-
.into_iter()
83-
.flatten()
84-
.collect::<Vec<_>>();
78+
let mut deployments = Vec::new();
79+
for search in &searches {
80+
let results = search.lookup(&primary)?;
81+
if results.len() > 1 {
82+
bail!(
83+
"Multiple deployments found for the search : {}. Try using the id of the deployment (eg: sgd143) to uniquely identify the deployment.",
84+
search
85+
);
86+
}
87+
deployments.extend(results);
88+
}
89+
8590
if deployments.is_empty() {
86-
println!("nothing to do");
91+
println!("No deployments found");
8792
return Ok(());
8893
}
8994

@@ -104,29 +109,17 @@ pub async fn run(
104109
};
105110

106111
println!("Pausing deployments");
107-
let mut paused = false;
108112
for deployment in &deployments {
109-
if let Some(node) = &deployment.node_id {
110-
if !node.starts_with(PAUSED) {
111-
let loc = deployment.locator();
112-
let node =
113-
NodeId::new(format!("{}{}", PAUSED, node)).expect("paused_ node id is valid");
114-
subgraph_store.reassign_subgraph(&loc, &node)?;
115-
println!(" ... paused {}", loc);
116-
paused = true;
117-
}
118-
}
113+
pause_or_resume(primary.clone(), &sender, &deployment.locator(), true)?;
119114
}
120115

121-
if paused {
122-
// There's no good way to tell that a subgraph has in fact stopped
123-
// indexing. We sleep and hope for the best.
124-
println!(
125-
"\nWaiting {}s to make sure pausing was processed",
126-
sleep.as_secs()
127-
);
128-
thread::sleep(sleep);
129-
}
116+
// There's no good way to tell that a subgraph has in fact stopped
117+
// indexing. We sleep and hope for the best.
118+
println!(
119+
"\nWaiting {}s to make sure pausing was processed",
120+
sleep.as_secs()
121+
);
122+
thread::sleep(sleep);
130123

131124
println!("\nRewinding deployments");
132125
for deployment in &deployments {
@@ -158,11 +151,7 @@ pub async fn run(
158151

159152
println!("Resuming deployments");
160153
for deployment in &deployments {
161-
if let Some(node) = &deployment.node_id {
162-
let loc = deployment.locator();
163-
let node = NodeId::new(node.clone()).expect("node id is valid");
164-
subgraph_store.reassign_subgraph(&loc, &node)?;
165-
}
154+
pause_or_resume(primary.clone(), &sender, &deployment.locator(), false)?;
166155
}
167156
Ok(())
168157
}

0 commit comments

Comments
 (0)