Commit a551bf2

Devdutt Shenoi committed

trace: log steps of ingest and sync
1 parent 93949fd commit a551bf2
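
This commit threads the `tracing` crate's macros (trace!, debug!, error!, warn!) and the #[instrument] attribute through the ingest and sync paths. Trace- and debug-level output is invisible unless a subscriber is installed with a filter that admits those levels. A minimal sketch of such a setup, assuming the tracing-subscriber crate with its env-filter feature enabled (not part of this diff):

    use tracing_subscriber::EnvFilter;

    fn main() {
        // Print spans and events down to TRACE for this crate only;
        // without a subscriber, the #[instrument] spans added in this
        // commit are simply dropped.
        tracing_subscriber::fmt()
            .with_env_filter(EnvFilter::new("parseable=trace"))
            .init();
    }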

File tree

13 files changed (+69, -47 lines)


src/event/format/json.rs

Lines changed: 2 additions & 2 deletions
@@ -30,7 +30,7 @@ use std::{collections::HashMap, sync::Arc};
 use tracing::error;

 use super::{EventFormat, Metadata, Tags};
-use crate::utils::{arrow::get_field, json::flatten_json_body};
+use crate::utils::{arrow::get_field, json};

 pub struct Event {
     pub data: Value,
@@ -49,7 +49,7 @@ impl EventFormat for Event {
         static_schema_flag: Option<String>,
         time_partition: Option<String>,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data, None, None, None, false)?;
+        let data = json::flatten::flatten(self.data, "_", None, None, None, false)?;
         let stream_schema = schema;

         // incoming event may be a single json or a json array
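
The call site now names the key separator explicitly ("_") instead of it being baked into flatten_json_body. A minimal sketch of what such a flattener does (hypothetical and simplified; the real json::flatten::flatten takes additional time-partition and validation arguments that are dropped here):

    use serde_json::{Map, Value};

    // Flatten nested objects into a single map whose keys join the
    // nesting path with `separator`: {"a": {"b": 1}} -> {"a_b": 1}.
    fn flatten(value: Value, separator: &str) -> Value {
        fn walk(prefix: &str, value: Value, sep: &str, out: &mut Map<String, Value>) {
            match value {
                Value::Object(map) => {
                    for (key, inner) in map {
                        let key = if prefix.is_empty() {
                            key
                        } else {
                            format!("{prefix}{sep}{key}")
                        };
                        walk(&key, inner, sep, out);
                    }
                }
                other => {
                    out.insert(prefix.to_owned(), other);
                }
            }
        }
        let mut out = Map::new();
        walk("", value, separator, &mut out);
        Value::Object(out)
    }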

src/event/format/mod.rs

Lines changed: 19 additions & 18 deletions
@@ -24,6 +24,7 @@ use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::DateTime;
 use serde_json::Value;
+use tracing::{debug, error};

 use crate::utils::{self, arrow::get_field};

@@ -59,23 +60,17 @@ pub trait EventFormat: Sized {
             time_partition.clone(),
         )?;

-        if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
-            return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
-        };
-
-        if get_field(&schema, DEFAULT_METADATA_KEY).is_some() {
-            return Err(anyhow!(
-                "field {} is a reserved field",
-                DEFAULT_METADATA_KEY
-            ));
-        };
-
-        if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
-            return Err(anyhow!(
-                "field {} is a reserved field",
-                DEFAULT_TIMESTAMP_KEY
-            ));
-        };
+        for reserved_field in [
+            DEFAULT_TAGS_KEY,
+            DEFAULT_METADATA_KEY,
+            DEFAULT_TIMESTAMP_KEY,
+        ] {
+            if get_field(&schema, reserved_field).is_some() {
+                let msg = format!("{} is a reserved field", reserved_field);
+                error!("{msg}");
+                return Err(anyhow!(msg));
+            }
+        }

         // add the p_timestamp field to the event schema to the 0th index
         schema.insert(
@@ -100,7 +95,9 @@ pub trait EventFormat: Sized {
         // prepare the record batch and new fields to be added
         let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
-            return Err(anyhow!("Schema mismatch"));
+            let msg = "Schema mismatch";
+            error!("{msg}");
+            return Err(anyhow!(msg));
         }
         new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
         let rb = Self::decode(data, new_schema.clone())?;
@@ -269,6 +266,10 @@ pub fn update_data_type_to_datetime(
     if let Value::Object(map) = &value {
         if let Some(Value::String(s)) = map.get(field.name()) {
             if DateTime::parse_from_rfc3339(s).is_ok() {
+                debug!(
+                    "Field type updated to timestamp from string: {}",
+                    field.name()
+                );
                 // Update the field's data type to Timestamp
                 return Field::new(
                     field.name().clone(),
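
The new debug! line fires just after the RFC 3339 check that decides whether a string column is promoted to a timestamp. For reference, the guard it annotates, as a standalone sketch of the chrono API:

    use chrono::DateTime;

    fn is_rfc3339(s: &str) -> bool {
        // "2024-05-01T12:30:00Z" parses; a bare date like "2024-05-01" does not.
        DateTime::parse_from_rfc3339(s).is_ok()
    }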

src/event/mod.rs

Lines changed: 4 additions & 2 deletions
@@ -24,7 +24,7 @@ use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
 use std::sync::Arc;
-use tracing::error;
+use tracing::{error, instrument};

 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
@@ -36,7 +36,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
 pub const DEFAULT_METADATA_KEY: &str = "p_metadata";

-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Event {
     pub stream_name: String,
     pub rb: RecordBatch,
@@ -51,6 +51,7 @@ pub struct Event {

 // Events holds the schema related to each event for a single log stream
 impl Event {
+    #[instrument(level = "trace")]
     pub async fn process(&self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
@@ -120,6 +121,7 @@ impl Event {

     // event process all events after the 1st event. Concatenates record batches
     // and puts them in memory store for each event.
+    #[instrument(level = "trace")]
     fn process_event(
         stream_name: &str,
         schema_key: &str,
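
The Debug derive on Event is what lets #[instrument] accept &self here: by default the attribute records every argument of the function through its Debug impl. Roughly what the attribute does, as a hand-written sketch using tracing's documented span API (hypothetical function, not the repo's code):

    use tracing::{trace_span, Instrument};

    // Equivalent in spirit to #[instrument(level = "trace")] on an async fn:
    // open a trace-level span named after the function, record arguments as
    // fields via Debug (`?arg`), and keep the span attached across awaits.
    async fn process_sketch(stream_name: &str) -> Result<(), std::io::Error> {
        async {
            // ... original body ...
            Ok(())
        }
        .instrument(trace_span!("process", ?stream_name))
        .await
    }

The same requirement explains the #[derive(Debug)] added to BlobStore, LocalFS, and S3 further down: sync(&self, ...) gains #[instrument(level = "debug")], so the ObjectStorage trait now demands std::fmt::Debug.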

src/event/writer/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -38,6 +38,7 @@ use chrono::NaiveDateTime;
 use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
+use tracing::debug;

 pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);

@@ -124,6 +125,8 @@ impl WriterTable {
                 )?;
             }
         };
+        debug!("Successful append to local");
+
         Ok(())
     }


src/handlers/http/ingest.rs

Lines changed: 6 additions & 0 deletions
@@ -43,10 +43,12 @@ use http::StatusCode;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
+use tracing::{debug, error, instrument, trace};

 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
+#[instrument(level = "trace")]
 pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     if let Some((_, stream_name)) = req
         .headers()
@@ -66,6 +68,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
         flatten_and_push_logs(req, body, stream_name).await?;
         Ok(HttpResponse::Ok().finish())
     } else {
+        error!("Ingestion request doesn't specify stream name");
         Err(PostError::Header(ParseHeaderError::MissingStreamName))
     }
 }
@@ -143,6 +146,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
 // Handler for POST /api/v1/logstream/{logstream}
 // only ingests events into the specified logstream
 // fails if the logstream does not exist
+#[instrument(level = "trace")]
 pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let internal_stream_names = STREAM_INFO.list_internal_streams();
@@ -198,6 +202,7 @@ pub async fn create_stream_if_not_exists(
     let mut stream_exists = false;
     if STREAM_INFO.stream_exists(stream_name) {
         stream_exists = true;
+        trace!("Stream: {stream_name} already exists");
         return Ok(stream_exists);
     }

@@ -220,6 +225,7 @@ pub async fn create_stream_if_not_exists(
         stream_type,
     )
     .await?;
+    debug!("Stream: {stream_name} created");

     Ok(stream_exists)
 }

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 6 additions & 1 deletion
@@ -26,6 +26,7 @@ use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use serde_json::Value;
+use tracing::{debug, instrument, warn};

 use crate::{
     event::{
@@ -56,7 +57,7 @@ pub async fn flatten_and_push_logs(
             json = otel::flatten_otel_logs(&body);
         }
         _ => {
-            log::warn!("Unknown log source: {}", log_source);
+            warn!("Unknown log source: {}", log_source);
             push_logs(stream_name.to_string(), req.clone(), body).await?;
         }
     }
@@ -70,6 +71,7 @@ pub async fn flatten_and_push_logs(
     Ok(())
 }

+#[instrument(level = "trace")]
 pub async fn push_logs(
     stream_name: String,
     req: HttpRequest,
@@ -172,10 +174,13 @@ pub async fn push_logs(
         }
     }

+    debug!("Ingestion successful on stream: {stream_name}");
+
     Ok(())
 }

 #[allow(clippy::too_many_arguments)]
+#[instrument(level = "trace")]
 pub async fn create_process_record_batch(
     stream_name: String,
     req: HttpRequest,
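
One incidental change here: the log::warn! call becomes tracing's warn!, moving this path off the log crate's macros. The two are call-compatible for plain format strings, with the difference that the tracing event carries span context from any enclosing #[instrument] scope. A standalone sketch:

    use tracing::warn;

    fn handle_unknown(log_source: &str) {
        // Same format-string syntax as log::warn!, but emitted as a
        // tracing event inside whatever span is currently entered.
        warn!("Unknown log source: {}", log_source);
    }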

src/storage/azure_blob.rs

Lines changed: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ pub fn to_object_store_path(path: &RelativePath) -> StorePath {

 // ObjStoreClient is generic client to enable interactions with different cloudprovider's
 // object store such as S3 and Azure Blob
+#[derive(Debug)]
 pub struct BlobStore {
     client: LimitStore<MicrosoftAzure>,
     account: String,

src/storage/localfs.rs

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ impl ObjectStorageProvider for FSConfig {
     }
 }

+#[derive(Debug)]
 pub struct LocalFS {
     // absolute path of the data directory
     root: PathBuf,

src/storage/object_storage.rs

Lines changed: 19 additions & 4 deletions
@@ -48,7 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig};
 use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
-use tracing::error;
+use tracing::{debug, error, instrument, trace};

 use std::collections::BTreeMap;
 use std::{
@@ -67,7 +67,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync {
 }

 #[async_trait]
-pub trait ObjectStorage: Send + Sync + 'static {
+pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
     // TODO: make the filter function optional as we may want to get all objects
     async fn get_objects(
@@ -389,8 +389,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
             )),
             Err(err) => {
                 if matches!(err, ObjectStorageError::NoSuchKey(_)) {
+                    debug!("Couldn't find manifest file: {path}");
                     Ok(None)
                 } else {
+                    error!("Couldn't get the manifest file: {err}");
                     Err(err)
                 }
             }
@@ -538,8 +540,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
         Ok(Bytes::new())
     }

+    #[instrument(level = "debug")]
     async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
         if !Path::new(&CONFIG.staging_dir()).exists() {
+            trace!("Nothing to sync");
             return Ok(());
         }

@@ -552,6 +556,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
             let cache_enabled = STREAM_INFO
                 .get_cache_enabled(stream)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
+
             let time_partition = STREAM_INFO
                 .get_time_partition(stream)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
@@ -568,12 +573,17 @@ pub trait ObjectStorage: Send + Sync + 'static {
             )
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

+            debug!("Arrow files compressed into parquet for stream: {stream}");
+
             if let Some(schema) = schema {
                 let static_schema_flag = STREAM_INFO
                     .get_static_schema_flag(stream)
                     .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
                 if static_schema_flag.is_none() {
-                    commit_schema_to_storage(stream, schema).await?;
+                    if let Err(err) = commit_schema_to_storage(stream, schema).await {
+                        error!("Error while committing schema to storage: {err}");
+                        return Err(err);
+                    }
                 }
             }

@@ -614,6 +624,8 @@ pub trait ObjectStorage: Send + Sync + 'static {
             if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
                 error!("Failed to upload file {}: {:?}", filename, e);
                 continue; // Skip to the next file
+            } else {
+                debug!("Parquet file uploaded to s3 for stream: {stream}");
             }

             let absolute_path = self
@@ -622,7 +634,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
             let store = CONFIG.storage().get_object_store();
             let manifest =
                 catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
-            catalog::update_snapshot(store, stream, manifest).await?;
+
+            if let Err(err) = catalog::update_snapshot(store, stream, manifest).await {
+                error!("Error while updating snapshot: {err}");
+            }
             if cache_enabled && cache_manager.is_some() {
                 cache_updates
                     .entry(stream)
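
Note the two different recovery choices sync now makes: a failed commit_schema_to_storage is logged and propagated, aborting the pass, while a failed update_snapshot is logged and swallowed so the loop moves on to the next file. The shape of the two patterns, as a standalone sketch (assuming anyhow for the error type, which this file does not actually use):

    use tracing::error;

    fn sync_step(fatal: anyhow::Result<()>, best_effort: anyhow::Result<()>) -> anyhow::Result<()> {
        // Log and propagate: the caller sees the failure and can retry the pass.
        if let Err(err) = fatal {
            error!("Error while committing schema to storage: {err}");
            return Err(err);
        }
        // Log and continue: record the failure but keep processing.
        if let Err(err) = best_effort {
            error!("Error while updating snapshot: {err}");
        }
        Ok(())
    }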

src/storage/s3.rs

Lines changed: 1 addition & 0 deletions
@@ -316,6 +316,7 @@ fn to_object_store_path(path: &RelativePath) -> StorePath {
     StorePath::from(path.as_str())
 }

+#[derive(Debug)]
 pub struct S3 {
     client: LimitStore<AmazonS3>,
     bucket: String,
