Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions core/src/subgraph/registrar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,32 +173,33 @@ where
anyhow!("Failed to get subgraph assignment entity: {}", e)
})
.map(|assigned| -> Box<dyn Stream<Item = _, Error = _> + Send> {
let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string()));
if let Some((assigned,is_paused)) = assigned {
if assigned == node_id {

if is_paused{
// Subgraph is paused, so we don't start it
debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused);
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore");
return Box::new(stream::empty());
}

// Start subgraph on this node
debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id);
debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add");
Box::new(stream::once(Ok(AssignmentEvent::Add {
deployment,
node_id: node_id.clone(),
})))
} else {
// Ensure it is removed from this node
debug!(logger, "Deployment assignee is not this node, broadcasting remove event"; "assigned_to" => assigned, "node_id" => &node_id);
debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove");
Box::new(stream::once(Ok(AssignmentEvent::Remove {
deployment,
node_id: node_id.clone(),
})))
}
} else {
// Was added/updated, but is now gone.
debug!(logger, "Deployment has not assignee, we will get a separate remove event later"; "node_id" => &node_id);
debug!(logger, "Deployment assignee not found in database"; "action" => "ignore");
Box::new(stream::empty())
}
})
Expand Down
2 changes: 1 addition & 1 deletion node/src/launcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ pub async fn run(
// Obtain metrics server port
let metrics_port = opt.metrics_port;

info!(logger, "Starting up");
info!(logger, "Starting up"; "node_id" => &node_id);

// Set up metrics
let (prometheus_registry, metrics_registry) = setup_metrics(&logger);
Expand Down
1 change: 1 addition & 0 deletions store/postgres/src/notification_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ impl NotificationListener {
}
}
}
warn!(logger, "Listener dropped. Terminating listener thread");
}))
.unwrap_or_else(|_| std::process::exit(1))
});
Expand Down
62 changes: 47 additions & 15 deletions store/postgres/src/store_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,21 @@ pub struct SubscriptionManager {

/// Keep the notification listener alive
listener: StoreEventListener,

logger: Logger,
}

impl SubscriptionManager {
pub fn new(logger: Logger, postgres_url: String, registry: Arc<MetricsRegistry>) -> Self {
let (listener, store_events) = StoreEventListener::new(logger, postgres_url, registry);
let logger = logger.new(o!("component" => "StoreEventListener"));

let (listener, store_events) =
StoreEventListener::new(logger.cheap_clone(), postgres_url, registry);

let mut manager = SubscriptionManager {
subscriptions: Arc::new(RwLock::new(HashMap::new())),
listener,
logger,
};

// Deal with store subscriptions
Expand All @@ -112,6 +118,32 @@ impl SubscriptionManager {
manager
}

async fn broadcast_event(
logger: &Logger,
subscriptions: &Arc<RwLock<HashMap<usize, Sender<Arc<StoreEvent>>>>>,
event: StoreEvent,
) {
let event = Arc::new(event);

// Send to `subscriptions`.
{
let senders = subscriptions.read().unwrap().clone();

// Write change to all matching subscription streams; remove subscriptions
// whose receiving end has been dropped
for (id, sender) in senders {
if let Err(e) = sender.send(event.cheap_clone()).await {
error!(
logger,
"Failed to send store event to subscriber {}: {}", id, e
);
// Receiver was dropped
subscriptions.write().unwrap().remove(&id);
}
}
}
}

/// Receive store events from Postgres and send them to all active
/// subscriptions. Detect stale subscriptions in the process and
/// close them.
Expand All @@ -121,24 +153,22 @@ impl SubscriptionManager {
) {
let subscriptions = self.subscriptions.cheap_clone();
let mut store_events = store_events.compat();
let logger = self.logger.cheap_clone();

// This channel is constantly receiving things and there are locks involved,
// so it's best to use a blocking task.
graph::spawn_blocking(async move {
while let Some(Ok(event)) = store_events.next().await {
let event = Arc::new(event);

// Send to `subscriptions`.
{
let senders = subscriptions.read().unwrap().clone();

// Write change to all matching subscription streams; remove subscriptions
// whose receiving end has been dropped
for (id, sender) in senders {
if sender.send(event.cheap_clone()).await.is_err() {
// Receiver was dropped
subscriptions.write().unwrap().remove(&id);
}
loop {
match store_events.next().await {
Some(Ok(event)) => {
Self::broadcast_event(&logger, &subscriptions, event).await;
}
Some(Err(_)) => {
error!(logger, "Error receiving store event");
}
None => {
error!(logger, "Store event stream ended");
break;
}
}
}
Expand All @@ -147,6 +177,7 @@ impl SubscriptionManager {

fn periodically_clean_up_stale_subscriptions(&self) {
let subscriptions = self.subscriptions.cheap_clone();
let logger = self.logger.cheap_clone();

// Clean up stale subscriptions every 5s
graph::spawn(async move {
Expand All @@ -169,6 +200,7 @@ impl SubscriptionManager {

// Remove all stale subscriptions
for id in stale_ids {
warn!(logger, "Removing stale subscription {}", id);
subscriptions.remove(&id);
}
}
Expand Down
Loading