Skip to content

Commit 19708df

Browse files
author
Devdutt Shenoi
committed
refactor: Parseable::commit_schema
1 parent dc34a85 commit 19708df

File tree

3 files changed

+20
-28
lines changed

3 files changed

+20
-28
lines changed

src/event/mod.rs

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,17 +20,12 @@
2020
pub mod format;
2121

2222
use arrow_array::RecordBatch;
23-
use arrow_schema::{Field, Fields, Schema};
23+
use arrow_schema::Field;
2424
use itertools::Itertools;
2525
use std::sync::Arc;
2626

2727
use self::error::EventError;
28-
use crate::{
29-
metadata::update_stats,
30-
parseable::{StagingError, Stream, PARSEABLE},
31-
storage::StreamType,
32-
LOCK_EXPECT,
33-
};
28+
use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
3429
use chrono::NaiveDateTime;
3530
use std::collections::HashMap;
3631

@@ -71,7 +66,8 @@ impl Event {
7166
}
7267

7368
if self.is_first_event {
74-
commit_schema(&stream.stream_name, partition.rb.schema())?;
69+
let schema = partition.rb.schema().as_ref().clone();
70+
stream.commit_schema(schema)?;
7571
}
7672

7773
stream.push(
@@ -122,23 +118,6 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
122118
format!("{hash:x}")
123119
}
124120

125-
pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), StagingError> {
126-
let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
127-
128-
let map = &mut stream_metadata
129-
.get_mut(stream_name)
130-
.expect("map has entry for this stream name")
131-
.metadata
132-
.write()
133-
.expect(LOCK_EXPECT)
134-
.schema;
135-
let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
136-
let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
137-
map.clear();
138-
map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
139-
Ok(())
140-
}
141-
142121
pub mod error {
143122

144123
use crate::{parseable::StagingError, storage::ObjectStorageError};

src/handlers/http/query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,12 @@ use serde::{Deserialize, Serialize};
2929
use serde_json::{json, Value};
3030
use std::collections::HashMap;
3131
use std::pin::Pin;
32-
use std::sync::Arc;
3332
use std::time::Instant;
3433
use tracing::error;
3534

3635
use crate::event::error::EventError;
3736
use crate::handlers::http::fetch_schema;
3837

39-
use crate::event::commit_schema;
4038
use crate::metrics::QUERY_EXECUTE_TIME;
4139
use crate::option::Mode;
4240
use crate::parseable::PARSEABLE;
@@ -174,7 +172,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
174172
// commit schema merges the schema internally and updates the schema in storage.
175173
commit_schema_to_storage(table, new_schema.clone()).await?;
176174

177-
commit_schema(table, Arc::new(new_schema))?;
175+
PARSEABLE
176+
.get_or_create_stream(table)
177+
.commit_schema(new_schema)?;
178178
}
179179
}
180180
}

src/parseable/streams.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,19 @@ impl Stream {
624624
Arc::new(Schema::new(fields))
625625
}
626626

627+
pub fn commit_schema(&self, schema: Schema) -> Result<(), StagingError> {
628+
let current_schema = self.get_schema().as_ref().clone();
629+
let updated_schema = Schema::try_merge([current_schema, schema])?
630+
.fields
631+
.into_iter()
632+
.map(|field| (field.name().to_owned(), field.clone()))
633+
.collect();
634+
635+
self.metadata.write().expect(LOCK_EXPECT).schema = updated_schema;
636+
637+
Ok(())
638+
}
639+
627640
pub fn get_schema_raw(&self) -> HashMap<String, Arc<Field>> {
628641
self.metadata.read().expect(LOCK_EXPECT).schema.clone()
629642
}

0 commit comments

Comments (0)