# Add Variant SELECT rewriter and comprehensive architecture docs #16

tonyalaribe wants to merge 11 commits into master from
Conversation
- Consolidate WALRUS_DATA_DIR and FOYER_CACHE_DIR into single TIMEFUSION_DATA_DIR with derived subdirs (wal/, cache/)
- Add VariantConversionExec to convert string columns to Variant during INSERT
- Add VariantInsertRewriter analyzer rule to rewrite string literals for Variant columns
- Add is_schema_compatible_for_insert() for flexible INSERT type checking
- Split optimizers.rs into optimizers/ module directory
- Improve query sanitization for INSERT and long queries
- Add VariantSelectRewriter analyzer rule to wrap Variant columns with variant_to_json() in SELECT projections for PostgreSQL wire protocol
- Add comprehensive documentation:
  - docs/ARCHITECTURE.md: Full system architecture overview
  - docs/VARIANT_TYPE_SYSTEM.md: Variant type implementation details
  - docs/WAL.md: Write-ahead log implementation and recovery
- Update database.rs with unified table storage model improvements
- Update DML operations with buffered layer integration
- Align otel_logs_and_spans schema with monoscope
- Fix test configurations for new architecture
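The SELECT rewrite listed above has to be idempotent: wrapping an already-wrapped column again would double-encode it. A toy sketch of that rule, where the `Expr` enum is a simplified stand-in, not DataFusion's actual type:

```rust
// Toy model of the no-double-wrap rule a Variant SELECT rewriter needs.
// `Expr` here is a stand-in; the real rule operates on DataFusion's Expr.
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Column(String),
    Call { func: String, args: Vec<Expr> },
}

/// Wrap a projection expression in variant_to_json(), but only if the
/// column is a Variant column and is not already wrapped.
fn wrap_variant(expr: Expr, variant_cols: &[&str]) -> Expr {
    match expr {
        // Already wrapped: leave untouched so repeated analyzer passes are safe.
        Expr::Call { func, args } if func == "variant_to_json" => Expr::Call { func, args },
        Expr::Column(name) if variant_cols.contains(&name.as_str()) => Expr::Call {
            func: "variant_to_json".to_string(),
            args: vec![Expr::Column(name)],
        },
        other => other,
    }
}

fn main() {
    let wrapped = wrap_variant(Expr::Column("payload".into()), &["payload"]);
    // A second pass must not wrap again.
    assert_eq!(wrap_variant(wrapped.clone(), &["payload"]), wrapped);
    // Non-variant columns pass through unchanged.
    assert_eq!(
        wrap_variant(Expr::Column("name".into()), &["payload"]),
        Expr::Column("name".into())
    );
}
```

The guard on the `Call` arm is what makes the rule safe to run more than once, which matters because analyzer rules can see plans that were already partially rewritten.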
### PR Review: Add Variant SELECT rewriter and comprehensive architecture docs

**Summary**

This is a well-structured PR that adds critical functionality for Variant type handling and significantly improves documentation. The implementation is thoughtful and addresses PostgreSQL wire protocol compatibility correctly.

✅ **Strengths**

**Code Quality**

**Best Practices**
🔍 **Issues & Recommendations**

**Security Concerns**

**HIGH: Credentials in Database Schema** (database.rs:570-571)

```sql
s3_access_key_id VARCHAR(500) NOT NULL,
s3_secret_access_key VARCHAR(500) NOT NULL,
```

Storing credentials in plaintext is a security vulnerability. Consider:
**MEDIUM: Potential SQL Injection in Dynamic Queries**

**Code Quality Issues**

**1. Missing Error Context** (variant_insert_rewriter.rs:32)

```rust
plan.transform_up(|node| rewrite_insert_node(node)).map(|t| t.data)
```

Consider adding error context to help with debugging:

```rust
plan.transform_up(|node| rewrite_insert_node(node))
    .map(|t| t.data)
    .map_err(|e| DataFusionError::Plan(format!("VariantInsertRewriter failed: {}", e)))
```

**2. Recursive Transformation Could Stack Overflow** (variant_insert_rewriter.rs:94-104)
**3. Magic Number** (optimizers/mod.rs:29)

```rust
let date_scalar = ScalarValue::Date32(Some(date.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp() as i32 / 86400));
```

The division by 86400 (seconds per day) should be a named constant:

```rust
const SECONDS_PER_DAY: i32 = 86400;
```

**4. Unused Code Warning** (optimizers/mod.rs:7-10)

```rust
// Remove unused imports warning - these are used by the submodules indirectly
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::scalar::ScalarValue;
```

These imports ARE used directly in the time_range_partition_pruner module below. The comment is misleading; just remove it.

**Performance Considerations**

**1. Cloning in Hot Path** (variant_select_rewriter.rs:40)

```rust
expr.clone()
```

While necessary for the TreeNode pattern, consider documenting that this is acceptable because:
**2. Batch Conversion Efficiency** (database.rs:100-152)

**Test Coverage**

**GOOD:**

**NEEDS IMPROVEMENT:**
**Recommendation:** Add unit tests:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::prelude::*;

    #[test]
    fn test_variant_select_no_double_wrap() {
        // Test that already-wrapped columns aren't wrapped again
    }

    #[test]
    fn test_variant_insert_with_partial_columns() {
        // Test INSERT with fewer columns than table schema
    }
}
```

**Documentation**

**Minor Issues:**
🎯 **Action Items**

**Before Merge (Required):**

**Post-Merge (Recommended):**

📊 **Metrics**
**Conclusion**

This is high-quality work that significantly improves the Variant type system and documentation. The security concern around credential storage should be addressed before merging, but otherwise the implementation is solid. The new documentation is comprehensive and will help onboard contributors.

Recommendation: **Approve with requested changes to credential storage**

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>

---
variant_columns_to_json() was using RecordBatch::try_new() which fails when creating batches with 0 columns (empty projections used by COUNT(*)) because Arrow requires either columns or an explicit row count. Changed to try_new_with_options() to preserve the original batch's row count, fixing queries like SELECT COUNT(*) that don't need any columns.
Transient network errors like "error sending request" were failing immediately with no retries. Added:

- RetryConfig: 5 retries with exponential backoff (100ms-15s)
- ClientOptions: 30s connect timeout, 5min request timeout

This should resolve intermittent flush failures to R2/S3.
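The retry policy described in this commit can be sketched as a capped exponential backoff. Constant and function names here are illustrative, not the actual object_store configuration:

```rust
use std::time::Duration;

// Illustrative sketch of "5 retries with exponential backoff (100ms-15s)":
// the delay doubles per attempt and saturates at the configured maximum.
const INITIAL_BACKOFF_MS: u64 = 100;
const MAX_BACKOFF_MS: u64 = 15_000;

fn backoff_for_attempt(attempt: u32) -> Duration {
    let ms = INITIAL_BACKOFF_MS
        .saturating_mul(2u64.saturating_pow(attempt))
        .min(MAX_BACKOFF_MS);
    Duration::from_millis(ms)
}

fn main() {
    assert_eq!(backoff_for_attempt(0), Duration::from_millis(100));
    assert_eq!(backoff_for_attempt(3), Duration::from_millis(800));
    // Past attempt 7 the cap dominates.
    assert_eq!(backoff_for_attempt(10), Duration::from_millis(15_000));
}
```

Using saturating arithmetic means a large attempt count can never overflow and produce a tiny delay by accident.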
- Replace blocking std::thread::sleep with tokio::time::sleep in CAS retry loop to avoid starving the Tokio executor under contention
- Fix DML memory tracking: recalculate bucket memory_bytes after DELETE/UPDATE operations to prevent premature flush triggers
- Improve WAL recovery resilience: catch schema-incompatible entries instead of aborting recovery, add empty batch skip
- Add timestamp range filtering to MemBuffer queries: extract bounds from filter expressions and skip non-overlapping time buckets
- Switch from GreedyMemoryPool to FairSpillPool for per-query memory fairness and automatic spill-to-disk under pressure
- Make WAL fsync interval configurable via TIMEFUSION_WAL_FSYNC_MS env var (default 200ms)
… optimization

- WAL serialization now uses Arrow IPC (v129) instead of custom CompactBatch, with automatic fallback to legacy v128 format for existing WAL entries
- MemBuffer compacts multiple small batches into a single RecordBatch before flush to reduce small file writes
- Optimization window, min file threshold, and light optimize target size are now configurable via maintenance config
- Switch TimeBucket from RwLock to parking_lot::Mutex for lower overhead
- Add compact-on-read in query()/query_partitioned(): first read compacts batches in-place, subsequent reads get pre-compacted single batch
- Remove insert-time compaction that caused +64% batch_api write regression
- Revert WAL serialization from Arrow IPC back to bincode CompactBatch (IPC schema preamble overhead caused +18-20% SQL insert regression)
- Keep IPC deserialization as fallback for backward compatibility
- Skip VariantToJsonExec wrapper for tables with no Variant columns
- Add bloom filter config (timefusion_bloom_filter_disabled), enabled by default
- Add WAL file monitoring and emergency flush on file count threshold
- Add criterion benchmarks for write, read, S3 flush, and S3 read paths
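The "keep IPC deserialization as fallback" point amounts to dispatching on a version byte. A simplified sketch (the real WAL frame places the version elsewhere in the header, and actual payload decoding is omitted):

```rust
// Simplified sketch of version-byte dispatch with a legacy fallback:
// v128 entries carry the bincode CompactBatch encoding, v129 carry Arrow IPC.
const WAL_VERSION_LEGACY: u8 = 128;
const WAL_VERSION_IPC: u8 = 129;

#[derive(Debug, PartialEq)]
enum WalPayload<'a> {
    LegacyBincode(&'a [u8]),
    ArrowIpc(&'a [u8]),
}

fn decode_entry(data: &[u8]) -> Result<WalPayload<'_>, String> {
    let (&version, body) = data
        .split_first()
        .ok_or_else(|| "empty WAL entry".to_string())?;
    match version {
        WAL_VERSION_LEGACY => Ok(WalPayload::LegacyBincode(body)),
        WAL_VERSION_IPC => Ok(WalPayload::ArrowIpc(body)),
        // Report every supported version, not only the newest one.
        other => Err(format!(
            "unsupported WAL version {other}, expected one of {WAL_VERSION_LEGACY} or {WAL_VERSION_IPC}"
        )),
    }
}

fn main() {
    assert_eq!(decode_entry(&[128u8, 1, 2]), Ok(WalPayload::LegacyBincode(&[1u8, 2][..])));
    assert_eq!(decode_entry(&[129u8, 3]), Ok(WalPayload::ArrowIpc(&[3u8][..])));
    let err = decode_entry(&[42u8]).unwrap_err();
    assert!(err.contains("128") && err.contains("129"));
}
```

Listing both valid versions in the error message also addresses the misleading-error concern raised in the reviews below.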
### Code Review: PR #16 - Variant SELECT/INSERT Rewriter and Architecture Docs

**Overview**

This is a well-architected and production-ready PR that adds comprehensive Variant type support, extensive documentation, and performance optimizations. The implementation is solid with clean separation of concerns and proper integration into the DataFusion query pipeline.

Overall Assessment: ✅ **Approve with minor recommendations**

🎯 **Key Strengths**

**1. Clean Implementation**

**2. Comprehensive Documentation**

**3. Performance Focus**

**4. Type Safety & Error Handling**

---
### Code Review - PR #16: Variant SELECT Rewriter and Architecture Documentation

**Overall Assessment**

Approval Status: ✅ Approve with minor suggestions

This is a high-quality PR with well-designed components. The VariantSelectRewriter and VariantInsertRewriter are clean, focused implementations that integrate well with DataFusion's analyzer pipeline.

**Component Reviews**

**1. VariantSelectRewriter** (

---
### PR Review: Add Variant SELECT rewriter and comprehensive architecture docs

Overall this is a substantial, well-structured PR with meaningful improvements across the board. The unified storage model, session-aware DML, and timestamp-based bucket pruning are all solid additions. I've flagged several issues worth addressing before merging.

**Bugs / Correctness Issues**

**1.** In

**2.**

```rust
// tests/buffer_consistency_test.rs
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.timefusion_data_dir) };
```

**3.** WAL deserialization:
**4.**

```rust
// src/wal.rs
return Err(WalError::UnsupportedVersion {
    version: data[4],
    expected: WAL_VERSION_IPC, // reports 129, but 128 is also valid
});
```

Both 128 and 129 are now valid. Reporting only `WAL_VERSION_IPC` in the error is misleading.

**5.** (src/pgwire_handlers.rs)
```rust
format!("{}...", &query[..MAX_LEN])
```

Slicing a `&str` at a fixed byte index panics when the index is not a char boundary; the truncation point must land on one.

**Performance Concerns**

**6.** In both

**7.**

```rust
if let Some(ref pool) = self.config_pool
    && let Ok(new_configs) = Self::load_storage_configs(pool).await
{
    let mut configs = self.storage_configs.write().await;
    *configs = new_configs;
}
```

This issues a DB query and takes a write lock on every call.
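One way to address the per-call reload in point 7 is a TTL on the cached configs, sketched here with std types only (all names are illustrative, not the PR's API):

```rust
use std::time::{Duration, Instant};

// Sketch: only hit the config store when the cached copy is older than a TTL,
// instead of issuing a DB query and taking the write lock on every call.
struct CachedConfigs {
    configs: Vec<String>,
    loaded_at: Instant,
    ttl: Duration,
}

impl CachedConfigs {
    fn get_or_reload(&mut self, load: impl FnOnce() -> Vec<String>) -> &[String] {
        if self.loaded_at.elapsed() >= self.ttl {
            // The expensive reload happens only when the cache is stale.
            self.configs = load();
            self.loaded_at = Instant::now();
        }
        &self.configs
    }
}

fn main() {
    let mut cache = CachedConfigs {
        configs: vec!["initial".into()],
        loaded_at: Instant::now(),
        ttl: Duration::from_secs(60),
    };
    let mut loads = 0;
    for _ in 0..100 {
        cache.get_or_reload(|| {
            loads += 1;
            vec!["fresh".into()]
        });
    }
    // Within the TTL, no reloads are issued.
    assert_eq!(loads, 0);
}
```

In the async original the same idea applies; the reload branch would also be where the write lock is taken, keeping the common path to a cheap read.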
**9. Bloom filters enabled globally with fixed NDV**

```rust
.set_bloom_filter_enabled(!self.config.parquet.timefusion_bloom_filter_disabled)
.set_bloom_filter_ndv(100_000)
```

Applying bloom filters with a fixed NDV of 100k to every column — including high-cardinality columns (UUIDs, full-text fields) and array/struct columns — will significantly increase Parquet file sizes without benefit for those column types. Consider restricting to known low-to-medium cardinality key columns.

**Design / Architecture Concerns**

**10. Dual Variant→JSON conversion paths may interact unexpectedly**

The PR introduces two separate mechanisms for converting Variant to JSON on SELECT:
If both are active on the same query, a Variant column could be double-converted, potentially causing a runtime panic (the physical exec expects a Variant input).

**11.**

```rust
fn delta_session_from(session: &SessionState) -> Arc<dyn Session> {
    Arc::new(
        SessionStateBuilder::new()
            .with_config(session.config().clone())
            .with_runtime_env(session.runtime_env().clone())
            .with_default_features()
            .with_query_planner(deltalake::delta_datafusion::planner::DeltaPlanner::new())
            .build(),
    )
}
```

This builds a fresh session without the registered UDFs (JSON functions, Variant UDFs), catalog providers, or other extensions. If delta-rs internally plans the predicate using those functions, this will fail silently or produce incorrect DELETE/UPDATE results. Consider building this session once at startup, or finding a way to extend the existing session state with only the planner swapped.

**12.** The filter pruning only recognizes flat `timestamp` comparisons.

**Minor / Nits**
**Documentation**

The three architecture docs (docs/ARCHITECTURE.md, docs/VARIANT_TYPE_SYSTEM.md, docs/WAL.md) are thorough and a welcome addition.

---
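The "build this session once at startup" suggestion from point 11 can be sketched with std's `OnceLock`; `PlannerSession` is a hypothetical stand-in for the real session type:

```rust
use std::sync::OnceLock;

// Sketch of building an expensive object exactly once and reusing it,
// rather than rebuilding on every DELETE/UPDATE. `PlannerSession` is a
// placeholder; the real type would carry config, runtime env, UDFs, and
// the swapped-in query planner.
struct PlannerSession {
    id: u64,
}

fn shared_planner_session() -> &'static PlannerSession {
    static SESSION: OnceLock<PlannerSession> = OnceLock::new();
    // The closure runs at most once, on first access.
    SESSION.get_or_init(|| PlannerSession { id: 1 })
}

fn main() {
    // Every caller sees the same instance; no per-operation rebuild.
    assert!(std::ptr::eq(shared_planner_session(), shared_planner_session()));
    assert_eq!(shared_planner_session().id, 1);
}
```

A `'static` singleton is the simplest form of the idea; if the session must be derived from runtime config, the same pattern works with an `Arc` stored on the server struct at startup.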
### PR Review: Add Variant SELECT rewriter and comprehensive architecture docs

Overall this is a solid PR with well-motivated changes — the unified storage model is a meaningful architectural improvement and the

**Critical**

**Incomplete analyzer rules list breaks `SELECT *`**
```rust
let analyzer_rules = vec![
    ResolveGroupingFunction,
    VariantInsertRewriter,
    TypeCoercion,
    VariantSelectRewriter,
];
```

DataFusion's default set also includes the wildcard-expansion rule, among others, which this replacement list omits.
This directly explains the test regression in

```rust
// Verify we can query specific columns (SELECT * fails due to Variant column encoding)
```

The failure is not a Variant encoding issue — it is the missing wildcard expansion rule. The fix is to insert the custom rules into the default list rather than replacing it entirely, e.g. by building around the default analyzer rule list.

**Bugs**

**Version tracking for unified tables is always stale**

In

```rust
versions.get(&("".to_string(), table_name.to_string())).cloned()
```

But when data is written, the version is recorded under the actual project ID, so this lookup never sees the update.

**Wrong path set in**
```rust
unsafe { std::env::set_var("WALRUS_DATA_DIR", &cfg.core.timefusion_data_dir) };
```

But `WALRUS_DATA_DIR` was consolidated into `TIMEFUSION_DATA_DIR`; the WAL now lives under its derived `wal/` subdirectory, so this variable is no longer read.

**WAL unsupported-version error message is misleading**
```rust
Err(WalError::UnsupportedVersion {
    version: data[4],
    expected: WAL_VERSION_IPC, // always says "expected 129"
})
```

If a truly unknown version appears, the message incorrectly implies v129 was the only valid option. Consider a message string like "expected 128 or 129".

**Design Concerns**

**In-place compaction during reads**
```rust
let mut batches = bucket.batches.lock();
if batches.len() > 1 {
    if let Ok(single) = concat_batches(...) {
        batches.clear();
        batches.push(single);
    }
}
```

Read operations mutating shared state is surprising. A failed compaction (silently ignored via the `if let Ok`) would also go completely unnoticed.

**Hardcoded timestamp column name in filter pushdown**
```rust
let is_ts = matches!(left.as_ref(), Expr::Column(c) if c.name == "timestamp");
```

This silently skips pruning for tables where the timestamp column has a different name, and won't match qualified column references like `table.timestamp`.

**Performance Notes**

**8x batch size increase**
**Bloom filters enabled by default**

Bloom filters add write overhead and increase Parquet file size.
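The hardcoded-column concern flagged above can be sketched as a configurable match that also tolerates qualified references. This is a string-based stand-in; the real code would inspect DataFusion's `Expr::Column` (name plus optional relation):

```rust
// Sketch of a configurable timestamp-column match for filter pruning.
// Accepts both bare ("timestamp") and qualified ("table.timestamp") forms.
fn is_timestamp_column(reference: &str, ts_col: &str) -> bool {
    reference == ts_col || reference.rsplit('.').next() == Some(ts_col)
}

fn main() {
    assert!(is_timestamp_column("timestamp", "timestamp"));
    assert!(is_timestamp_column("t.timestamp", "timestamp"));
    assert!(is_timestamp_column("otel_logs_and_spans.timestamp", "timestamp"));
    assert!(!is_timestamp_column("created_at", "timestamp"));
    // A differently named column still prunes once the name is configurable.
    assert!(is_timestamp_column("events.event_time", "event_time"));
}
```

Passing `ts_col` from table metadata (instead of the literal `"timestamp"`) is what restores pruning for tables with a different timestamp column name.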
---
### PR Review: Add Variant SELECT rewriter and comprehensive architecture docs

Overall this is a well-structured PR that closes an important gap — Variant columns now serialize correctly through the PostgreSQL wire protocol in both INSERT and SELECT paths. The documentation additions are thorough and the config consolidation is clean. A few areas worth addressing before merge are noted below.

**Potential Bugs**

**1.** In

This means parameterized INSERTs like

**2.** In

**3.** Potential byte-index mismatch in

```rust
let lower = query.to_lowercase();
let table_end = lower.find('(').or_else(|| lower.find("values")).unwrap_or(lower.len());
let table_part = query[..table_end].trim_end();
```
**Code Quality & Maintainability**

**4. Dual conversion paths for SELECT may cause confusion**

The PR adds

This is defensible as belt-and-suspenders, but a comment explaining the intentional layering would prevent future maintainers from removing one layer thinking it is dead code.

**5. Duplicate type compatibility logic**
**6.** The

**Performance**

**7.** New

In

**Correctness**

**8. Schema partition change in**

Adding

Is there a migration strategy for existing deployments? If this targets greenfield deployments only, that should be documented clearly.

**9.**

**Test Coverage**

**10. No unit tests for**

The integration test delta (+10/-5 lines) is minimal for a feature touching the full INSERT/SELECT round-trip. A test exercising

**Minor**
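A hedged sketch of how the byte-index pitfalls in point 3 can be avoided (helper names are illustrative, not the PR's API). The hazards: indices found in `query.to_lowercase()` are not always valid in `query` (some characters change byte length when lowercased), and slicing at an arbitrary byte offset panics off a char boundary:

```rust
/// Case-insensitive search that reports indices valid in the ORIGINAL string,
/// by comparing bytes ASCII-case-insensitively (ASCII lowercasing never
/// changes byte length, so no index remapping is needed).
fn find_ascii_ci(haystack: &str, needle: &str) -> Option<usize> {
    let h = haystack.as_bytes();
    let n = needle.as_bytes();
    if n.is_empty() || n.len() > h.len() {
        return None;
    }
    h.windows(n.len()).position(|w| w.eq_ignore_ascii_case(n))
}

/// Truncate to at most `max` bytes, backing up to the nearest char boundary
/// so the slice can never panic mid-character.
fn truncate_safe(s: &str, max: usize) -> &str {
    if s.len() <= max {
        return s;
    }
    let mut end = max;
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}

fn main() {
    // The multi-byte 'é' before VALUES does not throw the index off.
    let q = "INSERT INTO métrics VALUES (1)";
    let idx = find_ascii_ci(q, "values").unwrap();
    assert_eq!(&q[idx..idx + 6], "VALUES");
    // Truncation never panics, even when `max` lands inside a character.
    assert_eq!(truncate_safe("héllo", 2), "h");
}
```

Restricting the case folding to ASCII is enough for SQL keywords like `VALUES`, which is what the sanitizer is searching for.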
---
## Summary

- Add `VariantSelectRewriter` analyzer rule to wrap Variant columns with `variant_to_json()` in SELECT projections, ensuring proper serialization for the PostgreSQL wire protocol

## Documentation Added

- `docs/ARCHITECTURE.md` - Full system architecture overview with diagrams
- `docs/VARIANT_TYPE_SYSTEM.md` - Variant type implementation (`->` and `->>`)
- `docs/WAL.md` - Write-ahead log implementation

## Code Changes

- `src/optimizers/variant_select_rewriter.rs` - New analyzer rule
- `src/database.rs` - Unified table storage improvements
- `src/dml.rs` - Buffered layer integration

## Test plan

- `cargo test` for unit tests
- `cargo test --test integration_test` for integration tests