Skip to content

Commit 1a8517c

Browse files
committed
extract schema and batches via schema_arrow, and cleanup redundant logics
1 parent fad9043 commit 1a8517c

File tree

7 files changed

+71
-254
lines changed

7 files changed

+71
-254
lines changed

Cargo.lock

Lines changed: 46 additions & 22 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ datafusion = "46.0.0"
99
arrow = "54.2.0"
1010
uuid = { version = "1.13", features = ["v4", "serde"] }
1111
serde = { version = "1", features = ["derive"] }
12+
serde_arrow = { version = "0.13.1", features = ["arrow-54"] }
1213
serde_json = "1.0.138"
1314
async-trait = "0.1.86"
1415
env_logger = "0.11.6"
@@ -30,7 +31,6 @@ anyhow = "1.0.95"
3031
tokio-util = "0.7.13"
3132
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }
3233
tracing = "0.1.41"
33-
prometheus = "0.13.4"
3434
dotenv = "0.15.0"
3535
task = "0.0.1"
3636
sqlparser = "0.55.0"

src/database.rs

Lines changed: 10 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
use std::{collections::HashMap, sync::Arc};
22

33
use anyhow::Result;
4-
use datafusion::{
5-
arrow::{
6-
array::{ArrayRef, StringArray, TimestampNanosecondArray},
7-
datatypes::{DataType, Field, Schema, TimeUnit},
8-
record_batch::RecordBatch,
9-
},
10-
execution::context::SessionContext,
11-
};
4+
use datafusion::execution::context::SessionContext;
125
use deltalake::{storage::StorageOptions, DeltaOps, DeltaTable, DeltaTableBuilder};
6+
use serde_arrow::schema::{SchemaLike, TracingOptions};
137
use tokio::sync::RwLock;
14-
use datafusion::prelude::DataFrame;
8+
9+
use crate::persistent_queue::OtelLogsAndSpans;
1510

1611
type ProjectConfig = (String, StorageOptions, Arc<RwLock<DeltaTable>>);
1712

@@ -28,10 +23,7 @@ impl Database {
2823
let mut project_configs = HashMap::new();
2924

3025
let default_options = StorageOptions::default();
31-
let table = DeltaTableBuilder::from_uri(storage_uri)
32-
.with_allow_http(true)
33-
.with_storage_options(default_options.0.clone())
34-
.build()?;
26+
let table = DeltaTableBuilder::from_uri(storage_uri).with_allow_http(true).with_storage_options(default_options.0.clone()).build()?;
3527
ctx.register_table("otel_logs_and_spans", Arc::new(table.clone()))?;
3628
project_configs.insert("default".to_string(), (storage_uri.to_string(), default_options, Arc::new(RwLock::new(table))));
3729

@@ -46,75 +38,30 @@ impl Database {
4638
&self.ctx
4739
}
4840

49-
pub async fn query(&self, project_id: &str, sql: &str) -> Result<DataFrame> {
50-
let configs = self.project_configs.read().await;
51-
if !configs.contains_key(project_id) {
52-
return Err(anyhow::anyhow!("Project ID '{}' not found", project_id));
53-
}
54-
let adjusted_sql = sql.replace("otel_logs_and_spans", &format!("otel_logs_and_spans_{}", project_id));
55-
self.ctx
56-
.sql(&adjusted_sql)
57-
.await
58-
.map_err(|e| anyhow::anyhow!("SQL query failed for project '{}': {:?}", project_id, e))
59-
}
60-
61-
fn event_schema() -> Schema {
62-
Schema::new(vec![
63-
Field::new("traceId", DataType::Utf8, false),
64-
Field::new("spanId", DataType::Utf8, false),
65-
Field::new("startTimeUnixNano", DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))), false),
66-
Field::new("endTimeUnixNano", DataType::Timestamp(TimeUnit::Nanosecond, Some(Arc::from("UTC"))), true),
67-
Field::new("name", DataType::Utf8, false),
68-
])
69-
}
70-
71-
pub async fn insert_record(
72-
&self,
73-
project_id: &str,
74-
record: &crate::persistent_queue::IngestRecord,
75-
) -> Result<()> {
41+
pub async fn insert_records(&self, project_id: &str, records: &Vec<crate::persistent_queue::OtelLogsAndSpans>) -> Result<()> {
7642
let (_conn_str, _options, table_ref) = {
7743
let configs = self.project_configs.read().await;
7844
configs.get(project_id).ok_or_else(|| anyhow::anyhow!("Project ID '{}' not found", project_id))?.clone()
7945
};
8046

81-
let arrays: Vec<ArrayRef> = vec![
82-
Arc::new(StringArray::from(vec![Some(&record.context___trace_id[..])])),
83-
Arc::new(StringArray::from(vec![Some(&record.context___span_id[..])])),
84-
Arc::new(TimestampNanosecondArray::from(vec![record.timestamp])),
85-
Arc::new(TimestampNanosecondArray::from(vec![record.start_time])),
86-
Arc::new(TimestampNanosecondArray::from(vec![record.end_time.unwrap_or(0)])),
87-
Arc::new(StringArray::from(vec![Some(&record.name[..])])),
88-
];
89-
90-
let schema = Self::event_schema();
91-
let batch = RecordBatch::try_new(Arc::new(schema), arrays)?;
47+
let fields = Vec::<arrow_schema::FieldRef>::from_type::<OtelLogsAndSpans>(TracingOptions::default())?;
48+
let batch = serde_arrow::to_record_batch(&fields, &records)?;
9249

9350
let mut table = table_ref.write().await;
9451
let ops = DeltaOps(table.clone());
9552
*table = ops.write(vec![batch]).await?;
9653
Ok(())
9754
}
9855

99-
pub async fn register_project(
100-
&self,
101-
project_id: &str,
102-
bucket: &str,
103-
access_key: &str,
104-
secret_key: &str,
105-
endpoint: &str,
106-
) -> Result<()> {
56+
pub async fn register_project(&self, project_id: &str, bucket: &str, access_key: &str, secret_key: &str, endpoint: &str) -> Result<()> {
10757
let conn_str = format!("s3://{}/otel_logs_and_spans_{}", bucket, project_id);
10858
let mut storage_options = StorageOptions::default();
10959
storage_options.0.insert("AWS_ACCESS_KEY_ID".to_string(), access_key.to_string());
11060
storage_options.0.insert("AWS_SECRET_ACCESS_KEY".to_string(), secret_key.to_string());
11161
storage_options.0.insert("AWS_ENDPOINT".to_string(), endpoint.to_string());
11262
storage_options.0.insert("AWS_ALLOW_HTTP".to_string(), "true".to_string());
11363

114-
let table = DeltaTableBuilder::from_uri(&conn_str)
115-
.with_storage_options(storage_options.0.clone())
116-
.with_allow_http(true)
117-
.build()?;
64+
let table = DeltaTableBuilder::from_uri(&conn_str).with_storage_options(storage_options.0.clone()).with_allow_http(true).build()?;
11865

11966
self.ctx.register_table(&format!("otel_logs_and_spans_{}", project_id), Arc::new(table.clone()))?;
12067

src/fields.rs

Lines changed: 0 additions & 139 deletions
This file was deleted.

src/lib.rs

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)