Skip to content

Commit d231b87

Browse files
committed
fix: label all log lines with level
1 parent 6ad98d2 commit d231b87

File tree

5 files changed

+25
-23
lines changed

5 files changed

+25
-23
lines changed

src/config.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,9 @@ pub fn assemble_config(paths: (std::path::PathBuf, Option<std::path::PathBuf>))
227227
let intervals = MetricIntervalsConfig::from_file(intervals_reference_file.clone());
228228
config.metrics.intervals.extend(intervals.intervals);
229229
} else {
230-
println!("Metric intervals config file not found at {intervals_reference_file:?}")
230+
println!(
231+
"ERROR: Metric intervals config file not found at {intervals_reference_file:?}"
232+
)
231233
}
232234
}
233235
if let Some(intervals) = metrics_path.map(MetricIntervalsConfig::from_file) {

src/loki_push.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ pub async fn consumer_loop(
8989
.await
9090
{
9191
Ok(res) => {
92-
println!("Sent {open} logs to loki.");
92+
println!("DEBUG: Sent {open} logs to loki.");
9393
if !res.status().is_success() {
9494
let text = res
9595
.text()
9696
.await
9797
.unwrap_or("[Unable to decode response text!]".into());
98-
println!("Received error response: {text} ");
98+
println!("ERROR: Received error response: {text} ");
9999
}
100100
}
101101
Err(res) => {
@@ -104,7 +104,7 @@ pub async fn consumer_loop(
104104
};
105105
buffer.clear();
106106
}
107-
println!("Producer dropped, consumer exiting");
107+
println!("INFO: Producer dropped, consumer exiting");
108108
}
109109

110110
#[cfg(test)]

src/main.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,15 @@ fn config_paths() -> (std::path::PathBuf, Option<std::path::PathBuf>) {
3939
let args = Args::parse();
4040
if !args.config.exists() {
4141
println!(
42-
"Error, config file not found at {:?}",
42+
"ERROR: config file not found at {:?}",
4343
args.config.as_path()
4444
);
4545
exit(1)
4646
}
4747
if let Some(ref metric_path) = args.metrics_config {
4848
if !metric_path.exists() {
4949
println!(
50-
"Error, metrics config file not found at {:?}",
50+
"ERROR: metrics config file not found at {:?}",
5151
metric_path.as_path()
5252
);
5353
exit(1)
@@ -57,17 +57,17 @@ fn config_paths() -> (std::path::PathBuf, Option<std::path::PathBuf>) {
5757
}
5858

5959
async fn run_services(config: &'static IngestorConfig) {
60-
println!("Starting log ingestor with config: \n {:?}", &config);
60+
println!("INFO: Starting log ingestor with config: \n {:?}", &config);
6161

6262
let metrics = if config.enable_metrics {
63-
println!("Starting metrics task...");
63+
println!("DEBUG: Starting metrics task...");
6464
tokio::spawn(metrics_loop(config))
6565
} else {
6666
tokio::spawn(futures::future::ready(()))
6767
};
6868

6969
if config.enable_logging {
70-
println!("Starting log ingestor task...");
70+
println!("DEBUG: Starting log ingestor task...");
7171
let (tx, mut rx) = mpsc::unbounded_channel::<LogMsg>();
7272
let producer = tokio::spawn(producer_loop(tx, config));
7373
consumer_loop(&mut rx, config).await;
@@ -84,6 +84,6 @@ async fn main() {
8484
let config = Box::leak(Box::new(assemble_config(config_paths())));
8585
rustls::crypto::aws_lc_rs::default_provider()
8686
.install_default()
87-
.expect("Failed to install rustls crypto provider");
87+
.expect("ERROR: Failed to install rustls crypto provider");
8888
run_services(config).await
8989
}

src/metrics.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -262,7 +262,7 @@ async fn watchdog_loop(
262262
break;
263263
}
264264
interval.tick().await;
265-
println!("Watchdog checking {:?}", futs.lock().await.keys());
265+
println!("DEBUG: Watchdog checking {:?}", futs.lock().await.keys());
266266
let finished: Vec<String> = {
267267
futs.lock()
268268
.await
@@ -288,7 +288,7 @@ async fn watchdog_loop(
288288
}
289289
}
290290
}
291-
println!("Watchdog done.");
291+
println!("DEBUG: Watchdog done.");
292292
}
293293
}
294294

@@ -337,30 +337,30 @@ async fn consumer_loop(rx: &mut mpsc::UnboundedReceiver<TimeSeries>, config: Met
337337
{
338338
Ok(res) => {
339339
retries = 0;
340-
println!("Sent {open} metrics to Mimir.");
340+
println!("DEBUG: Sent {open} metrics to Mimir.");
341341
if !res.status().is_success() {
342342
let text = res
343343
.text()
344344
.await
345345
.unwrap_or("[Unable to decode response text!]".into());
346-
println!("Received error response: {text} ");
346+
println!("ERROR: Received error response: {text} ");
347347
}
348348
}
349349
Err(res) => {
350350
println!("ERROR: {res:?}");
351351
if retries == 3 {
352-
println!("Maximum retry attempts exceeded, exiting.");
352+
println!("ERROR: Maximum retry attempts exceeded, exiting.");
353353
exit(0x45); // Service unavailable
354354
}
355-
println!("Retrying in 5s.");
355+
println!("DEBUG: Retrying in 5s.");
356356
sleep(Duration::from_secs(5)).await;
357357
retries += 1;
358358
}
359359
};
360360
buffer.clear();
361361
proto_encoded_buffer.clear();
362362
}
363-
println!("Producer dropped, consumer exiting");
363+
println!("INFO: Producer dropped, consumer exiting");
364364
}
365365

366366
/// Main routine to start the metrics service

src/redis_logs.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -115,11 +115,11 @@ fn check_connection(
115115
) -> Result<(), ()> {
116116
if let Ok(key_exists) = redis_conn.exists::<&str, bool>(&LOGGING_ENDPOINT[0]) {
117117
if !key_exists {
118-
println!("Logging endpoint doesn't exist, exiting.");
118+
println!("ERROR: Logging endpoint doesn't exist, exiting.");
119119
return Err(());
120120
}
121121
} else {
122-
panic!("Something went wrong checking the logs endpoint")
122+
panic!("ERROR: Something went wrong checking the logs endpoint")
123123
}
124124
setup_consumer_group(redis_conn, &config);
125125
Ok(())
@@ -128,7 +128,7 @@ pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: &'static I
128128
let mut redis_conn =
129129
create_redis_conn(&config.redis.url.full_url()).expect("Could not connect to Redis!");
130130
if check_connection(&mut redis_conn, &config).is_err() {
131-
println!("");
131+
println!("ERROR: could not correctly set up redis connection");
132132
return;
133133
}
134134

@@ -146,7 +146,7 @@ pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: &'static I
146146

147147
for record in records {
148148
if tx.send(record).is_err() {
149-
println!("Receiver dropped, stopping...");
149+
println!("INFO: Receiver dropped, stopping...");
150150
break 'main;
151151
}
152152
}
@@ -157,10 +157,10 @@ pub async fn producer_loop(tx: mpsc::UnboundedSender<LogMsg>, config: &'static I
157157
{
158158
let new_conn = create_redis_conn(&config.redis.url.full_url());
159159
if new_conn.is_err() {
160-
println!("Error reading from redis, retrying connection in 1s");
160+
println!("ERROR: Error reading from redis, retrying connection in 1s");
161161
thread::sleep(Duration::from_millis(1000));
162162
} else {
163-
println!("Reconnected");
163+
println!("INFO: Reconnected to redis");
164164
let mut conn = new_conn.unwrap();
165165
if check_connection(&mut conn, &config).is_err() {
166166
return;

0 commit comments

Comments
 (0)