Skip to content

Commit 09ebef5

Browse files
committed
update schema
1 parent 2ea08ed commit 09ebef5

File tree

2 files changed

+24
-20
lines changed

2 files changed

+24
-20
lines changed

schemas/otel_logs_and_spans.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ z_order_columns:
1212
- timestamp
1313
- resource___service___name
1414
fields:
15+
- name: date
16+
data_type: Date32
17+
nullable: false
1518
- name: timestamp
1619
data_type: 'Timestamp(Microsecond, Some("UTC"))'
1720
nullable: false
@@ -270,7 +273,9 @@ fields:
270273
- name: summary
271274
data_type: "List(Utf8)"
272275
nullable: false
273-
- name: date
274-
data_type: Date32
275-
nullable: false
276-
276+
- name: errors
277+
data_type: Utf8
278+
nullable: true
279+
- name: log_pattern
280+
data_type: Utf8
281+
nullable: true

src/database.rs

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use std::fmt;
3636
use std::{any::Any, collections::HashMap, env, sync::Arc};
3737
use tokio::sync::RwLock;
3838
use tokio_util::sync::CancellationToken;
39-
use tracing::{debug, error, info, warn, instrument, Instrument};
4039
use tracing::field::Empty;
40+
use tracing::{debug, error, info, instrument, warn, Instrument};
4141
use url::Url;
4242

4343
// Changed to support multiple tables per project: (project_id, table_name) -> DeltaTable
@@ -978,9 +978,7 @@ impl Database {
978978
}
979979

980980
// Create the base S3 object store
981-
let base_store = self.create_object_store(&storage_uri, &storage_options)
982-
.instrument(tracing::trace_span!("create_object_store"))
983-
.await?;
981+
let base_store = self.create_object_store(&storage_uri, &storage_options).instrument(tracing::trace_span!("create_object_store")).await?;
984982

985983
// Wrap with instrumentation for tracing
986984
let instrumented_store = instrument_object_store(base_store, "s3");
@@ -1190,7 +1188,7 @@ impl Database {
11901188
}
11911189
return Ok(());
11921190
}
1193-
1191+
11941192
span.record("use_queue", false);
11951193

11961194
// Extract project_id from first batch if not provided
@@ -1229,10 +1227,13 @@ impl Database {
12291227

12301228
let write_span = tracing::trace_span!(parent: &span, "delta.write_operation", retry_attempt = retry_count + 1);
12311229
let write_result = async {
1230+
// Schema evolution enabled: new columns will be automatically added to the table
12321231
DeltaOps(table.clone())
12331232
.write(batches.clone())
12341233
.with_partition_columns(schema.partitions.clone())
12351234
.with_writer_properties(writer_properties.clone())
1235+
.with_save_mode(deltalake::protocol::SaveMode::Append)
1236+
.with_schema_mode(deltalake::operations::write::SchemaMode::Merge)
12361237
.await
12371238
}
12381239
.instrument(write_span)
@@ -1554,6 +1555,8 @@ impl ProjectRoutingTable {
15541555
}
15551556

15561557
fn schema(&self) -> SchemaRef {
1558+
// For now, return the YAML schema.
1559+
// TODO: Consider caching the actual Delta schema to handle evolution better
15571560
self.schema.clone()
15581561
}
15591562

@@ -1840,7 +1843,7 @@ impl TableProvider for ProjectRoutingTable {
18401843
)]
18411844
async fn scan(&self, state: &dyn Session, projection: Option<&Vec<usize>>, filters: &[Expr], limit: Option<usize>) -> DFResult<Arc<dyn ExecutionPlan>> {
18421845
let span = tracing::Span::current();
1843-
1846+
18441847
// Apply our custom optimizations to the filters
18451848
let optimized_filters = self.apply_time_series_optimizations(filters)?;
18461849

@@ -1850,21 +1853,17 @@ impl TableProvider for ProjectRoutingTable {
18501853

18511854
// Execute query and create plan with optimized filters
18521855
let resolve_span = tracing::trace_span!(parent: &span, "resolve_delta_table");
1853-
let delta_table = self.database.resolve_table(&project_id, &self.table_name)
1854-
.instrument(resolve_span)
1855-
.await?;
1856+
let delta_table = self.database.resolve_table(&project_id, &self.table_name).instrument(resolve_span).await?;
18561857
let table = delta_table.read().await;
1857-
1858+
18581859
// Create a span for the table scan that will be the parent for all object store operations
1859-
let scan_span = tracing::trace_span!("delta_table.scan",
1860-
table.name = %self.table_name,
1860+
let scan_span = tracing::trace_span!("delta_table.scan",
1861+
table.name = %self.table_name,
18611862
table.project_id = %project_id,
18621863
partition_filters = ?optimized_filters.iter().filter(|f| matches!(f, Expr::BinaryExpr(_))).count()
18631864
);
1864-
1865-
let plan = table.scan(state, projection, &optimized_filters, limit)
1866-
.instrument(scan_span)
1867-
.await?;
1865+
1866+
let plan = table.scan(state, projection, &optimized_filters, limit).instrument(scan_span).await?;
18681867

18691868
Ok(plan)
18701869
}

0 commit comments

Comments
 (0)