Skip to content

Commit fad9043

Browse files
committed
add project_id and body for logs. checkpoint before attempt at deriving schema
1 parent 71a934e commit fad9043

File tree

3 files changed

+5
-13
lines changed

3 files changed

+5
-13
lines changed

src/database.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use datafusion::{
1111
};
1212
use deltalake::{storage::StorageOptions, DeltaOps, DeltaTable, DeltaTableBuilder};
1313
use tokio::sync::RwLock;
14+
use datafusion::prelude::DataFrame;
15+
1416
type ProjectConfig = (String, StorageOptions, Arc<RwLock<DeltaTable>>);
1517

1618
pub type ProjectConfigs = Arc<RwLock<HashMap<String, ProjectConfig>>>;

src/main.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -154,17 +154,6 @@ async fn main() -> anyhow::Result<()> {
154154
warn!("'datafusion' catalog not found; proceeding with empty context");
155155
}
156156

157-
let test_schema = Arc::new(Schema::new(vec![
158-
Field::new("id", DataType::Int32, false),
159-
Field::new("name", DataType::Utf8, false),
160-
]));
161-
let test_batch = RecordBatch::try_new(
162-
test_schema.clone(),
163-
vec![Arc::new(Int32Array::from(vec![1, 2, 3])), Arc::new(StringArray::from(vec!["a", "b", "c"]))],
164-
)?;
165-
ctx.register_batch("test_table", test_batch)?;
166-
info!("Registered dummy table: test_table");
167-
168157
register_pg_settings_table(&ctx)?;
169158
register_set_config_udf(&ctx);
170159

@@ -271,7 +260,7 @@ async fn main() -> anyhow::Result<()> {
271260
if !records.is_empty() {
272261
info!("Flushing {} enqueued records", records.len());
273262
for record in records {
274-
process_record(&db_clone, &queue_clone, &status_store_clone, "default", record).await;
263+
process_record(&db_clone, &queue_clone, "default", record).await;
275264
}
276265
}
277266
}

src/persistent_queue.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use tokio::{
1212
#[derive(Serialize, Deserialize, Clone)]
1313
pub struct IngestRecord {
1414
// Top-level fields
15+
pub project_id: String,
1516
pub timestamp: i64,
1617
pub observed_timestamp: i64,
1718

@@ -27,6 +28,7 @@ pub struct IngestRecord {
2728
pub level: Option<String>, // same as severity text
2829
pub severity___severity_text: Option<String>,
2930
pub severity___severity_number: Option<String>,
31+
pub body: Option<String>, // body as json json
3032

3133
pub duration: u64, // nanoseconds
3234
pub start_time: i64,
@@ -192,4 +194,3 @@ impl PersistentQueue {
192194
Ok(Some(record))
193195
}
194196
}
195-

0 commit comments

Comments
 (0)