Skip to content

Commit af53a19

Browse files
committed
Fixed the issue
1 parent 160c70d commit af53a19

File tree

3 files changed

+199
-58
lines changed

3 files changed

+199
-58
lines changed

src/ingest.rs

Lines changed: 123 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
1-
// ingest.rs
2-
3-
use std::{
4-
collections::HashMap,
5-
sync::{Arc, RwLock},
6-
};
1+
use std::collections::HashMap;
2+
use std::sync::{Arc, RwLock};
73

84
use actix_web::{get, post, web, HttpResponse, Responder};
5+
use datafusion::arrow::record_batch::RecordBatch;
96
use futures::future::join_all;
107
use serde::Deserialize;
11-
use serde_json::json;
8+
use serde_json::{json, Value};
129
use tracing::{error, info};
1310

1411
use crate::{
@@ -697,4 +694,123 @@ pub async fn queue_length(queue: web::Data<Arc<PersistentQueue>>) -> impl Respon
697694
HttpResponse::InternalServerError().body("Error getting queue length")
698695
}
699696
}
697+
}
698+
699+
#[get("/data")]
700+
pub async fn get_all_data(db: web::Data<Arc<Database>>) -> impl Responder {
701+
let query = "SELECT projectId, id, timestamp, traceId, spanId, eventType, durationNs FROM telemetry_events";
702+
match db.query(query).await {
703+
Ok(df) => {
704+
match df.collect().await {
705+
Ok(batches) => {
706+
let json_rows = record_batches_to_json_rows(&batches).unwrap_or_default();
707+
HttpResponse::Ok().json(json_rows)
708+
}
709+
Err(e) => {
710+
error!("Error collecting data: {:?}", e);
711+
HttpResponse::InternalServerError().body("Error collecting data")
712+
}
713+
}
714+
}
715+
Err(e) => {
716+
error!("Error querying data: {:?}", e);
717+
HttpResponse::InternalServerError().body("Error querying data")
718+
}
719+
}
720+
}
721+
722+
#[get("/data/{id}")]
723+
pub async fn get_data_by_id(
724+
path: web::Path<String>,
725+
db: web::Data<Arc<Database>>,
726+
) -> impl Responder {
727+
let id = path.into_inner();
728+
let query = format!(
729+
"SELECT projectId, id, timestamp, traceId, spanId, eventType, durationNs FROM telemetry_events WHERE id = '{}'",
730+
id
731+
);
732+
match db.query(&query).await {
733+
Ok(df) => {
734+
match df.collect().await {
735+
Ok(batches) => {
736+
let json_rows = record_batches_to_json_rows(&batches).unwrap_or_default();
737+
if json_rows.is_empty() {
738+
HttpResponse::NotFound().body(format!("No data found for id: {}", id))
739+
} else {
740+
HttpResponse::Ok().json(json_rows)
741+
}
742+
}
743+
Err(e) => {
744+
error!("Error collecting data for id {}: {:?}", id, e);
745+
HttpResponse::InternalServerError().body("Error collecting data")
746+
}
747+
}
748+
}
749+
Err(e) => {
750+
error!("Error querying data for id {}: {:?}", id, e);
751+
HttpResponse::InternalServerError().body("Error querying data")
752+
}
753+
}
754+
}
755+
756+
pub fn record_batches_to_json_rows(batches: &[RecordBatch]) -> Result<Vec<Value>, anyhow::Error> {
757+
let mut rows = Vec::new();
758+
for batch in batches {
759+
let schema = batch.schema();
760+
let num_rows = batch.num_rows();
761+
for row_idx in 0..num_rows {
762+
let mut row = json!({});
763+
for (col_idx, field) in schema.fields().iter().enumerate() {
764+
let column = batch.column(col_idx);
765+
let value = if column.is_null(row_idx) {
766+
Value::Null
767+
} else {
768+
match column.data_type() {
769+
datafusion::arrow::datatypes::DataType::Int32 => {
770+
column
771+
.as_any()
772+
.downcast_ref::<datafusion::arrow::array::Int32Array>()
773+
.map_or(Value::Null, |arr| Value::Number(arr.value(row_idx).into()))
774+
}
775+
datafusion::arrow::datatypes::DataType::Int64 => {
776+
column
777+
.as_any()
778+
.downcast_ref::<datafusion::arrow::array::Int64Array>()
779+
.map_or(Value::Null, |arr| Value::Number(arr.value(row_idx).into()))
780+
}
781+
datafusion::arrow::datatypes::DataType::Float64 => {
782+
column
783+
.as_any()
784+
.downcast_ref::<datafusion::arrow::array::Float64Array>()
785+
.map_or(Value::Null, |arr| {
786+
Value::Number(
787+
serde_json::Number::from_f64(arr.value(row_idx))
788+
.unwrap_or_else(|| serde_json::Number::from(0)),
789+
)
790+
})
791+
}
792+
datafusion::arrow::datatypes::DataType::Utf8 => {
793+
column
794+
.as_any()
795+
.downcast_ref::<datafusion::arrow::array::StringArray>()
796+
.map_or(Value::Null, |arr| Value::String(arr.value(row_idx).to_string()))
797+
}
798+
datafusion::arrow::datatypes::DataType::Timestamp(_, _) => {
799+
column
800+
.as_any()
801+
.downcast_ref::<datafusion::arrow::array::TimestampNanosecondArray>()
802+
.map_or(Value::Null, |arr| Value::String(arr.value(row_idx).to_string()))
803+
}
804+
_ => {
805+
// Fallback for unsupported types
806+
Value::Null
807+
}
808+
}
809+
};
810+
row[field.name()] = value;
811+
}
812+
rows.push(row);
813+
}
814+
}
815+
Ok(rows)
700816
}

src/main.rs

Lines changed: 45 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,8 @@ async fn dashboard(
7272
query: web::Query<HashMap<String, String>>,
7373
) -> impl Responder {
7474
let uptime = Utc::now().signed_duration_since(app_info.start_time).num_seconds() as f64;
75-
let http_requests = 0.0;
76-
let queue_size = queue.get_ref().len().await.unwrap_or(0);
75+
let http_requests = 0.0; // Placeholder; update if metrics_middleware tracks this
76+
let queue_size = queue.len().unwrap_or(0);
7777
let db_status = match db.query("SELECT 1 AS test").await {
7878
Ok(_) => "success",
7979
Err(_) => "error",
@@ -238,19 +238,14 @@ async fn main() -> anyhow::Result<()> {
238238

239239
info!("Starting TimeFusion application");
240240

241-
// Read AWS configuration from environment variables.
242-
// AWS_S3_BUCKET should be your bucket name, e.g. "my-aws-bucket"
243-
// AWS_S3_ENDPOINT should be your AWS S3 endpoint, e.g. "https://s3.amazonaws.com"
241+
// Read AWS configuration from environment variables
244242
let bucket = env::var("AWS_S3_BUCKET").expect("AWS_S3_BUCKET environment variable not set");
245243
let aws_endpoint = env::var("AWS_S3_ENDPOINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
246-
// Build the storage URI for AWS S3.
247244
let storage_uri = format!("s3://{}/?endpoint={}", bucket, aws_endpoint);
248245
info!("Storage URI configured: {}", storage_uri);
249-
// Set AWS_ENDPOINT so that the underlying S3 client uses the specified endpoint.
250246
unsafe {
251247
env::set_var("AWS_ENDPOINT", &aws_endpoint);
252248
}
253-
// Parse the AWS endpoint URL and register it.
254249
let aws_url = Url::parse(&aws_endpoint).expect("AWS endpoint must be a valid URL");
255250
deltalake::aws::register_handlers(Some(aws_url));
256251
info!("AWS handlers registered");
@@ -266,7 +261,6 @@ async fn main() -> anyhow::Result<()> {
266261
return Err(e);
267262
}
268263
};
269-
// Use the fixed table name "telemetry_events".
270264
if let Err(e) = db.add_project("telemetry_events", &storage_uri).await {
271265
error!("Failed to add table 'telemetry_events': {:?}", e);
272266
return Err(e);
@@ -287,7 +281,7 @@ async fn main() -> anyhow::Result<()> {
287281
let queue_db_path = env::var("QUEUE_DB_PATH").unwrap_or_else(|_| "/app/queue_db".to_string());
288282
info!("Using queue DB path: {}", queue_db_path);
289283

290-
let queue = match PersistentQueue::new(&queue_db_path) {
284+
let queue = match PersistentQueue::new(&queue_db_path, db.clone()) {
291285
Ok(q) => {
292286
info!("PersistentQueue initialized successfully");
293287
Arc::new(q)
@@ -387,20 +381,18 @@ async fn main() -> anyhow::Result<()> {
387381
}
388382
_ = sleep(Duration::from_secs(5)) => {
389383
debug!("Checking queue for records to flush");
390-
let records = match queue_clone.dequeue_all().await {
391-
Ok(r) => {
392-
debug!("Dequeued {} records", r.len());
393-
r
394-
},
384+
match queue_clone.dequeue_all().await {
385+
Ok(records) => {
386+
debug!("Dequeued {} records", records.len());
387+
if !records.is_empty() {
388+
info!("Flushing {} enqueued records", records.len());
389+
for (key, record) in records {
390+
process_record(&db_clone, &queue_clone, &status_store_clone, key, record).await;
391+
}
392+
}
393+
}
395394
Err(e) => {
396395
error!("Error during dequeue_all: {:?}", e);
397-
Vec::new()
398-
}
399-
};
400-
if !records.is_empty() {
401-
info!("Flushing {} enqueued records", records.len());
402-
for (key, record) in records {
403-
process_record(&db_clone, &queue_clone, &status_store_clone, key, record).await;
404396
}
405397
}
406398
}
@@ -485,18 +477,34 @@ async fn main() -> anyhow::Result<()> {
485477
Ok(())
486478
}
487479

488-
async fn process_record(db: &Arc<Database>, queue: &Arc<PersistentQueue>, status_store: &IngestStatusStore, key: sled::IVec, record: IngestRecord) {
489-
use std::str;
490-
let id = str::from_utf8(&key).unwrap_or("unknown").to_string();
491-
status_store.set_status(id.clone(), "Processing".to_string());
492-
if chrono::DateTime::parse_from_rfc3339(&record.timestamp).is_ok() {
480+
async fn process_record(
481+
db: &Arc<Database>,
482+
queue: &Arc<PersistentQueue>,
483+
status_store: &Arc<IngestStatusStore>,
484+
key: String,
485+
record: IngestRecord,
486+
) {
487+
status_store.set_status(key.clone(), "Processing".to_string());
488+
let timestamp = chrono::DateTime::from_timestamp(
489+
record.start_time_unix_nano / 1_000_000_000,
490+
(record.start_time_unix_nano % 1_000_000_000) as u32,
491+
)
492+
.unwrap_or(Utc::now())
493+
.to_rfc3339();
494+
495+
if chrono::DateTime::parse_from_rfc3339(&timestamp).is_ok() {
493496
match db.write(&record).await {
494497
Ok(()) => {
495498
INGESTION_COUNTER.inc();
496-
status_store.set_status(id.clone(), "Ingested".to_string());
499+
status_store.set_status(key.clone(), "Ingested".to_string());
497500
if let Err(e) = spawn_blocking({
498501
let queue = queue.clone();
499-
move || queue.remove_sync(key)
502+
let key = key.clone();
503+
move || {
504+
if let Err(e) = queue.db.remove(key.as_bytes()) {
505+
error!("Failed to remove record from queue: {:?}", e);
506+
}
507+
}
500508
})
501509
.await
502510
{
@@ -506,16 +514,20 @@ async fn process_record(db: &Arc<Database>, queue: &Arc<PersistentQueue>, status
506514
Err(e) => {
507515
ERROR_COUNTER.inc();
508516
error!("Error writing record: {:?}", e);
509-
status_store.set_status(id, format!("Failed: {:?}", e));
517+
status_store.set_status(key, format!("Failed: {:?}", e));
510518
}
511519
}
512520
} else {
513521
ERROR_COUNTER.inc();
514-
error!("Invalid timestamp in record: {}", record.timestamp);
515-
status_store.set_status(id, "Invalid timestamp".to_string());
522+
error!("Invalid timestamp in record: {}", timestamp);
523+
status_store.set_status(key.clone(), "Invalid timestamp".to_string());
516524
let _ = spawn_blocking({
517525
let queue = queue.clone();
518-
move || queue.remove_sync(key)
526+
move || {
527+
if let Err(e) = queue.db.remove(key.as_bytes()) {
528+
error!("Failed to remove record from queue: {:?}", e);
529+
}
530+
}
519531
})
520532
.await;
521533
}

src/persistent_queue.rs

Lines changed: 31 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
// persistent_queue.rs
2-
31
use std::path::Path;
4-
use std::sync::Arc; // Added missing import
2+
use std::sync::Arc;
53

64
use anyhow::Result;
75
use serde::{Deserialize, Serialize};
@@ -209,26 +207,29 @@ pub struct IngestRecord {
209207
}
210208

211209
pub struct PersistentQueue {
212-
db: Db,
210+
pub db: Db,
213211
sender: Sender<(String, IngestRecord)>,
214-
database: Arc<Database>, // Added to enable writing to the database
212+
database: Arc<Database>,
215213
}
216214

217215
impl PersistentQueue {
218-
pub fn new<P: AsRef<Path>>(path: P, database: &Database) -> Result<Self> {
216+
pub fn new<P: AsRef<Path>>(path: P, database: Arc<Database>) -> Result<Self> {
219217
let db = sled::open(path)?;
220-
let database = Arc::new(database.clone()); // Clone the database reference
221218
let (sender, mut receiver) = mpsc::channel::<(String, IngestRecord)>(1000);
222219

223220
let queue = Self {
224-
db: db.clone(),
225-
sender: sender.clone(),
226-
database: database.clone(),
221+
db,
222+
sender,
223+
database,
227224
};
228225

226+
// Clone necessary components for the spawned task
227+
let db_clone = queue.db.clone();
228+
let database_clone = queue.database.clone();
229+
229230
tokio::spawn(async move {
230231
while let Some((receipt, record)) = receiver.recv().await {
231-
if let Err(e) = queue.process_record(receipt.clone(), &record).await {
232+
if let Err(e) = Self::process_record_static(&db_clone, &database_clone, receipt.clone(), &record).await {
232233
error!("Failed to process record with receipt {}: {:?}", receipt, e);
233234
}
234235
}
@@ -237,6 +238,14 @@ impl PersistentQueue {
237238
Ok(queue)
238239
}
239240

241+
async fn process_record_static(db: &Db, database: &Arc<Database>, receipt: String, record: &IngestRecord) -> Result<()> {
242+
tracing::info!("Processing record with receipt: {}", receipt);
243+
database.write(record).await?;
244+
db.remove(receipt.as_bytes())?;
245+
tracing::info!("Record processed and removed from queue: {}", record.trace_id);
246+
Ok(())
247+
}
248+
240249
pub async fn enqueue(&self, record: &IngestRecord) -> Result<String> {
241250
let receipt = Uuid::new_v4().to_string();
242251
let serialized = bincode::serialize(record)?;
@@ -246,13 +255,7 @@ impl PersistentQueue {
246255
}
247256

248257
pub async fn process_record(&self, receipt: String, record: &IngestRecord) -> Result<()> {
249-
tracing::info!("Processing record with receipt: {}", receipt);
250-
// Write to the database
251-
self.database.write(record).await?;
252-
// Remove from the queue after successful write
253-
self.db.remove(receipt.as_bytes())?;
254-
tracing::info!("Record processed and removed from queue: {}", record.trace_id);
255-
Ok(())
258+
Self::process_record_static(&self.db, &self.database, receipt, record).await
256259
}
257260

258261
pub async fn dequeue(&self) -> Result<Option<(String, IngestRecord)>> {
@@ -265,6 +268,16 @@ impl PersistentQueue {
265268
}
266269
}
267270

271+
pub async fn dequeue_all(&self) -> Result<Vec<(String, IngestRecord)>> {
272+
let mut records = Vec::new();
273+
while let Some((key, value)) = self.db.pop_min()? {
274+
let receipt = String::from_utf8(key.to_vec())?;
275+
let record: IngestRecord = bincode::deserialize(&value)?;
276+
records.push((receipt, record));
277+
}
278+
Ok(records)
279+
}
280+
268281
pub fn len(&self) -> Result<usize> {
269282
Ok(self.db.len())
270283
}

0 commit comments

Comments (0)