Skip to content

Commit a91b3bf

Browse files
committed
cleanups
1 parent 02cf646 commit a91b3bf

File tree

7 files changed

+56
-27
lines changed

7 files changed

+56
-27
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ ahash = "0.8"
6868
lru = "0.16.1"
6969
serde_bytes = "0.11.19"
7070
dashmap = "6.1"
71+
parking_lot = "0.12"
7172
envy = "0.4"
7273
tdigests = "1.0"
7374
bincode = { version = "2.0", features = ["serde"] }

src/buffered_write_layer.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,14 @@ impl BufferedWriteLayer {
138138
return Ok(estimated_size);
139139
}
140140

141-
// Exponential backoff: spin_loop for first few attempts, then yield
141+
// Exponential backoff: spin_loop for first few attempts, then a brief sleep.
142+
// Note: Using std::thread::sleep in this sync function called from an async context.
143+
// This is acceptable because: (1) max sleep is ~1ms, (2) only under high contention,
144+
// (3) converting to async would require spawn_blocking which adds more overhead.
142145
if attempt < 5 {
143146
std::hint::spin_loop();
144147
} else {
148+
// Max backoff = 1μs << 10 = 1024μs ≈ 1ms
145149
let backoff_micros = CAS_BACKOFF_BASE_MICROS << attempt.min(CAS_BACKOFF_MAX_EXPONENT);
146150
std::thread::sleep(std::time::Duration::from_micros(backoff_micros));
147151
}

src/database.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,26 @@ use deltalake::operations::create::CreateBuilder;
3636
use deltalake::{DeltaTable, DeltaTableBuilder};
3737
use futures::StreamExt;
3838
use instrumented_object_store::instrument_object_store;
39+
use std::sync::Mutex;
3940
use serde::{Deserialize, Serialize};
4041
use sqlx::{PgPool, postgres::PgPoolOptions};
4142
use std::fmt;
43+
use std::sync::OnceLock;
4244
use std::{any::Any, collections::HashMap, sync::Arc};
4345
use tokio::sync::RwLock;
4446
use tokio_util::sync::CancellationToken;
4547
use tracing::field::Empty;
4648
use tracing::{Instrument, debug, error, info, instrument, warn};
4749
use url::Url;
4850

51+
/// Mutex to serialize access to environment variable modifications.
52+
/// Required because delta-rs uses std::env::var() for AWS credential resolution,
53+
/// and std::env::set_var is unsafe in multi-threaded contexts.
54+
static ENV_MUTEX: OnceLock<Mutex<()>> = OnceLock::new();
55+
fn env_mutex() -> &'static Mutex<()> {
56+
ENV_MUTEX.get_or_init(|| Mutex::new(()))
57+
}
58+
4959
// Changed to support multiple tables per project: (project_id, table_name) -> DeltaTable
5060
pub type ProjectConfigs = Arc<RwLock<HashMap<(String, String), Arc<RwLock<DeltaTable>>>>>;
5161

@@ -1136,17 +1146,18 @@ impl Database {
11361146
async fn create_or_load_delta_table(
11371147
&self, storage_uri: &str, storage_options: HashMap<String, String>, cached_store: Arc<dyn object_store::ObjectStore>,
11381148
) -> Result<DeltaTable> {
1139-
// SAFETY: delta-rs internally uses std::env::var() for AWS credential resolution.
1140-
// While set_var is unsafe in multi-threaded contexts (potential data races with concurrent
1141-
// env reads), this is acceptable here because:
1142-
// 1. We only set AWS_* vars which are read by the AWS SDK during client initialization
1143-
// 2. The values are consistent across calls (same credentials for same storage_options)
1144-
// 3. Delta table creation happens early in request processing, before parallel query execution
1145-
// 4. The alternative (forking processes or thread-local storage) adds significant complexity
1146-
for (key, value) in &storage_options {
1147-
if key.starts_with("AWS_") {
1148-
unsafe {
1149-
std::env::set_var(key, value);
1149+
// delta-rs uses std::env::var() for AWS credential resolution.
1150+
// We serialize access with ENV_MUTEX to prevent data races from concurrent set_var calls.
1151+
{
1152+
let _guard = env_mutex().lock();
1153+
for (key, value) in &storage_options {
1154+
if key.starts_with("AWS_") {
1155+
// SAFETY: ENV_MUTEX serializes every set_var call in this crate, so no two
1156+
// threads in our code modify the environment concurrently. NOTE(review): the
1157+
// mutex does not guard concurrent std::env::var reads in threads that don't
// take it (e.g. inside delta-rs) — confirm those reads cannot race with set_var.
1158+
unsafe {
1159+
std::env::set_var(key, value);
1160+
}
11501161
}
11511162
}
11521163
}
@@ -1194,9 +1205,8 @@ impl Database {
11941205

11951206
// Fallback to legacy batch queue if configured
11961207
let enable_queue = self.config.core.enable_batch_queue;
1197-
if !skip_queue && enable_queue && self.batch_queue.is_some() {
1208+
if !skip_queue && enable_queue && let Some(ref queue) = self.batch_queue {
11981209
span.record("use_queue", true);
1199-
let queue = self.batch_queue.as_ref().unwrap();
12001210
for batch in batches {
12011211
if let Err(e) = queue.queue(batch) {
12021212
return Err(anyhow::anyhow!("Queue error: {}", e));
@@ -1724,12 +1734,19 @@ impl ProjectRoutingTable {
17241734
// delta table provider expects indices based on its own schema.
17251735
let delta_schema = provider.schema();
17261736
let translated_projection = projection.map(|proj| {
1727-
proj.iter()
1728-
.filter_map(|&idx| {
1729-
let col_name = self.schema.field(idx).name();
1730-
delta_schema.fields().iter().position(|f| f.name() == col_name)
1731-
})
1732-
.collect::<Vec<_>>()
1737+
let mut translated = Vec::with_capacity(proj.len());
1738+
for &idx in proj {
1739+
let col_name = self.schema.field(idx).name();
1740+
if let Some(delta_idx) = delta_schema.fields().iter().position(|f| f.name() == col_name) {
1741+
translated.push(delta_idx);
1742+
} else {
1743+
warn!(
1744+
"Column '{}' requested in projection but not found in Delta schema for table '{}'",
1745+
col_name, self.table_name
1746+
);
1747+
}
1748+
}
1749+
translated
17331750
});
17341751

17351752
let delta_plan = provider.scan(state, translated_projection.as_ref(), filters, limit).await?;

src/mem_buffer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use datafusion::sql::sqlparser::dialect::GenericDialect;
1212
use datafusion::sql::sqlparser::parser::Parser as SqlParser;
1313
use std::sync::atomic::{AtomicI64, AtomicUsize, Ordering};
1414
use std::sync::{Arc, RwLock};
15-
use tracing::{debug, instrument, warn};
15+
use tracing::{debug, info, instrument, warn};
1616

1717
// 10-minute buckets balance flush granularity vs overhead. Shorter = more flushes,
1818
// longer = larger Delta files. Matches default flush interval for aligned boundaries.
@@ -46,7 +46,7 @@ fn schemas_compatible(existing: &SchemaRef, incoming: &SchemaRef) -> bool {
4646
}
4747
}
4848
if new_fields > 0 {
49-
debug!("Schema evolution: {} new nullable field(s) added", new_fields);
49+
info!("Schema evolution: {} new nullable field(s) added", new_fields);
5050
}
5151
true
5252
}

src/pgwire_handlers.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ pub struct LoggingSimpleQueryHandler {
7171
}
7272

7373
impl LoggingSimpleQueryHandler {
74+
/// Create a new LoggingSimpleQueryHandler.
75+
/// Note: auth_manager is unused since datafusion-postgres 0.14.0 moved auth to server level.
7476
pub fn new(session_context: Arc<SessionContext>, _auth_manager: Arc<AuthManager>) -> Self {
7577
Self {
7678
inner: DfSessionService::new(session_context),
@@ -144,6 +146,8 @@ pub struct LoggingExtendedQueryHandler {
144146
}
145147

146148
impl LoggingExtendedQueryHandler {
149+
/// Create a new LoggingExtendedQueryHandler.
150+
/// Note: auth_manager is unused since datafusion-postgres 0.14.0 moved auth to server level.
147151
pub fn new(session_context: Arc<SessionContext>, _auth_manager: Arc<AuthManager>) -> Self {
148152
Self {
149153
inner: DfSessionService::new(session_context),

src/wal.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ impl CompactColumn {
118118
Self {
119119
null_bitmap: data.nulls().map(|n| n.buffer().as_slice().to_vec()),
120120
buffers: data.buffers().iter().map(|b| b.as_slice().to_vec()).collect(),
121-
children: data.child_data().iter().map(|c| Self::from_array_data(c)).collect(),
121+
children: data.child_data().iter().map(Self::from_array_data).collect(),
122122
null_count: data.null_count(),
123123
child_lens: data.child_data().iter().map(|c| c.len()).collect(),
124124
}
@@ -128,7 +128,7 @@ impl CompactColumn {
128128
Self {
129129
null_bitmap: data.nulls().map(|n| n.buffer().as_slice().to_vec()),
130130
buffers: data.buffers().iter().map(|b| b.as_slice().to_vec()).collect(),
131-
children: data.child_data().iter().map(|c| Self::from_array_data(c)).collect(),
131+
children: data.child_data().iter().map(Self::from_array_data).collect(),
132132
null_count: data.null_count(),
133133
child_lens: data.child_data().iter().map(|c| c.len()).collect(),
134134
}
@@ -454,9 +454,11 @@ fn deserialize_wal_entry(data: &[u8]) -> Result<WalEntry, WalError> {
454454
}
455455

456456
if data[0..4] == WAL_MAGIC {
457-
// v1+ format: data[4] is version byte (>= 1), data[5] is operation
458-
// v0 format: data[4] is operation (0-2), no version byte
459-
// Distinguish: if data[4] > 2, it must be a version byte
457+
// WAL format detection based on byte 4:
458+
// - v0 (legacy): data[4] is operation byte (0=Insert, 1=Delete, 2=Update)
459+
// - v1+ (current): data[4] is version byte (>=128), data[5] is operation
460+
// Since WalOperation values are 0-2 and WAL_VERSION is 128, we can safely
461+
// distinguish formats: if data[4] > 2, it must be a version byte, not an operation.
460462
if data[4] > 2 {
461463
if data.len() < 6 {
462464
return Err(WalError::TooShort { len: data.len() });

0 commit comments

Comments
 (0)