Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/graphman/src/commands/deployment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod info;
pub mod pause;
pub mod reassign;
pub mod resume;
pub mod unassign;
117 changes: 117 additions & 0 deletions core/graphman/src/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
use std::sync::Arc;

use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph::prelude::EntityChange;
use graph::prelude::NodeId;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use thiserror::Error;

use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct Deployment {
locator: DeploymentLocator,
site: Site,
}

impl Deployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

#[derive(Debug, Error)]
pub enum ReassignDeploymentError {
#[error("deployment '{0}' is already assigned to '{1}'")]
AlreadyAssigned(String, String),

#[error(transparent)]
Common(#[from] GraphmanError),
}

#[derive(Clone, Debug)]
pub enum ReassignResult {
EmptyResponse,
CompletedWithWarnings(Vec<String>),
}
Comment on lines +39 to +42
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note. It is not very common for a Rust function to return T::EmptyResponse on success (it makes sense for GraphQL types because of GraphQL style and conventions). Maybe a better name for this variant is ReassignResult::Ok, but that's not really important right now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I've created a new issue for that.


pub fn load_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<Deployment, ReassignDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
deployment,
&DeploymentVersionSelector::All,
)?;

let mut catalog_conn = catalog::Connection::new(primary_conn);

let site = catalog_conn
.locate_site(locator.clone())
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;

Ok(Deployment { locator, site })
}

pub fn reassign_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: &Deployment,
node: &NodeId,
) -> Result<ReassignResult, ReassignDeploymentError> {
let primary_conn = primary_pool.get().map_err(GraphmanError::from)?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes: Vec<EntityChange> = match catalog_conn
.assigned_node(&deployment.site)
.map_err(GraphmanError::from)?
{
Some(curr) => {
if &curr == node {
vec![]
} else {
catalog_conn
.reassign_subgraph(&deployment.site, &node)
.map_err(GraphmanError::from)?
}
}
None => catalog_conn
.assign_subgraph(&deployment.site, &node)
.map_err(GraphmanError::from)?,
};

if changes.is_empty() {
return Err(ReassignDeploymentError::AlreadyAssigned(
deployment.locator.to_string(),
node.to_string(),
));
}

catalog_conn
.send_store_event(&notification_sender, &StoreEvent::new(changes))
.map_err(GraphmanError::from)?;

let mirror = catalog::Mirror::primary_only(primary_pool);
let count = mirror
.assignments(&node)
.map_err(GraphmanError::from)?
.len();
if count == 1 {
let warning_msg = format!("This is the only deployment assigned to '{}'. Please make sure that the node ID is spelled correctly.",node.as_str());
Ok(ReassignResult::CompletedWithWarnings(vec![warning_msg]))
} else {
Ok(ReassignResult::EmptyResponse)
}
}
80 changes: 80 additions & 0 deletions core/graphman/src/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use std::sync::Arc;

use anyhow::anyhow;
use graph::components::store::DeploymentLocator;
use graph::components::store::StoreEvent;
use graph_store_postgres::command_support::catalog;
use graph_store_postgres::command_support::catalog::Site;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use thiserror::Error;

use crate::deployment::DeploymentSelector;
use crate::deployment::DeploymentVersionSelector;
use crate::GraphmanError;

pub struct AssignedDeployment {
locator: DeploymentLocator,
site: Site,
}

impl AssignedDeployment {
pub fn locator(&self) -> &DeploymentLocator {
&self.locator
}
}

#[derive(Debug, Error)]
pub enum UnassignDeploymentError {
#[error("deployment '{0}' is already unassigned")]
AlreadyUnassigned(String),

#[error(transparent)]
Common(#[from] GraphmanError),
}

pub fn load_assigned_deployment(
primary_pool: ConnectionPool,
deployment: &DeploymentSelector,
) -> Result<AssignedDeployment, UnassignDeploymentError> {
let mut primary_conn = primary_pool.get().map_err(GraphmanError::from)?;

let locator = crate::deployment::load_deployment_locator(
&mut primary_conn,
deployment,
&DeploymentVersionSelector::All,
)?;

let mut catalog_conn = catalog::Connection::new(primary_conn);

let site = catalog_conn
.locate_site(locator.clone())
.map_err(GraphmanError::from)?
.ok_or_else(|| {
GraphmanError::Store(anyhow!("deployment site not found for '{locator}'"))
})?;

match catalog_conn
.assigned_node(&site)
.map_err(GraphmanError::from)?
{
Some(_) => Ok(AssignedDeployment { locator, site }),
None => Err(UnassignDeploymentError::AlreadyUnassigned(
locator.to_string(),
)),
}
}

pub fn unassign_deployment(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: AssignedDeployment,
) -> Result<(), GraphmanError> {
let primary_conn = primary_pool.get()?;
let mut catalog_conn = catalog::Connection::new(primary_conn);

let changes = catalog_conn.unassign_subgraph(&deployment.site)?;
catalog_conn.send_store_event(&notification_sender, &StoreEvent::new(changes))?;

Ok(())
}
20 changes: 15 additions & 5 deletions node/src/bin/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use graph::prelude::{MetricsRegistry, BLOCK_NUMBER_MAX};
use graph::{data::graphql::load_manager::LoadManager, prelude::chrono, prometheus::Registry};
use graph::{
prelude::{
anyhow::{self, Context as AnyhowContextTrait},
anyhow::{self, anyhow, Context as AnyhowContextTrait},
info, tokio, Logger, NodeId,
},
url::Url,
Expand Down Expand Up @@ -1198,12 +1198,22 @@ async fn main() -> anyhow::Result<()> {
Remove { name } => commands::remove::run(ctx.subgraph_store(), &name),
Create { name } => commands::create::run(ctx.subgraph_store(), name),
Unassign { deployment } => {
let sender = ctx.notification_sender();
commands::assign::unassign(ctx.primary_pool(), &sender, &deployment).await
let notifications_sender = ctx.notification_sender();
let primary_pool = ctx.primary_pool();
let deployment = make_deployment_selector(deployment);
commands::deployment::unassign::run(primary_pool, notifications_sender, deployment)
}
Reassign { deployment, node } => {
let sender = ctx.notification_sender();
commands::assign::reassign(ctx.primary_pool(), &sender, &deployment, node)
let notifications_sender = ctx.notification_sender();
let primary_pool = ctx.primary_pool();
let deployment = make_deployment_selector(deployment);
let node = NodeId::new(node).map_err(|node| anyhow!("invalid node id {:?}", node))?;
commands::deployment::reassign::run(
primary_pool,
notifications_sender,
deployment,
&node,
)
}
Pause { deployment } => {
let notifications_sender = ctx.notification_sender();
Expand Down
2 changes: 2 additions & 0 deletions node/src/manager/commands/deployment/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod info;
pub mod pause;
pub mod reassign;
pub mod restart;
pub mod resume;
pub mod unassign;
41 changes: 41 additions & 0 deletions node/src/manager/commands/deployment/reassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use std::sync::Arc;

use anyhow::Result;
use graph::prelude::NodeId;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use graphman::commands::deployment::reassign::{
load_deployment, reassign_deployment, ReassignResult,
};
use graphman::deployment::DeploymentSelector;

pub fn run(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: DeploymentSelector,
node: &NodeId,
) -> Result<()> {
let deployment = load_deployment(primary_pool.clone(), &deployment)?;

println!("Reassigning deployment {}", deployment.locator());

let reassign_result =
reassign_deployment(primary_pool, notification_sender, &deployment, node)?;

match reassign_result {
ReassignResult::EmptyResponse => {
println!(
"Deployment {} assigned to node {}",
deployment.locator(),
node
);
}
ReassignResult::CompletedWithWarnings(warnings) => {
for msg in warnings {
println!("{}", msg);
}
}
}

Ok(())
}
22 changes: 22 additions & 0 deletions node/src/manager/commands/deployment/unassign.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use std::sync::Arc;

use anyhow::Result;
use graph_store_postgres::connection_pool::ConnectionPool;
use graph_store_postgres::NotificationSender;
use graphman::commands::deployment::unassign::load_assigned_deployment;
use graphman::commands::deployment::unassign::unassign_deployment;
use graphman::deployment::DeploymentSelector;

pub fn run(
primary_pool: ConnectionPool,
notification_sender: Arc<NotificationSender>,
deployment: DeploymentSelector,
) -> Result<()> {
let assigned_deployment = load_assigned_deployment(primary_pool.clone(), &deployment)?;

println!("Unassigning deployment {}", assigned_deployment.locator());

unassign_deployment(primary_pool, notification_sender, assigned_deployment)?;

Ok(())
}
2 changes: 2 additions & 0 deletions server/graphman/src/entities/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod empty_response;
mod execution;
mod execution_id;
mod subgraph_health;
mod warning_response;

pub use self::block_hash::BlockHash;
pub use self::block_number::BlockNumber;
Expand All @@ -23,3 +24,4 @@ pub use self::empty_response::EmptyResponse;
pub use self::execution::Execution;
pub use self::execution_id::ExecutionId;
pub use self::subgraph_health::SubgraphHealth;
pub use self::warning_response::CompletedWithWarnings;
12 changes: 12 additions & 0 deletions server/graphman/src/entities/warning_response.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use async_graphql::SimpleObject;

#[derive(Clone, Debug, SimpleObject)]
pub struct CompletedWithWarnings {
pub warnings: Vec<String>,
}

impl CompletedWithWarnings {
pub fn new(warnings: Vec<String>) -> Self {
Self { warnings }
}
}
Loading
Loading