Skip to content

Commit 25b19bd

Browse files
committed
fix: support schema and catalog for adbc_scan_table
1 parent 2ec2ef1 commit 25b19bd

File tree

3 files changed

+105
-13
lines changed

3 files changed

+105
-13
lines changed

docs/README.md

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,13 +276,15 @@ Scans an entire table by name and returns all rows. This function provides advan
276276
- **Column statistics**: Distinct count, null count, and min/max values are provided to the query optimizer (when available from the driver via `AdbcConnectionGetStatistics`)
277277

278278
```sql
279-
adbc_scan_table(connection_id, table_name, [batch_size := N]) -> TABLE
279+
adbc_scan_table(connection_id, table_name, [catalog := ...], [schema := ...], [batch_size := N]) -> TABLE
280280
```
281281

282282
**Parameters:**
283283

284284
- `connection_id`: Connection handle from `adbc_connect`
285285
- `table_name`: Name of the table to scan
286+
- `catalog` (optional): Catalog containing the table (database-specific, e.g., database name in some systems)
287+
- `schema` (optional): Schema containing the table (e.g., `'public'` in PostgreSQL)
286288
- `batch_size` (optional): Hint to the driver for how many rows to return per batch (default: driver-specific)
287289

288290
**Returns:** A table with all columns from the specified table.
@@ -293,6 +295,12 @@ adbc_scan_table(connection_id, table_name, [batch_size := N]) -> TABLE
293295
-- Scan an entire table
294296
SELECT * FROM adbc_scan_table(getvariable('conn')::BIGINT, 'users');
295297

298+
-- Scan a table with schema qualification (e.g., PostgreSQL)
299+
SELECT * FROM adbc_scan_table(getvariable('conn')::BIGINT, 'users', schema := 'public');
300+
301+
-- Scan a table with full catalog.schema.table qualification
302+
SELECT * FROM adbc_scan_table(getvariable('conn')::BIGINT, 'users', catalog := 'mydb', schema := 'sales');
303+
296304
-- With batch size hint
297305
SELECT * FROM adbc_scan_table(getvariable('conn')::BIGINT, 'large_table', batch_size := 65536);
298306

src/adbc_scan.cpp

Lines changed: 50 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,10 @@ struct AdbcScanBindData : public TableFunctionData {
6464
bool has_params = false;
6565
// Batch size hint for the driver (0 = use driver default)
6666
idx_t batch_size = 0;
67-
// For adbc_scan_table: store table name and column names for projection pushdown
68-
string table_name; // Empty for adbc_scan, set for adbc_scan_table
67+
// For adbc_scan_table: store catalog, schema, table name and column names for projection pushdown
68+
string catalog_name; // Optional catalog (empty = default)
69+
string schema_name; // Optional schema (empty = default)
70+
string table_name; // Empty for adbc_scan, set for adbc_scan_table
6971
vector<string> all_column_names; // All columns in the table (for building projected query)
7072
// Cached row count from ADBC statistics (for cardinality estimation)
7173
idx_t estimated_row_count = 0;
@@ -192,15 +194,20 @@ static string GetColumnName(ArrowArrayView *column_name_view, int64_t row_idx) {
192194

193195
// Fetch all statistics (table-level and column-level) from ADBC
194196
// Populates bind_data with row count and column statistics
195-
static void TryGetStatisticsFromADBC(AdbcConnectionWrapper &connection, const string &table_name,
197+
static void TryGetStatisticsFromADBC(AdbcConnectionWrapper &connection, const string &catalog,
198+
const string &schema, const string &table_name,
196199
AdbcScanBindData &bind_data) {
197200
ArrowArrayStream stream;
198201
memset(&stream, 0, sizeof(stream));
199202

203+
// Convert empty strings to nullptr for ADBC API
204+
const char *catalog_ptr = catalog.empty() ? nullptr : catalog.c_str();
205+
const char *schema_ptr = schema.empty() ? nullptr : schema.c_str();
206+
200207
// Try to get statistics (approximate is fine)
201208
bool got_stats = false;
202209
try {
203-
got_stats = connection.GetStatistics(nullptr, nullptr, table_name.c_str(), 1, &stream);
210+
got_stats = connection.GetStatistics(catalog_ptr, schema_ptr, table_name.c_str(), 1, &stream);
204211
} catch (...) {
205212
return; // Statistics not supported - that's okay
206213
}
@@ -700,6 +707,19 @@ static InsertionOrderPreservingMap<string> AdbcScanToString(TableFunctionToStrin
700707
// ============================================================================
701708

702709
// Bind function for adbc_scan_table - similar to AdbcScanBind but takes table name instead of query
710+
// Helper to build a fully qualified table name with proper quoting
711+
static string BuildQualifiedTableName(const string &catalog, const string &schema, const string &table) {
712+
string result;
713+
if (!catalog.empty()) {
714+
result += "\"" + catalog + "\".";
715+
}
716+
if (!schema.empty()) {
717+
result += "\"" + schema + "\".";
718+
}
719+
result += "\"" + table + "\"";
720+
return result;
721+
}
722+
703723
static unique_ptr<FunctionData> AdbcScanTableBind(ClientContext &context, TableFunctionBindInput &input,
704724
vector<LogicalType> &return_types, vector<string> &names) {
705725
auto bind_data = make_uniq<AdbcScanBindData>();
@@ -710,9 +730,21 @@ static unique_ptr<FunctionData> AdbcScanTableBind(ClientContext &context, TableF
710730
// Get table name from second argument
711731
bind_data->table_name = input.inputs[1].GetValue<string>();
712732

713-
// Construct a SELECT * FROM table_name query for schema discovery
714-
// Quote the table name to handle special characters and reserved words
715-
bind_data->query = "SELECT * FROM \"" + bind_data->table_name + "\"";
733+
// Check for catalog named parameter
734+
auto catalog_it = input.named_parameters.find("catalog");
735+
if (catalog_it != input.named_parameters.end() && !catalog_it->second.IsNull()) {
736+
bind_data->catalog_name = catalog_it->second.GetValue<string>();
737+
}
738+
739+
// Check for schema named parameter
740+
auto schema_it = input.named_parameters.find("schema");
741+
if (schema_it != input.named_parameters.end() && !schema_it->second.IsNull()) {
742+
bind_data->schema_name = schema_it->second.GetValue<string>();
743+
}
744+
745+
// Construct a SELECT * FROM [catalog.][schema.]table_name query for schema discovery
746+
string qualified_name = BuildQualifiedTableName(bind_data->catalog_name, bind_data->schema_name, bind_data->table_name);
747+
bind_data->query = "SELECT * FROM " + qualified_name;
716748

717749
// Check for batch_size named parameter
718750
auto batch_size_it = input.named_parameters.find("batch_size");
@@ -809,7 +841,8 @@ static unique_ptr<FunctionData> AdbcScanTableBind(ClientContext &context, TableF
809841
bind_data->return_types = return_types;
810842

811843
// Try to get statistics from ADBC (row count and column statistics)
812-
TryGetStatisticsFromADBC(*bind_data->connection, bind_data->table_name, *bind_data);
844+
TryGetStatisticsFromADBC(*bind_data->connection, bind_data->catalog_name, bind_data->schema_name,
845+
bind_data->table_name, *bind_data);
813846

814847
return std::move(bind_data);
815848
}
@@ -857,7 +890,9 @@ static unique_ptr<GlobalTableFunctionState> AdbcScanTableInitGlobal(ClientContex
857890
first = false;
858891
}
859892
}
860-
query += " FROM \"" + bind_data.table_name + "\"";
893+
// Use fully qualified table name
894+
string qualified_name = BuildQualifiedTableName(bind_data.catalog_name, bind_data.schema_name, bind_data.table_name);
895+
query += " FROM " + qualified_name;
861896
} else {
862897
// Use SELECT * as no projection needed
863898
query = bind_data.query;
@@ -1205,7 +1240,9 @@ void RegisterAdbcTableFunctions(DatabaseInstance &db) {
12051240
TableFunction adbc_scan_table_function("adbc_scan_table", {LogicalType::BIGINT, LogicalType::VARCHAR},
12061241
AdbcScanTableFunction, AdbcScanTableBind, AdbcScanTableInitGlobal, AdbcScanTableInitLocal);
12071242

1208-
// Add named parameter for batch size hint (driver-specific, best-effort)
1243+
// Add named parameters for catalog, schema, and batch size
1244+
adbc_scan_table_function.named_parameters["catalog"] = LogicalType::VARCHAR;
1245+
adbc_scan_table_function.named_parameters["schema"] = LogicalType::VARCHAR;
12091246
adbc_scan_table_function.named_parameters["batch_size"] = LogicalType::BIGINT;
12101247

12111248
// Enable projection pushdown and filter pushdown
@@ -1221,9 +1258,10 @@ void RegisterAdbcTableFunctions(DatabaseInstance &db) {
12211258
CreateTableFunctionInfo scan_table_info(adbc_scan_table_function);
12221259
FunctionDescription scan_table_desc;
12231260
scan_table_desc.description = "Scan an entire table from an ADBC connection";
1224-
scan_table_desc.parameter_names = {"connection_handle", "table_name", "batch_size"};
1225-
scan_table_desc.parameter_types = {LogicalType::BIGINT, LogicalType::VARCHAR, LogicalType::BIGINT};
1261+
scan_table_desc.parameter_names = {"connection_handle", "table_name", "catalog", "schema", "batch_size"};
1262+
scan_table_desc.parameter_types = {LogicalType::BIGINT, LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::BIGINT};
12261263
scan_table_desc.examples = {"SELECT * FROM adbc_scan_table(conn, 'users')",
1264+
"SELECT * FROM adbc_scan_table(conn, 'users', schema := 'public')",
12271265
"SELECT * FROM adbc_scan_table(conn, 'large_table', batch_size := 65536)"};
12281266
scan_table_desc.categories = {"adbc"};
12291267
scan_table_info.descriptions.push_back(std::move(scan_table_desc));

test/sql/adbc_scan_table.test

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,52 @@ EXPLAIN SELECT * FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'filter_te
448448
statement ok
449449
EXPLAIN ANALYZE SELECT id, name FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'filter_test') WHERE value > 25.0;
450450

451+
# ============================================
452+
# Schema/Catalog Parameter Tests
453+
# ============================================
454+
455+
# SQLite uses 'main' as the default catalog (database name)
456+
# Test with explicit catalog parameter
457+
query ITR
458+
SELECT * FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', catalog := 'main') ORDER BY id;
459+
----
460+
1 Alice 75000.0
461+
2 Bob 85000.0
462+
3 Charlie 95000.0
463+
464+
# Test with NULL catalog (should use default)
465+
query I
466+
SELECT COUNT(*) FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', catalog := NULL);
467+
----
468+
3
469+
470+
# Test with NULL schema (should use default)
471+
query I
472+
SELECT COUNT(*) FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', schema := NULL);
473+
----
474+
3
475+
476+
# Test with both catalog and schema as NULL
477+
query I
478+
SELECT COUNT(*) FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', catalog := NULL, schema := NULL);
479+
----
480+
3
481+
482+
# Test projection pushdown with catalog parameter
483+
query IT
484+
SELECT id, name FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', catalog := 'main') ORDER BY id;
485+
----
486+
1 Alice
487+
2 Bob
488+
3 Charlie
489+
490+
# Test filter pushdown with catalog parameter
491+
query IT
492+
SELECT id, name FROM adbc_scan_table(getvariable('conn_id')::BIGINT, 'employees', catalog := 'main') WHERE id > 1 ORDER BY id;
493+
----
494+
2 Bob
495+
3 Charlie
496+
451497
# Clean up
452498
statement ok
453499
SELECT adbc_disconnect(getvariable('conn_id')::BIGINT);

0 commit comments

Comments
 (0)