Commit 3163c5c

Author: Devdutt Shenoi (committed)
trace: log steps of ingest and sync
1 parent 93949fd commit 3163c5c

File tree: 9 files changed, +36 −6 lines
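
This commit threads the tracing crate through the ingest and sync paths: #[instrument] attributes open a span for each call to the annotated functions, while debug!/trace! events mark individual steps inside those spans. A minimal sketch of the mechanism, not taken from this commit (the function name and message below are illustrative assumptions):

use tracing::{instrument, trace};

// Each call opens a span named after the function; arguments are recorded
// via their Debug impls, which is why the diff also adds Debug derives
// and bounds in several places.
#[instrument(level = "trace")]
fn ingest_step(stream_name: &str) {
    // events emitted here are attached to the surrounding span
    trace!("handling one ingest step for {stream_name}");
}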

src/event/mod.rs

Lines changed: 4 additions & 2 deletions
@@ -24,7 +24,7 @@ use arrow_array::RecordBatch;
 use arrow_schema::{Field, Fields, Schema};
 use itertools::Itertools;
 use std::sync::Arc;
-use tracing::error;
+use tracing::{error, instrument};

 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
@@ -36,7 +36,7 @@ pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
 pub const DEFAULT_METADATA_KEY: &str = "p_metadata";

-#[derive(Clone)]
+#[derive(Clone, Debug)]
 pub struct Event {
     pub stream_name: String,
     pub rb: RecordBatch,
@@ -51,6 +51,7 @@ pub struct Event {

 // Events holds the schema related to a each event for a single log stream
 impl Event {
+    #[instrument(level = "trace")]
     pub async fn process(&self) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
@@ -120,6 +121,7 @@ impl Event {

 // event process all events after the 1st event. Concatenates record batches
 // and puts them in memory store for each event.
+#[instrument(level = "trace")]
 fn process_event(
     stream_name: &str,
     schema_key: &str,
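
Deriving Debug on Event is what allows #[instrument] to record &self when process() is called, since instrumented arguments are captured through their Debug impls by default. A hedged sketch of that interaction with a trimmed-down Event type (illustrative only; the skip/fields variant shows how a noisy argument such as the full RecordBatch could be left out, which is not what this commit does):

use tracing::instrument;

#[derive(Debug)]
struct Event {
    stream_name: String,
}

impl Event {
    // Recording self requires Event: Debug. skip(self) plus an explicit
    // field records only the stream name instead of the whole struct.
    #[instrument(level = "trace", skip(self), fields(stream = %self.stream_name))]
    fn process(&self) {
        tracing::trace!("processing");
    }
}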

src/event/writer/mod.rs

Lines changed: 4 additions & 0 deletions
@@ -38,6 +38,7 @@ use chrono::NaiveDateTime;
 use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
+use tracing::debug;

 pub static STREAM_WRITERS: Lazy<WriterTable> = Lazy::new(WriterTable::default);

@@ -124,6 +125,9 @@ impl WriterTable {
                 )?;
             }
         };
+
+        debug!("Successful append to local");
+
         Ok(())
     }

src/handlers/http/ingest.rs

Lines changed: 7 additions & 0 deletions
@@ -43,10 +43,12 @@ use http::StatusCode;
 use serde_json::Value;
 use std::collections::HashMap;
 use std::sync::Arc;
+use tracing::{debug, error, instrument, trace};

 // Handler for POST /api/v1/ingest
 // ingests events by extracting stream name from header
 // creates if stream does not exist
+#[instrument(level = "trace")]
 pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     if let Some((_, stream_name)) = req
         .headers()
@@ -66,6 +68,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostE
         flatten_and_push_logs(req, body, stream_name).await?;
         Ok(HttpResponse::Ok().finish())
     } else {
+        error!("Ingestion request doesn't specify stream name");
         Err(PostError::Header(ParseHeaderError::MissingStreamName))
     }
 }
@@ -143,6 +146,7 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result<HttpRespo
 // Handler for POST /api/v1/logstream/{logstream}
 // only ingests events into the specified logstream
 // fails if the logstream does not exist
+#[instrument(level = "trace")]
 pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let internal_stream_names = STREAM_INFO.list_internal_streams();
@@ -198,6 +202,7 @@ pub async fn create_stream_if_not_exists(
     let mut stream_exists = false;
     if STREAM_INFO.stream_exists(stream_name) {
         stream_exists = true;
+        trace!("Stream: {stream_name} already exists");
         return Ok(stream_exists);
     }

@@ -221,6 +226,8 @@ pub async fn create_stream_if_not_exists(
     )
     .await?;

+    debug!("Stream: {stream_name} created");
+
     Ok(stream_exists)
 }
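
These spans and events only become visible if the binary installs a tracing subscriber at a matching level; that wiring is not part of this diff. A hypothetical setup (the filter string and the use of the tracing_subscriber crate with its env-filter feature are assumptions, not something this commit adds):

use tracing_subscriber::EnvFilter;

fn main() {
    // e.g. RUST_LOG=trace, or a per-target filter, surfaces the new
    // trace-level handler spans; without a subscriber they are no-ops.
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .init();
}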

src/handlers/http/modal/utils/ingest_utils.rs

Lines changed: 6 additions & 1 deletion
@@ -26,6 +26,7 @@ use arrow_schema::Field;
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Utc};
 use serde_json::Value;
+use tracing::{debug, instrument, warn};

 use crate::{
     event::{
@@ -56,7 +57,7 @@ pub async fn flatten_and_push_logs(
             json = otel::flatten_otel_logs(&body);
         }
         _ => {
-            log::warn!("Unknown log source: {}", log_source);
+            warn!("Unknown log source: {}", log_source);
             push_logs(stream_name.to_string(), req.clone(), body).await?;
         }
     }
@@ -70,6 +71,7 @@ pub async fn flatten_and_push_logs(
     Ok(())
 }

+#[instrument(level = "trace")]
 pub async fn push_logs(
     stream_name: String,
     req: HttpRequest,
@@ -172,10 +174,13 @@ pub async fn push_logs(
         }
     }

+    debug!("Ingestion successful on stream: {stream_name}");
+
     Ok(())
 }

 #[allow(clippy::too_many_arguments)]
+#[instrument(level = "trace")]
 pub async fn create_process_record_batch(
     stream_name: String,
     req: HttpRequest,
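
Swapping log::warn! for tracing's warn! keeps the same message but also lets the event be recorded inside whatever span is currently active and carry structured fields. A small illustrative sketch (the field name "source" is an assumption, not what the diff records):

fn main() {
    tracing_subscriber::fmt().init();
    let log_source = "custom";
    // tracing events can attach key/value fields alongside the message
    tracing::warn!(source = %log_source, "Unknown log source");
}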

src/storage/azure_blob.rs

Lines changed: 1 addition & 0 deletions
@@ -191,6 +191,7 @@ pub fn to_object_store_path(path: &RelativePath) -> StorePath {

 // ObjStoreClient is generic client to enable interactions with different cloudprovider's
 // object store such as S3 and Azure Blob
+#[derive(Debug)]
 pub struct BlobStore {
     client: LimitStore<MicrosoftAzure>,
     account: String,

src/storage/localfs.rs

Lines changed: 1 addition & 0 deletions
@@ -80,6 +80,7 @@ impl ObjectStorageProvider for FSConfig {
     }
 }

+#[derive(Debug)]
 pub struct LocalFS {
     // absolute path of the data directory
     root: PathBuf,

src/storage/object_storage.rs

Lines changed: 10 additions & 2 deletions
@@ -48,7 +48,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R
 use itertools::Itertools;
 use relative_path::RelativePath;
 use relative_path::RelativePathBuf;
-use tracing::error;
+use tracing::{debug, error, instrument, trace};

 use std::collections::BTreeMap;
 use std::{
@@ -67,7 +67,7 @@ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync
 }

 #[async_trait]
-pub trait ObjectStorage: Send + Sync + 'static {
+pub trait ObjectStorage: std::fmt::Debug + Send + Sync + 'static {
     async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError>;
     // TODO: make the filter function optional as we may want to get all objects
     async fn get_objects(
@@ -538,8 +538,10 @@ pub trait ObjectStorage: Send + Sync + 'static {
         Ok(Bytes::new())
     }

+    #[instrument(level = "debug")]
     async fn sync(&self, shutdown_signal: bool) -> Result<(), ObjectStorageError> {
         if !Path::new(&CONFIG.staging_dir()).exists() {
+            trace!("Nothing to sync");
             return Ok(());
         }

@@ -552,6 +554,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
         let cache_enabled = STREAM_INFO
             .get_cache_enabled(stream)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
+
         let time_partition = STREAM_INFO
             .get_time_partition(stream)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
@@ -568,6 +571,8 @@ pub trait ObjectStorage: Send + Sync + 'static {
         )
         .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

+        debug!("Arrow files compressed into parquet for stream: {stream}");
+
         if let Some(schema) = schema {
             let static_schema_flag = STREAM_INFO
                 .get_static_schema_flag(stream)
@@ -614,6 +619,8 @@ pub trait ObjectStorage: Send + Sync + 'static {
             if let Err(e) = self.upload_file(&stream_relative_path, &file).await {
                 error!("Failed to upload file {}: {:?}", filename, e);
                 continue; // Skip to the next file
+            } else {
+                debug!("Parquet file uploaded to s3 for stream: {stream}");
             }

             let absolute_path = self
@@ -622,6 +629,7 @@ pub trait ObjectStorage: Send + Sync + 'static {
             let store = CONFIG.storage().get_object_store();
             let manifest =
                 catalog::create_from_parquet_file(absolute_path.clone(), &file).unwrap();
+
             catalog::update_snapshot(store, stream, manifest).await?;
             if cache_enabled && cache_manager.is_some() {
                 cache_updates
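
The new std::fmt::Debug bound on ObjectStorage exists because the default sync method is now instrumented and #[instrument] records &self through Debug; the #[derive(Debug)] additions on BlobStore, LocalFS, and S3 are what satisfy that bound. A rough sketch of the pattern with illustrative names (synchronous here for brevity, unlike the async_trait-based trait in the diff):

use tracing::instrument;

// A Debug supertrait lets an instrumented default method record self
// for any implementor.
trait Storage: std::fmt::Debug {
    #[instrument(level = "debug")]
    fn sync(&self) {
        tracing::trace!("syncing");
    }
}

#[derive(Debug)]
struct LocalDisk;

impl Storage for LocalDisk {}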

src/storage/s3.rs

Lines changed: 1 addition & 0 deletions
@@ -316,6 +316,7 @@ fn to_object_store_path(path: &RelativePath) -> StorePath {
     StorePath::from(path.as_str())
 }

+#[derive(Debug)]
 pub struct S3 {
     client: LimitStore<AmazonS3>,
     bucket: String,

src/storage/staging.rs

Lines changed: 2 additions & 1 deletion
@@ -50,7 +50,7 @@ use std::{
     process,
     sync::Arc,
 };
-use tracing::{error, info};
+use tracing::{error, info, instrument};

 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 // const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -223,6 +223,7 @@ impl StorageDir {
 // data_path.join(dir)
 // }

+#[instrument(level = "debug")]
 pub fn convert_disk_files_to_parquet(
     stream: &str,
     dir: &StorageDir,
