Skip to content

Commit 2e20019

Browse files
committed
checkpoint. Create project_id and timestamp partitions
1 parent 2641e54 commit 2e20019

File tree

2 files changed

+37
-17
lines changed

2 files changed

+37
-17
lines changed

src/database.rs

Lines changed: 26 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,16 @@ pub struct Database {
2323
impl Database {
2424
pub async fn new(storage_uri: &str) -> Result<Self> {
2525
let session_context = SessionContext::new();
26-
let mut project_configs = HashMap::new();
26+
let project_configs = HashMap::new();
2727

28-
let default_options = StorageOptions::default();
29-
let table = DeltaTableBuilder::from_uri(storage_uri).with_allow_http(true).with_storage_options(default_options.0.clone()).build()?;
30-
session_context.register_table("otel_logs_and_spans", Arc::new(table.clone()))?;
31-
project_configs.insert("default".to_string(), (storage_uri.to_string(), default_options, Arc::new(RwLock::new(table))));
32-
33-
Ok(Self {
28+
let db = Self {
3429
session_context,
3530
project_configs: Arc::new(RwLock::new(project_configs)),
36-
})
31+
};
32+
33+
db.register_project("default", storage_uri, None, None, None).await?;
34+
35+
Ok(db)
3736
}
3837

3938
pub async fn insert_records(&self, records: &Vec<crate::persistent_queue::OtelLogsAndSpans>) -> Result<()> {
@@ -55,12 +54,23 @@ impl Database {
5554
Ok(())
5655
}
5756

58-
pub async fn register_project(&self, project_id: &str, bucket: &str, access_key: &str, secret_key: &str, endpoint: &str) -> Result<()> {
59-
let conn_str = format!("s3://{}/otel_logs_and_spans", bucket);
57+
pub async fn register_project(
58+
&self, project_id: &str, conn_str: &str, access_key: Option<&str>, secret_key: Option<&str>, endpoint: Option<&str>,
59+
) -> Result<()> {
6060
let mut storage_options = StorageOptions::default();
61-
storage_options.0.insert("AWS_ACCESS_KEY_ID".to_string(), access_key.to_string());
62-
storage_options.0.insert("AWS_SECRET_ACCESS_KEY".to_string(), secret_key.to_string());
63-
storage_options.0.insert("AWS_ENDPOINT".to_string(), endpoint.to_string());
61+
62+
if let Some(key) = access_key.filter(|k| !k.is_empty()) {
63+
storage_options.0.insert("AWS_ACCESS_KEY_ID".to_string(), key.to_string());
64+
}
65+
66+
if let Some(key) = secret_key.filter(|k| !k.is_empty()) {
67+
storage_options.0.insert("AWS_SECRET_ACCESS_KEY".to_string(), key.to_string());
68+
}
69+
70+
if let Some(ep) = endpoint.filter(|e| !e.is_empty()) {
71+
storage_options.0.insert("AWS_ENDPOINT".to_string(), ep.to_string());
72+
}
73+
6474
storage_options.0.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
6575

6676
let table = match DeltaTableBuilder::from_uri(&conn_str)
@@ -71,9 +81,10 @@ impl Database {
7181
{
7282
Ok(table) => table,
7383
Err(err) => {
74-
warn!("table doesn't exist. creating new ond. err: {}", err);
84+
warn!("table doesn't exist. creating new table. err: {:?}", err);
7585

7686
let fields = Vec::<arrow_schema::FieldRef>::from_type::<OtelLogsAndSpans>(TracingOptions::default())?;
87+
warn!("22table doesn't exist. creating new table. err: {:?}", err);
7788
let vec_refs: Vec<StructField> = fields.iter().map(|arc_field| arc_field.as_ref().try_into().unwrap()).collect();
7889

7990
// Create the table with partitioning for project_id and timestamp
@@ -90,7 +101,7 @@ impl Database {
90101
self.session_context.register_table("otel_logs_and_spans", Arc::new(table.clone()))?;
91102

92103
let mut configs = self.project_configs.write().await;
93-
configs.insert(project_id.to_string(), (conn_str, storage_options, Arc::new(RwLock::new(table))));
104+
configs.insert(project_id.to_string(), (conn_str.to_string(), storage_options, Arc::new(RwLock::new(table))));
94105
Ok(())
95106
}
96107
}

src/main.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,21 @@ struct RegisterProjectRequest {
8181
bucket: String,
8282
access_key: String,
8383
secret_key: String,
84-
endpoint: String,
84+
endpoint: Option<String>,
8585
}
8686

8787
#[post("/register_project")]
8888
async fn register_project(req: web::Json<RegisterProjectRequest>, db: web::Data<Arc<Database>>) -> impl Responder {
89-
match db.register_project(&req.project_id, &req.bucket, &req.access_key, &req.secret_key, &req.endpoint).await {
89+
match db
90+
.register_project(
91+
&req.project_id,
92+
&req.bucket,
93+
Some(&req.access_key),
94+
Some(&req.secret_key),
95+
req.endpoint.as_deref(),
96+
)
97+
.await
98+
{
9099
Ok(()) => HttpResponse::Ok().json(serde_json::json!({
91100
"message": format!("Project '{}' registered successfully", req.project_id)
92101
})),

0 commit comments

Comments
 (0)