Skip to content

Commit 4a776d0

Browse files
committed
Fix: DB and Ingestion
1 parent 9946cd9 commit 4a776d0

File tree

4 files changed

+192
-100
lines changed

4 files changed

+192
-100
lines changed

Cargo.lock

Lines changed: 89 additions & 40 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,3 +49,6 @@ datafusion-uwheel = { git = "https://github.com/uwheel/datafusion-uwheel.git" }
4949
sqllogictest = { git = "https://github.com/risinglightdb/sqllogictest-rs.git" }
5050
criterion = {version = "0.5.1", features = ["async"] }
5151
tempfile = "3.18.0"
52+
aws-config = "1.6.0"
53+
aws-types = "1.3.6"
54+
url = "2.5.4"

src/database.rs

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ pub type ProjectConfigs = Arc<RwLock<HashMap<String, (String, Arc<RwLock<deltala
2323

2424
pub struct Database {
2525
pub ctx: SessionContext,
26-
// Keyed by table name.
2726
project_configs: ProjectConfigs,
2827
}
2928

@@ -41,7 +40,7 @@ impl Database {
4140
self.ctx.clone()
4241
}
4342

44-
/// Add a table (project) using the fixed key "telemetry_events".
43+
/// Add a table using the fixed key "telemetry_events"
4544
pub async fn add_project(&self, table_name: &str, connection_string: &str) -> Result<()> {
4645
let table = match DeltaTableBuilder::from_uri(connection_string).load().await {
4746
Ok(table) => table,
@@ -58,14 +57,11 @@ impl Database {
5857
};
5958
self.project_configs.write()
6059
.map_err(|e| anyhow::anyhow!("Failed to acquire write lock: {:?}", e))?
61-
.insert(
62-
table_name.to_string(),
63-
(connection_string.to_string(), Arc::new(RwLock::new(table))),
64-
);
60+
.insert(table_name.to_string(), (connection_string.to_string(), Arc::new(RwLock::new(table))));
6561
Ok(())
6662
}
6763

68-
/// Schema matching your new DB table.
64+
/// Schema matching your new DB table definition.
6965
fn event_schema() -> Schema {
7066
Schema::new(vec![
7167
Field::new("projectId", DataType::Utf8, false),
@@ -122,13 +118,41 @@ impl Database {
122118
let schema = Self::event_schema();
123119
schema.fields().iter().map(|f| {
124120
match f.data_type() {
125-
DataType::Utf8 => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String), f.is_nullable()),
126-
DataType::Timestamp(_, _) => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Timestamp), f.is_nullable()),
127-
DataType::Int64 => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Long), f.is_nullable()),
128-
DataType::Int32 => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Integer), f.is_nullable()),
129-
DataType::Boolean => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Boolean), f.is_nullable()),
130-
DataType::List(_) => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String), f.is_nullable()),
131-
_ => deltalake::kernel::StructField::new(f.name().to_string(), deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String), f.is_nullable()),
121+
DataType::Utf8 => deltalake::kernel::StructField::new(
122+
f.name().to_string(),
123+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String),
124+
f.is_nullable()
125+
),
126+
DataType::Timestamp(_, _) => deltalake::kernel::StructField::new(
127+
f.name().to_string(),
128+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Timestamp),
129+
f.is_nullable()
130+
),
131+
DataType::Int64 => deltalake::kernel::StructField::new(
132+
f.name().to_string(),
133+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Long),
134+
f.is_nullable()
135+
),
136+
DataType::Int32 => deltalake::kernel::StructField::new(
137+
f.name().to_string(),
138+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Integer),
139+
f.is_nullable()
140+
),
141+
DataType::Boolean => deltalake::kernel::StructField::new(
142+
f.name().to_string(),
143+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::Boolean),
144+
f.is_nullable()
145+
),
146+
DataType::List(_) => deltalake::kernel::StructField::new(
147+
f.name().to_string(),
148+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String),
149+
f.is_nullable()
150+
),
151+
_ => deltalake::kernel::StructField::new(
152+
f.name().to_string(),
153+
deltalake::kernel::DataType::Primitive(deltalake::kernel::PrimitiveType::String),
154+
f.is_nullable()
155+
),
132156
}
133157
}).collect()
134158
}
@@ -223,7 +247,7 @@ impl Database {
223247
let sdk_type_array = StringArray::from(vec![record.sdkType.clone()]);
224248
let service_version_array = StringArray::from(vec![record.serviceVersion.clone().unwrap_or_default()]);
225249
let errors_array = StringArray::from(vec![record.errors.clone().unwrap_or_else(|| "{}".to_string())]);
226-
250+
227251
let mut tags_builder = ListBuilder::new(StringBuilder::new());
228252
for tag in &record.tags {
229253
tags_builder.values().append_value(tag);

0 commit comments

Comments
 (0)