Skip to content

Commit 7a02d8a

Browse files
committed
feat: add graphman unassign/reassign command to graphql api
1 parent 1642eaf commit 7a02d8a

File tree

7 files changed

+256
-5
lines changed

7 files changed

+256
-5
lines changed
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
11
pub mod info;
22
pub mod pause;
3+
pub mod reassign;
34
pub mod resume;
5+
pub mod unassign;
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::anyhow;
4+
use graph::components::store::DeploymentLocator;
5+
use graph::components::store::StoreEvent;
6+
use graph::prelude::EntityChange;
7+
use graph::prelude::NodeId;
8+
use graph_store_postgres::command_support::catalog;
9+
use graph_store_postgres::command_support::catalog::Site;
10+
use graph_store_postgres::connection_pool::ConnectionPool;
11+
use graph_store_postgres::NotificationSender;
12+
use thiserror::Error;
13+
14+
use crate::deployment::DeploymentSelector;
15+
use crate::deployment::DeploymentVersionSelector;
16+
use crate::GraphmanError;
17+
18+
pub struct ActiveDeployment {
19+
locator: DeploymentLocator,
20+
site: Site,
21+
}
22+
23+
#[derive(Debug, Error)]
24+
pub enum ReassignDeploymentError {
25+
#[error("deployment '{0}' is already assigned to '{1}'")]
26+
AlreadyAssigned(String, String),
27+
28+
#[error(transparent)]
29+
Common(#[from] GraphmanError),
30+
}
31+
32+
pub fn load_deployment(
33+
primary_pool: ConnectionPool,
34+
deployment: &DeploymentSelector,
35+
) -> Result<ActiveDeployment, ReassignDeploymentError> {
36+
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
37+
38+
let locator = crate::deployment::load_deployment(
39+
&mut primary_conn,
40+
deployment,
41+
&DeploymentVersionSelector::All,
42+
)?
43+
.locator();
44+
45+
let mut catalog_conn = catalog::Connection::new(primary_conn);
46+
47+
let site = catalog_conn
48+
.locate_site(locator.clone())
49+
.map_err(GraphmanError::from)?
50+
.ok_or_else(|| {
51+
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
52+
})?;
53+
54+
Ok(ActiveDeployment { locator, site })
55+
}
56+
57+
pub fn reassign_deployment(
58+
primary_pool: ConnectionPool,
59+
notification_sender: Arc<NotificationSender>,
60+
deployment: ActiveDeployment,
61+
node: &NodeId,
62+
) -> Result<(), ReassignDeploymentError> {
63+
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
64+
let mut catalog_conn = catalog::Connection::new(primary_conn);
65+
66+
let changes: Vec<EntityChange> = match catalog_conn
67+
.assigned_node(&deployment.site)
68+
.map_err(GraphmanError::from)?
69+
{
70+
Some(curr) => {
71+
if &curr == node {
72+
vec![]
73+
} else {
74+
catalog_conn
75+
.reassign_subgraph(&deployment.site, &node)
76+
.map_err(GraphmanError::from)?
77+
}
78+
}
79+
None => catalog_conn
80+
.assign_subgraph(&deployment.site, &node)
81+
.map_err(GraphmanError::from)?,
82+
};
83+
84+
if changes.is_empty() {
85+
return Err(ReassignDeploymentError::AlreadyAssigned(
86+
deployment.locator.hash.to_string(),
87+
node.to_string(),
88+
));
89+
}
90+
91+
catalog_conn
92+
.send_store_event(&notification_sender, &StoreEvent::new(changes))
93+
.map_err(GraphmanError::from)?;
94+
95+
Ok(())
96+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
use std::sync::Arc;
2+
3+
use anyhow::anyhow;
4+
use graph::components::store::StoreEvent;
5+
use graph_store_postgres::command_support::catalog;
6+
use graph_store_postgres::command_support::catalog::Site;
7+
use graph_store_postgres::connection_pool::ConnectionPool;
8+
use graph_store_postgres::NotificationSender;
9+
use thiserror::Error;
10+
11+
use crate::deployment::DeploymentSelector;
12+
use crate::deployment::DeploymentVersionSelector;
13+
use crate::GraphmanError;
14+
15+
pub struct AssignedDeployment {
16+
site: Site,
17+
}
18+
19+
#[derive(Debug, Error)]
20+
pub enum UnassignDeploymentError {
21+
#[error("deployment '{0}' is already unassigned")]
22+
AlreadyUnassigned(String),
23+
24+
#[error(transparent)]
25+
Common(#[from] GraphmanError),
26+
}
27+
28+
pub fn load_assigned_deployment(
29+
primary_pool: ConnectionPool,
30+
deployment: &DeploymentSelector,
31+
) -> Result<AssignedDeployment, UnassignDeploymentError> {
32+
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
33+
34+
let locator = crate::deployment::load_deployment(
35+
&mut primary_conn,
36+
deployment,
37+
&DeploymentVersionSelector::All,
38+
)?
39+
.locator();
40+
41+
let mut catalog_conn = catalog::Connection::new(primary_conn);
42+
43+
let site = catalog_conn
44+
.locate_site(locator.clone())
45+
.map_err(GraphmanError::from)?
46+
.ok_or_else(|| {
47+
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
48+
})?;
49+
50+
match catalog_conn
51+
.assigned_node(&site)
52+
.map_err(GraphmanError::from)?
53+
{
54+
Some(_) => Ok(AssignedDeployment { site }),
55+
None => Err(UnassignDeploymentError::AlreadyUnassigned(format!(
56+
"{:?}",
57+
deployment
58+
))),
59+
}
60+
}
61+
62+
pub fn unassign_deployment(
63+
primary_pool: ConnectionPool,
64+
notification_sender: Arc<NotificationSender>,
65+
deployment: AssignedDeployment,
66+
) -> Result<(), GraphmanError> {
67+
let primary_conn = primary_pool.get()?;
68+
let mut catalog_conn = catalog::Connection::new(primary_conn);
69+
70+
let changes = catalog_conn.unassign_subgraph(&deployment.site)?;
71+
catalog_conn.send_store_event(&notification_sender, &StoreEvent::new(changes))?;
72+
73+
Ok(())
74+
}

server/graphman/src/entities/empty_response.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ use async_graphql::SimpleObject;
55
#[derive(Clone, Debug, SimpleObject)]
66
pub struct EmptyResponse {
77
pub success: bool,
8+
pub message: Option<String>,
89
}
910

1011
impl EmptyResponse {
1112
/// Returns a successful response.
12-
pub fn new() -> Self {
13-
Self { success: true }
13+
pub fn new(msg: Option<String>) -> Self {
14+
Self {
15+
success: true,
16+
message: msg,
17+
}
1418
}
1519
}

server/graphman/src/resolvers/deployment_mutation.rs

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::sync::Arc;
22

3+
use anyhow::anyhow;
34
use async_graphql::Context;
45
use async_graphql::Object;
56
use async_graphql::Result;
7+
use graph::prelude::NodeId;
8+
use graph_store_postgres::command_support::catalog;
69
use graph_store_postgres::graphman::GraphmanStore;
710

811
use crate::entities::DeploymentSelector;
@@ -11,9 +14,10 @@ use crate::entities::ExecutionId;
1114
use crate::resolvers::context::GraphmanContext;
1215

1316
mod pause;
17+
mod reassign;
1418
mod restart;
1519
mod resume;
16-
20+
mod unassign;
1721
pub struct DeploymentMutation;
1822

1923
/// Mutations related to one or multiple deployments.
@@ -30,7 +34,7 @@ impl DeploymentMutation {
3034

3135
pause::run(&ctx, &deployment)?;
3236

33-
Ok(EmptyResponse::new())
37+
Ok(EmptyResponse::new(None))
3438
}
3539

3640
/// Resumes a deployment that has been previously paused.
@@ -44,7 +48,7 @@ impl DeploymentMutation {
4448

4549
resume::run(&ctx, &deployment)?;
4650

47-
Ok(EmptyResponse::new())
51+
Ok(EmptyResponse::new(None))
4852
}
4953

5054
/// Pauses a deployment and resumes it after a delay.
@@ -65,4 +69,39 @@ impl DeploymentMutation {
6569

6670
restart::run_in_background(ctx, store, deployment, delay_seconds).await
6771
}
72+
73+
/// Unassign a deployment
74+
pub async fn unassign(
75+
&self,
76+
ctx: &Context<'_>,
77+
deployment: DeploymentSelector,
78+
) -> Result<EmptyResponse> {
79+
let ctx = GraphmanContext::new(ctx)?;
80+
let deployment = deployment.try_into()?;
81+
82+
unassign::run(&ctx, &deployment)?;
83+
84+
Ok(EmptyResponse::new(None))
85+
}
86+
87+
/// Assign or reassign a deployment
88+
pub async fn reassign(
89+
&self,
90+
ctx: &Context<'_>,
91+
deployment: DeploymentSelector,
92+
node: String,
93+
) -> Result<EmptyResponse> {
94+
let ctx = GraphmanContext::new(ctx)?;
95+
let deployment = deployment.try_into()?;
96+
let node = NodeId::new(node.clone()).map_err(|()| anyhow!("illegal node id `{}`", node))?;
97+
reassign::run(&ctx, &deployment, &node)?;
98+
99+
let mirror = catalog::Mirror::primary_only(ctx.primary_pool);
100+
let count = mirror.assignments(&node)?.len();
101+
if count == 1 {
102+
Ok(EmptyResponse::new(Some(format!("warning: this is the only deployment assigned to '{}'. Are you sure it is spelled correctly?",node.as_str()))))
103+
} else {
104+
Ok(EmptyResponse::new(None))
105+
}
106+
}
68107
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
use async_graphql::Result;
2+
use graph::prelude::NodeId;
3+
use graphman::commands::deployment::reassign::load_deployment;
4+
use graphman::commands::deployment::reassign::reassign_deployment;
5+
use graphman::deployment::DeploymentSelector;
6+
7+
use crate::resolvers::context::GraphmanContext;
8+
9+
pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector, node: &NodeId) -> Result<()> {
10+
let deployment = load_deployment(ctx.primary_pool.clone(), deployment)?;
11+
reassign_deployment(
12+
ctx.primary_pool.clone(),
13+
ctx.notification_sender.clone(),
14+
deployment,
15+
&node,
16+
)?;
17+
18+
Ok(())
19+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use async_graphql::Result;
2+
use graphman::commands::deployment::unassign::load_assigned_deployment;
3+
use graphman::commands::deployment::unassign::unassign_deployment;
4+
use graphman::deployment::DeploymentSelector;
5+
6+
use crate::resolvers::context::GraphmanContext;
7+
8+
pub fn run(ctx: &GraphmanContext, deployment: &DeploymentSelector) -> Result<()> {
9+
let deployment = load_assigned_deployment(ctx.primary_pool.clone(), deployment)?;
10+
unassign_deployment(
11+
ctx.primary_pool.clone(),
12+
ctx.notification_sender.clone(),
13+
deployment,
14+
)?;
15+
16+
Ok(())
17+
}

0 commit comments

Comments
 (0)