|
use crate::driver::{
    constants::COLOCATE_CONTAINER_NAME, context::GroupContext, resource::AllocatedVm,
    universal_vm::UNIVERSAL_VMS_DIR,
};
use anyhow::{Context, Result};
use serde::Deserialize;
use slog::{debug, error, info, warn};
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::time::Duration;
use std::{collections::HashMap, net::Ipv6Addr};
use tokio::{
    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
    net::TcpSocket,
    runtime::Runtime,
};
use walkdir::WalkDir;
| 18 | + |
/// Name of the long-running driver task that streams universal-VM journald logs.
pub(crate) const UVMS_LOGS_STREAM_TASK_NAME: &str = "uvms_logs_stream";

/// Delay before restarting a journald stream after it errors or completes.
const RETRY_DELAY_JOURNALD_STREAM: Duration = Duration::from_secs(5);
/// Interval between filesystem re-scans for newly deployed universal VMs.
const RETRY_DELAY_DISCOVER_UVMS: Duration = Duration::from_secs(5);
| 23 | + |
| 24 | +pub(crate) fn uvms_logs_stream_task(group_ctx: GroupContext) -> () { |
| 25 | + let logger = group_ctx.logger().clone(); |
| 26 | + debug!(logger, ">>> {UVMS_LOGS_STREAM_TASK_NAME}"); |
| 27 | + let rt: Runtime = tokio::runtime::Builder::new_multi_thread() |
| 28 | + .worker_threads(1) |
| 29 | + .max_blocking_threads(1) |
| 30 | + .enable_all() |
| 31 | + .build() |
| 32 | + .unwrap_or_else(|err| panic!("Could not create tokio runtime: {err}")); |
| 33 | + let root_search_dir = { |
| 34 | + let root_env = group_ctx |
| 35 | + .clone() |
| 36 | + .get_root_env() |
| 37 | + .expect("root_env should already exist"); |
| 38 | + let base_path = root_env.base_path(); |
| 39 | + base_path |
| 40 | + .parent() |
| 41 | + .expect("root_env dir should have a parent dir") |
| 42 | + .to_path_buf() |
| 43 | + }; |
| 44 | + let mut streamed_uvms: HashMap<String, Ipv6Addr> = HashMap::new(); |
| 45 | + let mut skipped_uvms: BTreeSet<String> = BTreeSet::new(); |
| 46 | + loop { |
| 47 | + match discover_uvms(root_search_dir.clone()) { |
| 48 | + Ok(discovered_uvms) => { |
| 49 | + for (key, value) in discovered_uvms { |
| 50 | + if skipped_uvms.contains(&key) { |
| 51 | + continue; |
| 52 | + } |
| 53 | + |
| 54 | + let key_match = group_ctx |
| 55 | + .exclude_logs |
| 56 | + .iter() |
| 57 | + .any(|pattern| pattern.is_match(&key)); |
| 58 | + |
| 59 | + if key_match { |
| 60 | + debug!( |
| 61 | + logger, |
| 62 | + "Skipping journald streaming of [uvm={key}] because it was excluded by the `--exclude-logs` pattern" |
| 63 | + ); |
| 64 | + skipped_uvms.insert(key); |
| 65 | + continue; |
| 66 | + } |
| 67 | + |
| 68 | + streamed_uvms.entry(key.clone()).or_insert_with(|| { |
| 69 | + let logger = logger.clone(); |
| 70 | + info!( |
| 71 | + logger, |
| 72 | + "Streaming Journald for newly discovered [uvm={key}] with ipv6={value}" |
| 73 | + ); |
| 74 | + // The task starts, but the handle is never joined. |
| 75 | + rt.spawn(stream_journald_with_retries(logger, key.clone(), value)); |
| 76 | + value |
| 77 | + }); |
| 78 | + } |
| 79 | + } |
| 80 | + Err(err) => { |
| 81 | + warn!(logger, "Discovering deployed uvms failed with err:{err}"); |
| 82 | + } |
| 83 | + } |
| 84 | + std::thread::sleep(RETRY_DELAY_DISCOVER_UVMS); |
| 85 | + } |
| 86 | +} |
| 87 | + |
/// One journald entry as JSON-decoded from the journal gateway stream; only
/// the fields this module displays are deserialized.
#[derive(Debug, Deserialize)]
struct JournalRecord {
    // Opaque journald cursor; stored as the resume checkpoint.
    #[serde(rename = "__CURSOR")]
    cursor: String,
    // The log line itself.
    #[serde(rename = "MESSAGE")]
    message: String,
    // Systemd unit that produced the entry, when present.
    #[serde(rename = "_SYSTEMD_UNIT")]
    system_unit: Option<String>,
    // Set for container-originated log entries.
    #[serde(rename = "CONTAINER_NAME")]
    container_name: Option<String>,
    // Name of the producing process (journald `_COMM` field).
    #[serde(rename = "_COMM")]
    comm: Option<String>,
}
| 101 | + |
| 102 | +impl std::fmt::Display for JournalRecord { |
| 103 | + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { |
| 104 | + if let Some(ref container) = self.container_name |
| 105 | + && container == COLOCATE_CONTAINER_NAME |
| 106 | + { |
| 107 | + return write!(f, "TEST_LOG: {}", self.message); |
| 108 | + } |
| 109 | + let mut display = format!("message: \"{}\"", self.message); |
| 110 | + if let Some(x) = &self.system_unit { |
| 111 | + display += format!(", system_unit: \"{x}\"").as_str() |
| 112 | + } |
| 113 | + if let Some(x) = &self.container_name { |
| 114 | + display += format!(", container_name: \"{x}\"").as_str() |
| 115 | + } |
| 116 | + if let Some(x) = &self.comm { |
| 117 | + display += format!(", comm: \"{x}\"").as_str() |
| 118 | + } |
| 119 | + write!(f, "JournalRecord {{{display}}}") |
| 120 | + } |
| 121 | +} |
| 122 | + |
| 123 | +fn discover_uvms(root_path: PathBuf) -> Result<HashMap<String, Ipv6Addr>> { |
| 124 | + let mut uvms: HashMap<String, Ipv6Addr> = HashMap::new(); |
| 125 | + for entry in WalkDir::new(root_path) |
| 126 | + .into_iter() |
| 127 | + .filter_map(Result::ok) |
| 128 | + .filter(|e| { |
| 129 | + e.path() |
| 130 | + .to_str() |
| 131 | + .map(|p| p.contains(UNIVERSAL_VMS_DIR)) |
| 132 | + .unwrap_or(false) |
| 133 | + }) |
| 134 | + .filter(|e| { |
| 135 | + let file_name = String::from(e.file_name().to_string_lossy()); |
| 136 | + e.file_type().is_file() && file_name == "vm.json" |
| 137 | + }) |
| 138 | + .map(|e| e.path().to_owned()) |
| 139 | + { |
| 140 | + let file = |
| 141 | + std::fs::File::open(&entry).with_context(|| format!("Could not open: {:?}", &entry))?; |
| 142 | + let vm: AllocatedVm = serde_json::from_reader(file) |
| 143 | + .with_context(|| format!("{:?}: Could not read json.", &entry))?; |
| 144 | + uvms.insert(vm.name.to_string(), vm.ipv6); |
| 145 | + } |
| 146 | + Ok(uvms) |
| 147 | +} |
| 148 | + |
| 149 | +async fn stream_journald_with_retries(logger: slog::Logger, uvm_name: String, ipv6: Ipv6Addr) { |
| 150 | + // Start streaming Journald from the very beginning, which corresponds to the cursor="". |
| 151 | + let mut cursor = Cursor::Start; |
| 152 | + loop { |
| 153 | + // In normal scenarios, i.e. without errors/interrupts, the function below should never return. |
| 154 | + // In case it returns unexpectedly, we restart reading logs from the checkpoint cursor. |
| 155 | + let (cursor_next, result) = |
| 156 | + stream_journald_from_cursor(uvm_name.clone(), ipv6, cursor).await; |
| 157 | + cursor = cursor_next; |
| 158 | + if let Err(err) = result { |
| 159 | + error!( |
| 160 | + logger, |
| 161 | + "Streaming Journald for uvm={uvm_name} with ipv6={ipv6} failed with: {err}" |
| 162 | + ); |
| 163 | + } |
| 164 | + // Should we stop reading Journald here? |
| 165 | + warn!( |
| 166 | + logger, |
| 167 | + "All entries of Journald are read to completion. Streaming Journald will start again in {} sec ...", |
| 168 | + RETRY_DELAY_JOURNALD_STREAM.as_secs() |
| 169 | + ); |
| 170 | + tokio::time::sleep(RETRY_DELAY_JOURNALD_STREAM).await; |
| 171 | + } |
| 172 | +} |
| 173 | + |
/// Journald read position: either the very beginning of the journal, or a
/// concrete checkpoint cursor previously returned by journald.
enum Cursor {
    Start,
    Position(String),
}

impl std::fmt::Display for Cursor {
    /// Renders the raw cursor string; `Start` renders as the empty string,
    /// which journald interprets as "stream from the beginning".
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let repr = match self {
            Cursor::Start => "",
            Cursor::Position(pos) => pos.as_str(),
        };
        f.write_str(repr)
    }
}
| 187 | + |
/// Like `?`, but for functions that return a `(T, Result<_>)` pair: on `Ok`
/// it evaluates to the inner value; on `Err` it returns `($val1, Err(e.into()))`
/// from the enclosing function, so the caller keeps the checkpoint in `$val1`.
macro_rules! unwrap_or_return {
    // `expr_2021` is the Rust-2024 expression fragment specifier.
    ( $val1:expr_2021, $val2:expr_2021 ) => {
        match $val2 {
            Ok(x) => x,
            Err(x) => return ($val1, Err(x.into())),
        }
    };
}
| 196 | + |
/// Connects to the journal gateway on `[ipv6]:19531`, requests a follow-mode
/// JSON entry stream starting at `cursor`, and prints each decoded record to
/// stdout prefixed with the VM name.
///
/// Always returns the last successfully processed cursor alongside the
/// result, so the caller can resume streaming from that checkpoint after a
/// failure.
async fn stream_journald_from_cursor(
    uvm_name: String,
    ipv6: Ipv6Addr,
    mut cursor: Cursor,
) -> (Cursor, anyhow::Result<()>) {
    // Port 19531 — presumably systemd-journal-gatewayd's default port.
    let socket_addr = std::net::SocketAddr::new(ipv6.into(), 19531);
    let socket = unwrap_or_return!(cursor, TcpSocket::new_v6());
    let mut stream = unwrap_or_return!(cursor, socket.connect(socket_addr).await);
    // Hand-rolled HTTP/1.1 request; `?follow` keeps the connection open so new
    // entries keep arriving.
    // NOTE(review): header lines end in bare `\n` and the header block below is
    // terminated with `\n\r\n\r` rather than standard `\r\n\r\n`; the server
    // evidently tolerates this — confirm before "fixing" the wire bytes.
    unwrap_or_return!(
        cursor,
        stream.write_all(b"GET /entries?follow HTTP/1.1\n").await
    );
    unwrap_or_return!(
        cursor,
        stream.write_all(b"Accept: application/json\n").await
    );
    unwrap_or_return!(
        cursor,
        stream
            .write_all(format!("Host: {ipv6}:19531\n").as_bytes())
            .await
    );
    // An empty cursor (Cursor::Start displays as "") requests the journal from
    // its beginning.
    unwrap_or_return!(
        cursor,
        stream
            .write_all(format!("Range: entries={cursor}\n\r\n\r").as_bytes())
            .await
    );
    let buf_reader = BufReader::new(stream);
    let mut lines = buf_reader.lines();
    // Each journal entry arrives as one JSON line; HTTP response headers and
    // any other non-JSON lines simply fail to parse and are skipped.
    while let Some(line) = unwrap_or_return!(cursor, lines.next_line().await) {
        let record_result: Result<JournalRecord, serde_json::Error> = serde_json::from_str(&line);
        if let Ok(record) = record_result {
            println!("[uvm={uvm_name}] {record}");
            // We update the cursor value, so that in case function errors, journald entries can be streamed from this checkpoint.
            cursor = Cursor::Position(record.cursor);
        }
    }
    // The server ended the stream without a transport error.
    (cursor, Ok(()))
}
0 commit comments