Skip to content

Commit 1614cb7

Browse files
author
Devdutt Shenoi
authored
tracing: subscribe to span creation events, log important ingestion and sync steps (#1039)
* refactor: `log` ~> `tracing` (#1013) Using tracing will allow us to capture span related info in addition to the event based context that we already capture --------- Signed-off-by: Devdutt Shenoi <[email protected]> * note span creation and deletion * trace: log steps of ingest and sync --------- Signed-off-by: Devdutt Shenoi <[email protected]>
1 parent b31d46c commit 1614cb7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+301
-265
lines changed

Cargo.lock

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,6 @@ hostname = "0.4.0"
6262
http = "0.2.7"
6363
humantime-serde = "1.1"
6464
itertools = "0.13.0"
65-
log = "0.4"
6665
num_cpus = "1.15"
6766
once_cell = "1.17.1"
6867
prometheus = { version = "0.13", features = ["process"] }
@@ -105,6 +104,7 @@ path-clean = "1.0.1"
105104
prost = "0.13.3"
106105
prometheus-parse = "0.2.5"
107106
sha2 = "0.10.8"
107+
tracing = "0.1.41"
108108

109109
[build-dependencies]
110110
cargo_toml = "0.20.1"

src/alerts/target.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use chrono::Utc;
2828
use http::{header::AUTHORIZATION, HeaderMap, HeaderValue};
2929
use humantime_serde::re::humantime;
3030
use reqwest::ClientBuilder;
31+
use tracing::error;
3132

3233
use crate::utils::json;
3334

@@ -239,7 +240,7 @@ impl CallableTarget for SlackWebHook {
239240
};
240241

241242
if let Err(e) = client.post(&self.endpoint).json(&alert).send().await {
242-
log::error!("Couldn't make call to webhook, error: {}", e)
243+
error!("Couldn't make call to webhook, error: {}", e)
243244
}
244245
}
245246
}
@@ -277,7 +278,7 @@ impl CallableTarget for OtherWebHook {
277278
.headers((&self.headers).try_into().expect("valid_headers"));
278279

279280
if let Err(e) = request.body(alert).send().await {
280-
log::error!("Couldn't make call to webhook, error: {}", e)
281+
error!("Couldn't make call to webhook, error: {}", e)
281282
}
282283
}
283284
}
@@ -356,7 +357,7 @@ impl CallableTarget for AlertManager {
356357
};
357358

358359
if let Err(e) = client.post(&self.endpoint).json(&alerts).send().await {
359-
log::error!("Couldn't make call to alertmanager, error: {}", e)
360+
error!("Couldn't make call to alertmanager, error: {}", e)
360361
}
361362
}
362363
}

src/analytics.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use std::collections::HashMap;
3737
use std::sync::Mutex;
3838
use std::time::Duration;
3939
use sysinfo::System;
40+
use tracing::{error, info};
4041
use ulid::Ulid;
4142

4243
const ANALYTICS_SERVER_URL: &str = "https://analytics.parseable.io:80";
@@ -291,7 +292,7 @@ async fn build_metrics() -> HashMap<String, Value> {
291292
}
292293

293294
pub fn init_analytics_scheduler() -> anyhow::Result<()> {
294-
log::info!("Setting up schedular for anonymous user analytics");
295+
info!("Setting up schedular for anonymous user analytics");
295296

296297
let mut scheduler = AsyncScheduler::new();
297298
scheduler
@@ -302,7 +303,7 @@ pub fn init_analytics_scheduler() -> anyhow::Result<()> {
302303
.unwrap_or_else(|err| {
303304
// panicing because seperate thread
304305
// TODO: a better way to handle this
305-
log::error!("Error while sending analytics: {}", err.to_string());
306+
error!("Error while sending analytics: {}", err.to_string());
306307
panic!("{}", err.to_string());
307308
})
308309
.send()

src/catalog/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use bytes::Bytes;
3737
use chrono::{DateTime, Local, NaiveTime, Utc};
3838
use relative_path::RelativePathBuf;
3939
use std::io::Error as IOError;
40+
use tracing::{error, info};
4041
pub mod column;
4142
pub mod manifest;
4243
pub mod snapshot;
@@ -280,7 +281,7 @@ async fn create_manifest(
280281
};
281282
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
282283
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
283-
log::error!(
284+
error!(
284285
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
285286
stream_name
286287
);
@@ -360,7 +361,7 @@ pub async fn get_first_event(
360361
let manifests = meta_clone.snapshot.manifest_list;
361362
let time_partition = meta_clone.time_partition;
362363
if manifests.is_empty() {
363-
log::info!("No manifest found for stream {stream_name}");
364+
info!("No manifest found for stream {stream_name}");
364365
return Err(ObjectStorageError::Custom("No manifest found".to_string()));
365366
}
366367
let manifest = &manifests[0];
@@ -400,7 +401,7 @@ pub async fn get_first_event(
400401
handlers::http::cluster::get_ingestor_info()
401402
.await
402403
.map_err(|err| {
403-
log::error!("Fatal: failed to get ingestor info: {:?}", err);
404+
error!("Fatal: failed to get ingestor info: {:?}", err);
404405
ObjectStorageError::from(err)
405406
})?;
406407
let mut ingestors_first_event_at: Vec<String> = Vec::new();

src/event/format/json.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,10 @@ use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
2727
use itertools::Itertools;
2828
use serde_json::Value;
2929
use std::{collections::HashMap, sync::Arc};
30+
use tracing::error;
3031

3132
use super::{EventFormat, Metadata, Tags};
32-
use crate::utils::{arrow::get_field, json::flatten_json_body};
33+
use crate::utils::{arrow::get_field, json};
3334

3435
pub struct Event {
3536
pub data: Value,
@@ -48,7 +49,7 @@ impl EventFormat for Event {
4849
static_schema_flag: Option<String>,
4950
time_partition: Option<String>,
5051
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
51-
let data = flatten_json_body(self.data, None, None, None, false)?;
52+
let data = json::flatten::flatten(self.data, "_", None, None, None, false)?;
5253
let stream_schema = schema;
5354

5455
// incoming event may be a single json or a json array
@@ -224,7 +225,7 @@ fn valid_type(data_type: &DataType, value: &Value) -> bool {
224225
}
225226
DataType::Timestamp(_, _) => value.is_string() || value.is_number(),
226227
_ => {
227-
log::error!("Unsupported datatype {:?}, value {:?}", data_type, value);
228+
error!("Unsupported datatype {:?}, value {:?}", data_type, value);
228229
unreachable!()
229230
}
230231
}

src/event/format/mod.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use arrow_array::{RecordBatch, StringArray};
2424
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2525
use chrono::DateTime;
2626
use serde_json::Value;
27+
use tracing::{debug, error};
2728

2829
use crate::utils::{self, arrow::get_field};
2930

@@ -59,23 +60,17 @@ pub trait EventFormat: Sized {
5960
time_partition.clone(),
6061
)?;
6162

62-
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
63-
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
64-
};
65-
66-
if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
67-
return Err(anyhow!(
68-
"field {} is a reserved field",
69-
DEFAULT_METADATA_KEY
70-
));
71-
};
72-
73-
if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
74-
return Err(anyhow!(
75-
"field {} is a reserved field",
76-
DEFAULT_TIMESTAMP_KEY
77-
));
78-
};
63+
for reserved_field in [
64+
DEFAULT_TAGS_KEY,
65+
DEFAULT_METADATA_KEY,
66+
DEFAULT_TIMESTAMP_KEY,
67+
] {
68+
if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
69+
let msg = format!("{} is a reserved field", reserved_field);
70+
error!("{msg}");
71+
return Err(anyhow!(msg));
72+
}
73+
}
7974

8075
// add the p_timestamp field to the event schema to the 0th index
8176
schema.insert(
@@ -100,7 +95,9 @@ pub trait EventFormat: Sized {
10095
// prepare the record batch and new fields to be added
10196
let mut new_schema = Arc::new(Schema::new(schema));
10297
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
103-
return Err(anyhow!("Schema mismatch"));
98+
let msg = "Schema mismatch";
99+
error!("{msg}");
100+
return Err(anyhow!(msg));
104101
}
105102
new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
106103
let rb = Self::decode(data, new_schema.clone())?;
@@ -269,6 +266,10 @@ pub fn update_data_type_to_datetime(
269266
if let Value::Object(map) = &value {
270267
if let Some(Value::String(s)) = map.get(field.name()) {
271268
if DateTime::parse_from_rfc3339(s).is_ok() {
269+
debug!(
270+
"Field type updated to timestamp from string: {}",
271+
field.name()
272+
);
272273
// Update the field's data type to Timestamp
273274
return Field::new(
274275
field.name().clone(),

src/event/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use arrow_array::RecordBatch;
2424
use arrow_schema::{Field, Fields, Schema};
2525
use itertools::Itertools;
2626
use std::sync::Arc;
27+
use tracing::{error, instrument};
2728

2829
use self::error::EventError;
2930
pub use self::writer::STREAM_WRITERS;
@@ -35,7 +36,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
3536
pub const DEFAULT_TAGS_KEY: &str = "p_tags";
3637
pub const DEFAULT_METADATA_KEY: &str = "p_metadata";
3738

38-
#[derive(Clone)]
39+
#[derive(Clone, Debug)]
3940
pub struct Event {
4041
pub stream_name: String,
4142
pub rb: RecordBatch,
@@ -50,6 +51,7 @@ pub struct Event {
5051

5152
// Events holds the schema related to a each event for a single log stream
5253
impl Event {
54+
#[instrument(level = "trace")]
5355
pub async fn process(&self) -> Result<(), EventError> {
5456
let mut key = get_schema_key(&self.rb.schema().fields);
5557
if self.time_partition.is_some() {
@@ -93,7 +95,7 @@ impl Event {
9395
.check_alerts(&self.stream_name, &self.rb)
9496
.await
9597
{
96-
log::error!("Error checking for alerts. {:?}", e);
98+
error!("Error checking for alerts. {:?}", e);
9799
}
98100

99101
Ok(())
@@ -119,6 +121,7 @@ impl Event {
119121

120122
// event process all events after the 1st event. Concatenates record batches
121123
// and puts them in memory store for each event.
124+
#[instrument(level = "trace")]
122125
fn process_event(
123126
stream_name: &str,
124127
schema_key: &str,

src/event/writer/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ use chrono::NaiveDateTime;
3838
use chrono::Utc;
3939
use derive_more::{Deref, DerefMut};
4040
use once_cell::sync::Lazy;
41+
use tracing::debug;
4142

4243
pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);
4344

@@ -124,6 +125,8 @@ impl WriterTable {
124125
)?;
125126
}
126127
};
128+
debug!("Successful append to local");
129+
127130
Ok(())
128131
}
129132

src/handlers/airplane.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use serde_json::json;
2626
use std::net::SocketAddr;
2727
use std::time::Instant;
2828
use tonic::codec::CompressionEncoding;
29+
use tracing::{error, info};
2930

3031
use futures_util::{Future, TryFutureExt};
3132

@@ -135,7 +136,7 @@ impl FlightService for AirServiceImpl {
135136

136137
let ticket = get_query_from_ticket(&req)?;
137138

138-
log::info!("query requested to airplane: {:?}", ticket);
139+
info!("query requested to airplane: {:?}", ticket);
139140

140141
// get the query session_state
141142
let session_state = QUERY_SESSION.state();
@@ -145,7 +146,7 @@ impl FlightService for AirServiceImpl {
145146
.create_logical_plan(&ticket.query)
146147
.await
147148
.map_err(|err| {
148-
log::error!("Datafusion Error: Failed to create logical plan: {}", err);
149+
error!("Datafusion Error: Failed to create logical plan: {}", err);
149150
Status::internal("Failed to create logical plan")
150151
})?;
151152

@@ -269,7 +270,7 @@ impl FlightService for AirServiceImpl {
269270
)
270271
.await
271272
{
272-
log::error!("{}", err);
273+
error!("{}", err);
273274
};
274275

275276
/*

0 commit comments

Comments
 (0)