diff --git a/Cargo.lock b/Cargo.lock index ce8de699bbb..c3113f58701 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6904,7 +6904,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b3bab093bdd303a1240bb99b8aba8ea8a69ee19d34c9e2ef9594e708a4878820" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", "windows-result", "windows-strings", ] @@ -6915,7 +6915,7 @@ version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -6924,7 +6924,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" dependencies = [ - "windows-link 0.1.1", + "windows-link 0.1.3", ] [[package]] @@ -7000,7 +7000,7 @@ version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ - "windows-link", + "windows-link 0.1.3", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/core/src/subgraph/registrar.rs b/core/src/subgraph/registrar.rs index 7c56e5dd583..7fd7bf7c0f2 100644 --- a/core/src/subgraph/registrar.rs +++ b/core/src/subgraph/registrar.rs @@ -173,24 +173,25 @@ where anyhow!("Failed to get subgraph assignment entity: {}", e) }) .map(|assigned| -> Box + Send> { + let logger = logger.new(o!("subgraph_id" => deployment.hash.to_string(), "node_id" => node_id.to_string())); if let Some((assigned,is_paused)) = assigned { if assigned == node_id { if is_paused{ // Subgraph is paused, so we don't start it - debug!(logger, "Deployment assignee is this node, but it is paused, so we don't start it"; "assigned_to" => assigned, "node_id" => &node_id,"paused" => is_paused); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "paused" => is_paused, "action" => "ignore"); return Box::new(stream::empty()); } // Start subgraph on this node - debug!(logger, "Deployment assignee is this node, broadcasting add event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is this node"; "assigned_to" => assigned, "action" => "add"); Box::new(stream::once(Ok(AssignmentEvent::Add { deployment, node_id: node_id.clone(), }))) } else { // Ensure it is removed from this node - debug!(logger, "Deployment assignee is not this node, broadcasting remove event"; "assigned_to" => assigned, "node_id" => &node_id); + debug!(logger, "Deployment assignee is not this node"; "assigned_to" => assigned, "action" => "remove"); Box::new(stream::once(Ok(AssignmentEvent::Remove { deployment, node_id: node_id.clone(), @@ -198,7 +199,7 @@ where } } else { // Was added/updated, but is now gone. - debug!(logger, "Deployment has not assignee, we will get a separate remove event later"; "node_id" => &node_id); + debug!(logger, "Deployment assignee not found in database"; "action" => "ignore"); Box::new(stream::empty()) } }) diff --git a/node/src/launcher.rs b/node/src/launcher.rs index 4441131a52e..550da346861 100644 --- a/node/src/launcher.rs +++ b/node/src/launcher.rs @@ -399,7 +399,7 @@ pub async fn run( // Obtain metrics server port let metrics_port = opt.metrics_port; - info!(logger, "Starting up"); + info!(logger, "Starting up"; "node_id" => &node_id); // Set up metrics let (prometheus_registry, metrics_registry) = setup_metrics(&logger); diff --git a/store/postgres/src/notification_listener.rs b/store/postgres/src/notification_listener.rs index ecb7486daf2..583ef91479e 100644 --- a/store/postgres/src/notification_listener.rs +++ b/store/postgres/src/notification_listener.rs @@ -284,6 +284,7 @@ impl NotificationListener { } } } + warn!(logger, "Listener dropped. Terminating listener thread"); })) .unwrap_or_else(|_| std::process::exit(1)) }); diff --git a/store/postgres/src/store_events.rs b/store/postgres/src/store_events.rs index 83b8e3b069b..b9da04f30d7 100644 --- a/store/postgres/src/store_events.rs +++ b/store/postgres/src/store_events.rs @@ -92,15 +92,21 @@ pub struct SubscriptionManager { /// Keep the notification listener alive listener: StoreEventListener, + + logger: Logger, } impl SubscriptionManager { pub fn new(logger: Logger, postgres_url: String, registry: Arc) -> Self { - let (listener, store_events) = StoreEventListener::new(logger, postgres_url, registry); + let logger = logger.new(o!("component" => "StoreEventListener")); + + let (listener, store_events) = + StoreEventListener::new(logger.cheap_clone(), postgres_url, registry); let mut manager = SubscriptionManager { subscriptions: Arc::new(RwLock::new(HashMap::new())), listener, + logger, }; // Deal with store subscriptions @@ -112,6 +118,32 @@ impl SubscriptionManager { manager } + async fn broadcast_event( + logger: &Logger, + subscriptions: &Arc>>>>, + event: StoreEvent, + ) { + let event = Arc::new(event); + + // Send to `subscriptions`. + { + let senders = subscriptions.read().unwrap().clone(); + + // Write change to all matching subscription streams; remove subscriptions + // whose receiving end has been dropped + for (id, sender) in senders { + if let Err(e) = sender.send(event.cheap_clone()).await { + error!( + logger, + "Failed to send store event to subscriber {}: {}", id, e + ); + // Receiver was dropped + subscriptions.write().unwrap().remove(&id); + } + } + } + } + /// Receive store events from Postgres and send them to all active /// subscriptions. Detect stale subscriptions in the process and /// close them. @@ -121,24 +153,22 @@ impl SubscriptionManager { ) { let subscriptions = self.subscriptions.cheap_clone(); let mut store_events = store_events.compat(); + let logger = self.logger.cheap_clone(); // This channel is constantly receiving things and there are locks involved, // so it's best to use a blocking task. graph::spawn_blocking(async move { - while let Some(Ok(event)) = store_events.next().await { - let event = Arc::new(event); - - // Send to `subscriptions`. - { - let senders = subscriptions.read().unwrap().clone(); - - // Write change to all matching subscription streams; remove subscriptions - // whose receiving end has been dropped - for (id, sender) in senders { - if sender.send(event.cheap_clone()).await.is_err() { - // Receiver was dropped - subscriptions.write().unwrap().remove(&id); - } + loop { + match store_events.next().await { + Some(Ok(event)) => { + Self::broadcast_event(&logger, &subscriptions, event).await; + } + Some(Err(_)) => { + error!(logger, "Error receiving store event"); + } + None => { + error!(logger, "Store event stream ended"); + break; } } } @@ -147,6 +177,7 @@ impl SubscriptionManager { fn periodically_clean_up_stale_subscriptions(&self) { let subscriptions = self.subscriptions.cheap_clone(); + let logger = self.logger.cheap_clone(); // Clean up stale subscriptions every 5s graph::spawn(async move { @@ -169,6 +200,7 @@ impl SubscriptionManager { // Remove all stale subscriptions for id in stale_ids { + warn!(logger, "Removing stale subscription {}", id); subscriptions.remove(&id); } }