Skip to content

Commit 6a89c26

Browse files
committed
Fix
1 parent e43652f commit 6a89c26

File tree

2 files changed

+84
-84
lines changed

2 files changed

+84
-84
lines changed

src/main.rs

Lines changed: 76 additions & 75 deletions
Original file line number · Diff line number · Diff line change
@@ -1,5 +1,4 @@
11
mod database;
2-
mod fields;
32
mod ingest;
43
mod metrics;
54
mod metrics_middleware;
@@ -10,7 +9,6 @@ use std::{
109
collections::{HashMap, VecDeque},
1110
env,
1211
sync::Arc,
13-
time::Duration as StdDuration,
1412
};
1513

1614
use actix_web::{App, HttpResponse, HttpServer, Responder, get, middleware::Logger, web};
@@ -19,14 +17,13 @@ use database::Database;
1917
use datafusion::arrow::array::Float64Array;
2018
use datafusion_postgres::{DfSessionService, HandlerFactory};
2119
use dotenv::dotenv;
22-
use ingest::{get_all_data, get_data_by_id, get_status, record_batches_to_json_rows};
23-
use metrics::ERROR_COUNTER;
24-
use persistent_queue::PersistentQueue;
20+
use ingest::{IngestStatusStore, get_all_data, get_data_by_id, get_status, record_batches_to_json_rows};
21+
use metrics::{ERROR_COUNTER, INGESTION_COUNTER};
22+
use persistent_queue::{IngestRecord, PersistentQueue};
2523
use serde::{Deserialize, Serialize};
2624
use tokio::{
2725
net::TcpListener,
2826
sync::Mutex as TokioMutex,
29-
task::spawn_blocking,
3027
time::{Duration, sleep},
3128
};
3229
use tokio_util::sync::CancellationToken;
@@ -69,39 +66,42 @@ async fn dashboard(
6966
db: web::Data<Arc<Database>>,
7067
queue: web::Data<Arc<PersistentQueue>>,
7168
app_info: web::Data<AppInfo>,
72-
status_store: web::Data<Arc<ingest::IngestStatusStore>>,
69+
status_store: web::Data<Arc<IngestStatusStore>>,
7370
query: web::Query<HashMap<String, String>>,
7471
) -> impl Responder {
7572
let uptime = Utc::now().signed_duration_since(app_info.start_time).num_seconds() as f64;
76-
let http_requests = 0.0; // Placeholder
77-
let queue_size = queue.get_ref().len().await.unwrap_or(0); // Fixed to use get_ref()
73+
let http_requests = 0.0;
74+
let queue_size = queue.len().await.unwrap_or(0);
7875
let db_status = match db.query("SELECT 1 AS test").await {
7976
Ok(_) => "success",
8077
Err(_) => "error",
8178
};
82-
let ingestion_rate = 0.0; // Placeholder
79+
let ingestion_rate = INGESTION_COUNTER.get() as f64 / 60.0;
8380

8481
let start = query.get("start").and_then(|s| chrono::DateTime::parse_from_rfc3339(s).ok());
8582
let end = query.get("end").and_then(|e| chrono::DateTime::parse_from_rfc3339(e).ok());
8683
let records_query = if let (Some(start), Some(end)) = (start, end) {
8784
format!(
88-
"SELECT * FROM otel_logs_and_spans WHERE start_time_unix_nano >= {} AND start_time_unix_nano <= {} ORDER BY start_time_unix_nano DESC LIMIT 10",
89-
start.timestamp_nanos_opt().unwrap_or(0),
90-
end.timestamp_nanos_opt().unwrap_or(0)
85+
"SELECT traceId, spanId, startTimeUnixNano as timestamp \
86+
FROM otel_logs_and_spans WHERE startTimeUnixNano >= '{}' AND startTimeUnixNano <= '{}' ORDER BY startTimeUnixNano DESC LIMIT 10",
87+
start.to_rfc3339(),
88+
end.to_rfc3339()
9189
)
9290
} else {
93-
"SELECT * FROM otel_logs_and_spans ORDER BY start_time_unix_nano DESC LIMIT 10".to_string()
91+
"SELECT traceId, spanId, startTimeUnixNano as timestamp FROM otel_logs_and_spans ORDER BY startTimeUnixNano DESC LIMIT 10".to_string()
9492
};
9593

96-
let avg_latency = match db.query("SELECT AVG(end_time_unix_nano - start_time_unix_nano) AS avg_latency FROM otel_logs_and_spans WHERE end_time_unix_nano IS NOT NULL").await {
97-
Ok(df) => df.collect().await.ok().and_then(|batches| {
98-
batches.get(0).map(|b| {
99-
b.column(0)
100-
.as_any()
101-
.downcast_ref::<Float64Array>()
102-
.map_or(0.0, |arr| arr.value(0) / 1_000_000.0)
94+
let avg_latency = match db.query("SELECT AVG(endTimeUnixNano - startTimeUnixNano) AS avg_latency FROM otel_logs_and_spans WHERE endTimeUnixNano IS NOT NULL").await {
95+
Ok(df) => df
96+
.collect()
97+
.await
98+
.ok()
99+
.and_then(|batches| {
100+
batches
101+
.get(0)
102+
.map(|b| b.column(0).as_any().downcast_ref::<Float64Array>().map_or(0.0, |arr| arr.value(0) / 1_000_000.0))
103103
})
104-
}).unwrap_or(0.0),
104+
.unwrap_or(0.0),
105105
Err(_) => 0.0,
106106
};
107107

@@ -138,10 +138,14 @@ async fn dashboard(
138138
trends.pop_front();
139139
}
140140
let trends_vec = if let (Some(start), Some(end)) = (start, end) {
141-
trends.iter().filter(|t| {
142-
let ts = chrono::DateTime::parse_from_rfc3339(&t.timestamp).unwrap();
143-
ts >= start && ts <= end
144-
}).cloned().collect::<Vec<_>>()
141+
trends
142+
.iter()
143+
.filter(|t| {
144+
let ts = chrono::DateTime::parse_from_rfc3339(&t.timestamp).unwrap();
145+
ts >= start && ts <= end
146+
})
147+
.cloned()
148+
.collect::<Vec<_>>()
145149
} else {
146150
trends.iter().cloned().collect::<Vec<_>>()
147151
};
@@ -184,27 +188,27 @@ async fn export_records(db: web::Data<Arc<Database>>, query: web::Query<HashMap<
184188

185189
let records_query = if let (Some(start), Some(end)) = (start, end) {
186190
format!(
187-
"SELECT * FROM otel_logs_and_spans WHERE start_time_unix_nano >= {} AND start_time_unix_nano <= {}",
188-
start.timestamp_nanos_opt().unwrap_or(0),
189-
end.timestamp_nanos_opt().unwrap_or(0)
191+
"SELECT traceId, spanId, startTimeUnixNano as timestamp \
192+
FROM otel_logs_and_spans WHERE startTimeUnixNano >= '{}' AND startTimeUnixNano <= '{}'",
193+
start.to_rfc3339(),
194+
end.to_rfc3339()
190195
)
191196
} else {
192-
"SELECT * FROM otel_logs_and_spans".to_string()
197+
"SELECT traceId, spanId, startTimeUnixNano as timestamp FROM otel_logs_and_spans".to_string()
193198
};
194199

195200
let records = match db.query(&records_query).await {
196201
Ok(df) => df.collect().await.ok().map_or(vec![], |batches| record_batches_to_json_rows(&batches).unwrap_or_default()),
197202
Err(_) => vec![],
198203
};
199204

200-
let mut csv = String::from("Trace ID,Span ID,Timestamp,Name\n");
205+
let mut csv = String::from("Trace ID,Span ID,Timestamp\n");
201206
for record in records {
202207
csv.push_str(&format!(
203-
"{},{},{},{}\n",
208+
"{},{},{}\n",
204209
record["traceId"].as_str().unwrap_or("N/A"),
205210
record["spanId"].as_str().unwrap_or("N/A"),
206-
record["startTimeUnixNano"].as_i64().unwrap_or(0),
207-
record["name"].as_str().unwrap_or("N/A")
211+
record["timestamp"].as_str().unwrap_or("N/A")
208212
));
209213
}
210214

@@ -216,9 +220,7 @@ async fn export_records(db: web::Data<Arc<Database>>, query: web::Query<HashMap<
216220

217221
#[get("/")]
218222
async fn landing() -> impl Responder {
219-
HttpResponse::TemporaryRedirect()
220-
.append_header(("Location", "/dashboard"))
221-
.finish()
223+
HttpResponse::TemporaryRedirect().append_header(("Location", "/dashboard")).finish()
222224
}
223225

224226
#[tokio::main]
@@ -230,10 +232,10 @@ async fn main() -> anyhow::Result<()> {
230232

231233
info!("Starting TimeFusion application");
232234

233-
// AWS S3 Configuration
235+
// Read AWS configuration from environment variables.
234236
let bucket = env::var("AWS_S3_BUCKET").expect("AWS_S3_BUCKET environment variable not set");
235237
let aws_endpoint = env::var("AWS_S3_ENDPOINT").unwrap_or_else(|_| "https://s3.amazonaws.com".to_string());
236-
let storage_uri = format!("s3://{}/otel_logs_and_spans?endpoint={}", bucket, aws_endpoint);
238+
let storage_uri = format!("s3://{}/?endpoint={}", bucket, aws_endpoint);
237239
info!("Storage URI configured: {}", storage_uri);
238240
unsafe {
239241
env::set_var("AWS_ENDPOINT", &aws_endpoint);
@@ -242,7 +244,7 @@ async fn main() -> anyhow::Result<()> {
242244
deltalake::aws::register_handlers(Some(aws_url));
243245
info!("AWS handlers registered");
244246

245-
// Database Initialization with S3
247+
// Initialize the database with the storage URI.
246248
let db = match Database::new(&storage_uri).await {
247249
Ok(db) => {
248250
info!("Database initialized successfully");
@@ -254,21 +256,21 @@ async fn main() -> anyhow::Result<()> {
254256
}
255257
};
256258

257-
let queue_db_path = env::var("QUEUE_DB_PATH").unwrap_or_else(|_| "./queue_db/queue.log".to_string());
259+
// Initialize persistent queue.
260+
let queue_db_path = env::var("QUEUE_DB_PATH").unwrap_or_else(|_| "/app/queue_db".to_string());
258261
info!("Using queue DB path: {}", queue_db_path);
259-
260262
let queue = match PersistentQueue::new(&queue_db_path).await {
261263
Ok(q) => {
262264
info!("PersistentQueue initialized successfully");
263265
Arc::new(q)
264266
}
265267
Err(e) => {
266268
error!("Failed to initialize PersistentQueue: {:?}", e);
267-
return Err(e);
269+
return Err(e.into());
268270
}
269271
};
270272

271-
let status_store = Arc::new(ingest::IngestStatusStore::new());
273+
let status_store = Arc::new(IngestStatusStore::new());
272274
let app_info = web::Data::new(AppInfo {
273275
start_time: Utc::now(),
274276
trends: Arc::new(TokioMutex::new(VecDeque::new())),
@@ -279,7 +281,7 @@ async fn main() -> anyhow::Result<()> {
279281
let http_shutdown = shutdown_token.clone();
280282
let pgwire_shutdown = shutdown_token.clone();
281283

282-
// DataFusion-Postgres Setup
284+
// Set up datafusion-postgres PGWire server without authentication
283285
let pg_service = Arc::new(DfSessionService::new(db.get_session_context()));
284286
let handler_factory = HandlerFactory(pg_service.clone());
285287
let pg_addr = format!("0.0.0.0:{}", env::var("PGWIRE_PORT").unwrap_or_else(|_| "5432".to_string()));
@@ -305,16 +307,25 @@ async fn main() -> anyhow::Result<()> {
305307
}
306308
result = listener.accept() => {
307309
match result {
308-
Ok((socket, _addr)) => {
310+
Ok((socket, addr)) => {
311+
info!("PGWire: Accepted connection from {}", addr);
309312
let handler_factory = HandlerFactory(pg_service.clone());
310313
tokio::spawn(async move {
311-
if let Err(e) = pgwire::tokio::process_socket(socket, None, handler_factory).await {
312-
error!("Error processing PGWire socket: {:?}", e);
314+
debug!("PGWire: Starting to process socket for {}", addr);
315+
match pgwire::tokio::process_socket(socket, None, handler_factory).await {
316+
Ok(_) => {
317+
info!("PGWire: Connection from {} processed successfully", addr);
318+
debug!("PGWire: Socket processing completed for {}", addr);
319+
}
320+
Err(e) => {
321+
error!("PGWire: Error processing connection from {}: {:?}", addr, e);
322+
debug!("PGWire: Failed socket details: {:?}", e);
323+
}
313324
}
314325
});
315326
}
316327
Err(e) => {
317-
error!("Error accepting connection: {:?}", e);
328+
error!("PGWire: Error accepting connection: {:?}", e);
318329
}
319330
}
320331
}
@@ -329,6 +340,7 @@ async fn main() -> anyhow::Result<()> {
329340
return Err(anyhow::anyhow!("PGWire server failed to start"));
330341
}
331342

343+
// Queue flush task
332344
let flush_task = {
333345
let db_clone = db.clone();
334346
let queue_clone = queue.clone();
@@ -342,26 +354,14 @@ async fn main() -> anyhow::Result<()> {
342354
}
343355
_ = sleep(Duration::from_secs(5)) => {
344356
debug!("Checking queue for records to flush");
345-
loop {
346-
let maybe_record = spawn_blocking({
347-
let queue_clone = queue_clone.clone();
348-
move || {
349-
futures::executor::block_on(queue_clone.dequeue())
350-
}
351-
}).await;
352-
match maybe_record {
353-
Ok(Ok(Some(record))) => {
354-
process_record(&db_clone, &status_store_clone, record).await;
355-
},
356-
Ok(Ok(None)) => break,
357-
Ok(Err(e)) => {
358-
error!("Error in dequeue: {:?}", e);
359-
break;
360-
}
361-
Err(e) => {
362-
error!("Spawn blocking error: {:?}", e);
363-
break;
364-
}
357+
let mut records = Vec::new();
358+
while let Ok(Some(record)) = queue_clone.dequeue().await {
359+
records.push(record);
360+
}
361+
if !records.is_empty() {
362+
info!("Flushing {} enqueued records", records.len());
363+
for record in records {
364+
process_record(&db_clone, &queue_clone, &status_store_clone, record).await;
365365
}
366366
}
367367
}
@@ -370,6 +370,7 @@ async fn main() -> anyhow::Result<()> {
370370
})
371371
};
372372

373+
// HTTP server setup
373374
let http_addr = format!("0.0.0.0:{}", env::var("PORT").unwrap_or_else(|_| "80".to_string()));
374375
info!("Binding HTTP server to {}", http_addr);
375376
let server = match HttpServer::new(move || {
@@ -443,18 +444,18 @@ async fn main() -> anyhow::Result<()> {
443444
Ok(())
444445
}
445446

446-
async fn process_record(db: &Arc<Database>, status_store: &Arc<ingest::IngestStatusStore>, record: persistent_queue::IngestRecord) {
447-
let key = format!("{}_{}", record.traceId, record.spanId);
448-
status_store.set_status(key.clone(), "Processing".to_string());
449-
447+
async fn process_record(db: &Arc<Database>, _queue: &Arc<PersistentQueue>, status_store: &IngestStatusStore, record: IngestRecord) {
448+
let id = record.traceId.clone(); // Use traceId as a simple ID
449+
status_store.set_status(id.clone(), "Processing".to_string());
450450
match db.insert_record(&record).await {
451451
Ok(()) => {
452-
status_store.set_status(key, "Ingested".to_string());
452+
INGESTION_COUNTER.inc();
453+
status_store.set_status(id.clone(), "Ingested".to_string());
453454
}
454455
Err(e) => {
455456
ERROR_COUNTER.inc();
456457
error!("Error writing record: {:?}", e);
457-
status_store.set_status(key, format!("Failed: {:?}", e));
458+
status_store.set_status(id, format!("Failed: {:?}", e));
458459
}
459460
}
460461
}

src/persistent_queue.rs

Lines changed: 8 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -1,9 +1,10 @@
11
use std::path::PathBuf;
2-
use std::sync::{Arc, Mutex};
32
use tokio::fs::{File, OpenOptions};
43
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader, SeekFrom};
4+
use tokio::sync::Mutex; // Use tokio's Mutex
55
use serde::{Deserialize, Serialize};
66
use anyhow::Result;
7+
use std::sync::Arc;
78

89
#[derive(Serialize, Deserialize, Clone)]
910
pub struct IngestRecord {
@@ -246,8 +247,8 @@ pub struct IngestRecord {
246247
#[derive(Clone)]
247248
pub struct PersistentQueue {
248249
path: PathBuf,
249-
file: Arc<Mutex<File>>,
250-
position: Arc<Mutex<u64>>,
250+
file: Arc<Mutex<File>>, // Using tokio::sync::Mutex
251+
position: Arc<Mutex<u64>>, // Using tokio::sync::Mutex
251252
}
252253

253254
impl PersistentQueue {
@@ -272,21 +273,21 @@ impl PersistentQueue {
272273
}
273274

274275
pub async fn enqueue(&self, record: IngestRecord) -> Result<()> {
275-
let mut file = self.file.lock().unwrap();
276+
let mut file = self.file.lock().await; // Lock async
276277
let serialized = serde_json::to_string(&record)?;
277278
let len = serialized.len() as u64;
278279
file.write_all(serialized.as_bytes()).await?;
279280
file.write_all(b"\n").await?;
280281
file.flush().await?;
281282

282-
let mut pos = self.position.lock().unwrap();
283+
let mut pos = self.position.lock().await; // Lock async
283284
*pos += len + 1; // +1 for newline
284285
Ok(())
285286
}
286287

287288
pub async fn dequeue(&self) -> Result<Option<IngestRecord>> {
288-
let mut file = self.file.lock().unwrap();
289-
let mut pos = self.position.lock().unwrap();
289+
let mut file = self.file.lock().await; // Lock async
290+
let mut pos = self.position.lock().await; // Lock async
290291

291292
if *pos == 0 {
292293
return Ok(None);
@@ -303,7 +304,6 @@ impl PersistentQueue {
303304

304305
let record: IngestRecord = serde_json::from_str(&line.trim_end())?;
305306
let consumed = bytes_read as u64;
306-
drop(reader);
307307

308308
*pos -= consumed;
309309
file.set_len(*pos).await?;
@@ -314,7 +314,6 @@ impl PersistentQueue {
314314
let mut new_reader = BufReader::new(&mut *file);
315315
new_reader.seek(SeekFrom::Start(consumed)).await?;
316316
new_reader.read_to_end(&mut remaining).await?;
317-
drop(new_reader);
318317

319318
file.seek(SeekFrom::Start(0)).await?;
320319
file.write_all(&remaining).await?;

0 commit comments

Comments (0)