
Commit 90054dc

feat: [sqlite] Delegate supports_filters_pushdown to read provider (#454)
* feat: [sqlite] Delegate supports_filters_pushdown to read provider
* Formatting

1 parent: 409868a

File tree: 1 file changed, +133 −0 lines changed


core/src/sqlite/write.rs

Lines changed: 133 additions, 0 deletions
@@ -66,6 +66,22 @@ impl TableProvider for SqliteTableWriter {
         Some(self.sqlite.constraints())
     }
 
+    fn supports_filters_pushdown(
+        &self,
+        filters: &[&Expr],
+    ) -> datafusion::error::Result<Vec<datafusion::logical_expr::TableProviderFilterPushDown>> {
+        // Verify schema consistency before delegating.
+        // This is a cheap check since it's just comparing Arc<Schema> pointers.
+        if self.read_provider.schema() != self.schema() {
+            tracing::warn!(
+                "Schema mismatch detected in SqliteTableWriter for table {}",
+                self.sqlite.table_name()
+            );
+        }
+
+        self.read_provider.supports_filters_pushdown(filters)
+    }
+
     async fn scan(
         &self,
         state: &dyn Session,
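For context: before this change, SqliteTableWriter fell back to the trait's default implementation of supports_filters_pushdown, which reports every filter as Unsupported, so DataFusion kept its own FilterExec above the scan and the SQLite side saw an unfiltered SELECT. A minimal sketch of that default behavior, for comparison only (signature as in current DataFusion):

    // Sketch of the DataFusion TableProvider default: every filter is
    // reported as Unsupported, so the planner keeps its own FilterExec
    // above the table scan instead of letting the provider filter.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
        Ok(vec![TableProviderFilterPushDown::Unsupported; filters.len()])
    }

Delegating to read_provider instead lets the writer report the same pushdown capabilities as the read path, so filtered queries against the writer-backed table behave the same as plain reads.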
@@ -320,4 +336,121 @@ mod tests {
             .await
             .expect("insert successful");
     }
+
+    #[tokio::test]
+    async fn test_filter_pushdown_support() {
+        use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown};
+
+        let schema = Arc::new(Schema::new(vec![
+            datafusion::arrow::datatypes::Field::new("id", DataType::Int64, false),
+            datafusion::arrow::datatypes::Field::new("name", DataType::Utf8, false),
+        ]));
+        let df_schema = ToDFSchema::to_dfschema_ref(Arc::clone(&schema)).expect("df schema");
+        let external_table = CreateExternalTable {
+            schema: df_schema,
+            name: TableReference::bare("test_filter_table"),
+            location: String::new(),
+            file_type: String::new(),
+            table_partition_cols: vec![],
+            if_not_exists: true,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Constraints::default(),
+            column_defaults: HashMap::default(),
+            temporary: false,
+        };
+        let ctx = SessionContext::new();
+        let table = SqliteTableProviderFactory::default()
+            .create(&ctx.state(), &external_table)
+            .await
+            .expect("table should be created");
+
+        // Test that filter pushdown is supported
+        let filter = col("id").gt(lit(10));
+        let result = table
+            .supports_filters_pushdown(&[&filter])
+            .expect("should support filter pushdown");
+
+        assert_eq!(
+            result,
+            vec![TableProviderFilterPushDown::Exact],
+            "Filter pushdown should be exact for simple comparison"
+        );
+    }
+
+    #[tokio::test]
+    async fn test_concurrent_read_write_with_filter_pushdown() {
+        use datafusion::logical_expr::{col, lit, TableProviderFilterPushDown};
+
+        let schema = Arc::new(Schema::new(vec![
+            datafusion::arrow::datatypes::Field::new("id", DataType::Int64, false),
+            datafusion::arrow::datatypes::Field::new("value", DataType::Int64, false),
+        ]));
+        let df_schema = ToDFSchema::to_dfschema_ref(Arc::clone(&schema)).expect("df schema");
+
+        let external_table = CreateExternalTable {
+            schema: df_schema,
+            name: TableReference::bare("concurrent_test"),
+            location: String::new(),
+            file_type: String::new(),
+            table_partition_cols: vec![],
+            if_not_exists: true,
+            definition: None,
+            order_exprs: vec![],
+            unbounded: false,
+            options: HashMap::new(),
+            constraints: Constraints::default(),
+            column_defaults: HashMap::default(),
+            temporary: false,
+        };
+
+        let ctx = SessionContext::new();
+        let table = SqliteTableProviderFactory::default()
+            .create(&ctx.state(), &external_table)
+            .await
+            .expect("table should be created");
+
+        // Insert initial data
+        let arr1 = Int64Array::from(vec![1, 2, 3]);
+        let arr2 = Int64Array::from(vec![10, 20, 30]);
+        let data = RecordBatch::try_new(Arc::clone(&schema), vec![Arc::new(arr1), Arc::new(arr2)])
+            .expect("data should be created");
+
+        let exec = MockExec::new(vec![Ok(data)], Arc::clone(&schema));
+        let insertion = table
+            .insert_into(&ctx.state(), Arc::new(exec), InsertOp::Append)
+            .await
+            .expect("insertion should be successful");
+        collect(insertion, ctx.task_ctx())
+            .await
+            .expect("insert successful");
+
+        // Verify filter pushdown works after insert
+        let filter = col("id").gt(lit(1));
+        let result = table
+            .supports_filters_pushdown(&[&filter])
+            .expect("should support filter pushdown");
+
+        assert_eq!(
+            result,
+            vec![TableProviderFilterPushDown::Exact],
+            "Filter pushdown should be exact for simple comparison"
+        );
+
+        // Verify we can actually scan with the filter
+        let scan = table
+            .scan(&ctx.state(), None, &[filter], None)
+            .await
+            .expect("scan should succeed");
+
+        let batches = collect(scan, ctx.task_ctx())
+            .await
+            .expect("collect should succeed");
+
+        assert!(!batches.is_empty(), "Should have results");
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 2, "Should have 2 rows with id > 1");
+    }
 }
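To see the effect end to end, here is a hedged sketch reusing ctx and table from the tests above (register_table and sql are standard SessionContext APIs; the exact plan text depends on the DataFusion version):

    // Register the provider and EXPLAIN a filtered query. With Exact
    // pushdown, the predicate should appear inside the table scan rather
    // than as a separate FilterExec node above it.
    ctx.register_table("concurrent_test", Arc::clone(&table))
        .expect("table should register");
    let df = ctx
        .sql("EXPLAIN SELECT * FROM concurrent_test WHERE id > 1")
        .await
        .expect("explain should succeed");
    df.show().await.expect("plan should print");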
