Skip to content

Commit 2977b16

Browse files
committed
Concurrency
1 parent 0b7debb commit 2977b16

File tree

3 files changed

+168
-78
lines changed

3 files changed

+168
-78
lines changed

src/database.rs

Lines changed: 59 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,29 +12,38 @@ use datafusion::{
1212
execution::context::SessionContext,
1313
prelude::*,
1414
};
15-
use deltalake::{DeltaOps, DeltaTableBuilder};
15+
use deltalake::storage::StorageOptions;
16+
use deltalake::{DeltaOps, DeltaTable, DeltaTableBuilder};
17+
type ProjectConfig = (String, StorageOptions, Arc<RwLock<DeltaTable>>);
1618

17-
pub type ProjectConfigs =
18-
Arc<RwLock<HashMap<String, (String, Arc<RwLock<deltalake::DeltaTable>>)>>>;
19+
// Corrected ProjectConfigs type definition using the type alias
20+
pub type ProjectConfigs = Arc<RwLock<HashMap<String, ProjectConfig>>>;
1921

2022
pub struct Database {
2123
pub ctx: SessionContext,
2224
project_configs: ProjectConfigs,
2325
}
2426

2527
impl Database {
28+
/// Creates a new Database instance with a default table configuration.
2629
pub async fn new(storage_uri: &str) -> Result<Self> {
2730
let ctx = SessionContext::new();
2831
let mut project_configs = HashMap::new();
2932

30-
// Initialize Delta Table with S3 URI
33+
// Default table with environment credentials
34+
let default_options = StorageOptions::default(); // Uses AWS_* env vars
3135
let table = DeltaTableBuilder::from_uri(storage_uri)
3236
.with_allow_http(true)
37+
.with_storage_options(default_options.0.clone())
3338
.build()?;
3439
ctx.register_table("otel_logs_and_spans", Arc::new(table.clone()))?;
3540
project_configs.insert(
36-
"otel_logs_and_spans".to_string(),
37-
(storage_uri.to_string(), Arc::new(RwLock::new(table))),
41+
"default".to_string(),
42+
(
43+
storage_uri.to_string(),
44+
default_options,
45+
Arc::new(RwLock::new(table)),
46+
),
3847
);
3948

4049
Ok(Self {
@@ -43,17 +52,25 @@ impl Database {
4352
})
4453
}
4554

55+
/// Returns a clone of the SessionContext for querying.
4656
pub fn get_session_context(&self) -> SessionContext {
4757
self.ctx.clone()
4858
}
4959

50-
pub async fn query(&self, sql: &str) -> Result<DataFrame> {
60+
/// Executes an SQL query for a specific project.
61+
pub async fn query(&self, project_id: &str, sql: &str) -> Result<DataFrame> {
62+
let configs = self.project_configs.read().await;
63+
if !configs.contains_key(project_id) {
64+
return Err(anyhow::anyhow!("Project ID '{}' not found", project_id));
65+
}
66+
let adjusted_sql = sql.replace("otel_logs_and_spans", &format!("otel_logs_and_spans_{}", project_id));
5167
self.ctx
52-
.sql(sql)
68+
.sql(&adjusted_sql)
5369
.await
54-
.map_err(|e| anyhow::anyhow!("SQL query failed: {:?}", e))
70+
.map_err(|e| anyhow::anyhow!("SQL query failed for project '{}': {:?}", project_id, e))
5571
}
5672

73+
/// Defines the schema for events stored in the database.
5774
fn event_schema() -> Schema {
5875
Schema::new(vec![
5976
Field::new("traceId", DataType::Utf8, false),
@@ -72,12 +89,17 @@ impl Database {
7289
])
7390
}
7491

75-
pub async fn insert_record(&self, record: &crate::persistent_queue::IngestRecord) -> Result<()> {
76-
let (_conn_str, table_ref) = {
92+
/// Inserts a record into the database for a specific project.
93+
pub async fn insert_record(
94+
&self,
95+
project_id: &str,
96+
record: &crate::persistent_queue::IngestRecord,
97+
) -> Result<()> {
98+
let (_conn_str, _options, table_ref) = {
7799
let configs = self.project_configs.read().await;
78100
configs
79-
.get("otel_logs_and_spans")
80-
.ok_or_else(|| anyhow::anyhow!("Table 'otel_logs_and_spans' not found"))?
101+
.get(project_id)
102+
.ok_or_else(|| anyhow::anyhow!("Project ID '{}' not found", project_id))?
81103
.clone()
82104
};
83105

@@ -100,14 +122,34 @@ impl Database {
100122
Ok(())
101123
}
102124

103-
pub async fn register_table(&self, conn_str: &str) -> Result<()> {
104-
let table = DeltaTableBuilder::from_uri(conn_str)
125+
/// Registers a new project with custom storage options.
126+
pub async fn register_project(
127+
&self,
128+
project_id: &str,
129+
bucket: &str,
130+
access_key: &str,
131+
secret_key: &str,
132+
endpoint: &str,
133+
) -> Result<()> {
134+
let conn_str = format!("s3://{}/otel_logs_and_spans_{}", bucket, project_id);
135+
let mut storage_options = StorageOptions::default();
136+
storage_options.0.insert("AWS_ACCESS_KEY_ID".to_string(), access_key.to_string());
137+
storage_options.0.insert("AWS_SECRET_ACCESS_KEY".to_string(), secret_key.to_string());
138+
storage_options.0.insert("AWS_ENDPOINT".to_string(), endpoint.to_string());
139+
storage_options.0.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
140+
141+
let table = DeltaTableBuilder::from_uri(&conn_str)
142+
.with_storage_options(storage_options.0.clone()) // Use storage_options instead
105143
.with_allow_http(true)
106144
.build()?;
145+
146+
self.ctx
147+
.register_table(&format!("otel_logs_and_spans_{}", project_id), Arc::new(table.clone()))?;
148+
107149
let mut configs = self.project_configs.write().await;
108150
configs.insert(
109-
"otel_logs_and_spans".to_string(),
110-
(conn_str.to_string(), Arc::new(RwLock::new(table))),
151+
project_id.to_string(),
152+
(conn_str, storage_options, Arc::new(RwLock::new(table))),
111153
);
112154
Ok(())
113155
}

src/ingest.rs

Lines changed: 55 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::collections::HashMap;
12
use std::sync::Arc;
23

34
use actix_web::{get, web, HttpResponse, Responder};
@@ -51,64 +52,74 @@ pub async fn get_status(
5152
}
5253

5354
#[get("/data")]
54-
pub async fn get_all_data(db: web::Data<Arc<Database>>) -> impl Responder {
55-
// Updated column name: startTimeUnixNano (camelCase as declared in fields.rs)
56-
let query = "SELECT * FROM otel_logs_and_spans ORDER BY startTimeUnixNano DESC LIMIT 100";
57-
match db.query(query).await {
58-
Ok(df) => match df.collect().await {
59-
Ok(batches) => {
60-
let rows = record_batches_to_json_rows(&batches).unwrap_or_default();
61-
HttpResponse::Ok().json(rows)
62-
}
63-
Err(e) => {
64-
error!("Failed to collect data: {:?}", e);
65-
HttpResponse::InternalServerError().json(serde_json::json!({
66-
"error": "Failed to fetch data"
67-
}))
68-
}
69-
},
55+
pub async fn get_all_data(
56+
db: web::Data<Arc<Database>>,
57+
query: web::Query<HashMap<String, String>>,
58+
) -> impl Responder {
59+
let project_id = query.get("project_id").unwrap_or(&"default".to_string()).clone();
60+
let query_str = "SELECT * FROM otel_logs_and_spans ORDER BY startTimeUnixNano DESC LIMIT 100";
61+
62+
let df = match db.query(&project_id, query_str).await {
63+
Ok(df) => df,
7064
Err(e) => {
71-
error!("Failed to query data: {:?}", e);
72-
HttpResponse::InternalServerError().json(serde_json::json!({
65+
error!("Failed to query data for project '{}': {:?}", project_id, e);
66+
return HttpResponse::InternalServerError().json(serde_json::json!({
7367
"error": "Failed to query data"
74-
}))
68+
}));
7569
}
76-
}
70+
};
71+
72+
let batches: Vec<datafusion::arrow::record_batch::RecordBatch> = match df.collect().await {
73+
Ok(batches) => batches,
74+
Err(e) => {
75+
error!("Failed to collect data for project '{}': {:?}", project_id, e);
76+
return HttpResponse::InternalServerError().json(serde_json::json!({
77+
"error": "Failed to fetch data"
78+
}));
79+
}
80+
};
81+
82+
let rows = record_batches_to_json_rows(&batches).unwrap_or_default();
83+
HttpResponse::Ok().json(rows)
7784
}
7885

7986
#[get("/data/{id}")]
8087
pub async fn get_data_by_id(
8188
path: web::Path<String>,
8289
db: web::Data<Arc<Database>>,
90+
query: web::Query<HashMap<String, String>>,
8391
) -> impl Responder {
8492
let id = path.into_inner();
85-
// Updated column name: traceId (camelCase)
93+
let project_id = query.get("project_id").unwrap_or(&"default".to_string()).clone();
8694
let query = format!("SELECT * FROM otel_logs_and_spans WHERE traceId = '{}'", id);
87-
match db.query(&query).await {
88-
Ok(df) => match df.collect().await {
89-
Ok(batches) => {
90-
let rows = record_batches_to_json_rows(&batches).unwrap_or_default();
91-
if rows.is_empty() {
92-
HttpResponse::NotFound().json(serde_json::json!({
93-
"error": "Record not found"
94-
}))
95-
} else {
96-
HttpResponse::Ok().json(rows)
97-
}
98-
}
99-
Err(e) => {
100-
error!("Failed to collect data by ID: {:?}", e);
101-
HttpResponse::InternalServerError().json(serde_json::json!({
102-
"error": "Failed to fetch data"
103-
}))
104-
}
105-
},
95+
96+
let df = match db.query(&project_id, &query).await {
97+
Ok(df) => df,
10698
Err(e) => {
107-
error!("Failed to query data by ID: {:?}", e);
108-
HttpResponse::InternalServerError().json(serde_json::json!({
99+
error!("Failed to query data by ID for project '{}': {:?}", project_id, e);
100+
return HttpResponse::InternalServerError().json(serde_json::json!({
109101
"error": "Failed to query data"
110-
}))
102+
}));
111103
}
104+
};
105+
106+
let batches: Vec<datafusion::arrow::record_batch::RecordBatch> = match df.collect().await {
107+
Ok(batches) => batches,
108+
Err(e) => {
109+
error!("Failed to collect data by ID for project '{}': {:?}", project_id, e);
110+
return HttpResponse::InternalServerError().json(serde_json::json!({
111+
"error": "Failed to fetch data"
112+
}));
113+
}
114+
};
115+
116+
let rows = record_batches_to_json_rows(&batches).unwrap_or_default();
117+
if rows.is_empty() {
118+
HttpResponse::NotFound().json(serde_json::json!({
119+
"error": "Record not found"
120+
}))
121+
} else {
122+
HttpResponse::Ok().json(rows)
112123
}
113124
}
114125

@@ -201,4 +212,4 @@ pub fn record_batches_to_json_rows(
201212
}
202213
}
203214
Ok(rows)
204-
}
215+
}

0 commit comments

Comments
 (0)