
Commit e6efc12

feat: first shot at catalog ATTACH
feat: add batch_size parameter to ATTACH, null validation across all functions, update docs
fix: add create schema support
1 parent 53518a5 commit e6efc12

31 files changed, +1897 −18 lines changed

CLAUDE.md

Lines changed: 33 additions & 0 deletions
@@ -100,6 +100,39 @@ A manifest file is a TOML file (e.g., `sqlite.toml`) containing driver metadata
 - `adbc_columns(handle, [table_name := ...])` - Returns column metadata (name, type, ordinal position, nullability).
 - `adbc_schema(handle, table_name)` - Returns the Arrow schema for a specific table (field names, Arrow types, nullability).
 
+### Storage Extension (ATTACH)
+
+The extension also provides a storage extension that allows attaching ADBC data sources as DuckDB databases. This enables querying remote tables using standard SQL syntax without explicit function calls.
+
+```sql
+-- Attach an ADBC data source
+ATTACH 'path/to/database.db' AS my_db (TYPE adbc, driver 'sqlite');
+
+-- Query tables directly
+SELECT * FROM my_db.my_table;
+```
+
+**ATTACH options:**
+- `driver` (required) - Driver name, path to shared library, or manifest name
+- `entrypoint` - Custom entry point function name
+- `search_paths` - Additional paths to search for driver manifests
+- `use_manifests` - Enable/disable manifest search (default: 'true')
+- `batch_size` - Hint for number of rows per batch when scanning tables (default: driver-specific). Larger batch sizes can reduce network round-trips for remote databases.
+- Other options are passed directly to the ADBC driver (e.g., `username`, `password`)
+
+**Examples:**
+```sql
+-- Attach SQLite database
+ATTACH '/path/to/mydb.sqlite' AS sqlite_db (TYPE adbc, driver 'sqlite');
+
+-- Attach with custom batch size (useful for network databases)
+ATTACH 'postgresql://localhost/mydb' AS pg_db (TYPE adbc, driver 'postgresql', batch_size 65536);
+
+-- Query attached tables
+SELECT * FROM pg_db.public.users WHERE id > 100;
+SELECT COUNT(*) FROM sqlite_db.main.orders;
+```
+
 ### Example Usage
 
 ```sql

CMakeLists.txt

Lines changed: 10 additions & 0 deletions
@@ -32,6 +32,16 @@ set(EXTENSION_SOURCES
     src/adbc_secrets.cpp
     src/adbc_filter_pushdown.cpp
     src/query_farm_telemetry.cpp
+    src/storage/adbc_catalog.cpp
+    src/storage/adbc_catalog_set.cpp
+    src/storage/adbc_schema_set.cpp
+    src/storage/adbc_schema_entry.cpp
+    src/storage/adbc_table_set.cpp
+    src/storage/adbc_table_entry.cpp
+    src/storage/adbc_transaction.cpp
+    src/storage/adbc_transaction_manager.cpp
+    src/storage/adbc_storage.cpp
+    src/storage/adbc_clear_cache.cpp
 )
 
 build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES})

docs/README.md

Lines changed: 77 additions & 0 deletions
@@ -726,6 +726,83 @@ DROP PERSISTENT SECRET my_postgres;
 - Persistent secrets are stored encrypted on disk
 - Secrets are scoped to the current DuckDB connection/session
 
+## Storage Extension (ATTACH)
+
+The ADBC Scanner extension provides a storage extension that allows you to attach ADBC data sources as DuckDB databases. This enables querying remote tables using standard SQL syntax without explicit function calls.
+
+### Basic Usage
+
+```sql
+-- Attach an ADBC data source
+ATTACH '/path/to/database.db' AS my_db (TYPE adbc, driver 'sqlite');
+
+-- Query tables directly using standard SQL
+SELECT * FROM my_db.my_table;
+SELECT * FROM my_db.main.users WHERE id > 100;
+
+-- Detach when done
+DETACH my_db;
+```
+
+### ATTACH Options
+
+| Option | Required | Description |
+|--------|----------|-------------|
+| `TYPE` | Yes | Must be `adbc` |
+| `driver` | Yes | Driver name (e.g., `'sqlite'`, `'postgresql'`), path to shared library, or manifest name |
+| `entrypoint` | No | Custom driver entry point function name |
+| `search_paths` | No | Additional paths to search for driver manifests |
+| `use_manifests` | No | Enable/disable manifest search (default: `'true'`) |
+| `batch_size` | No | Hint for number of rows per batch when scanning tables (default: driver-specific). Larger batch sizes can reduce network round-trips for remote databases. |
+
+Any other options are passed directly to the ADBC driver (e.g., `username`, `password`).
+
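
The `entrypoint` and `use_manifests` options from the table above are not covered by the examples below, so here is a rough sketch of a non-manifest setup; the library path and entry point name are placeholders, not values shipped with the extension:

```sql
-- Load a driver straight from a shared library, bypassing manifest lookup
ATTACH 'mydata.db' AS direct_db (
    TYPE adbc,
    driver '/opt/adbc/libadbc_driver_example.so',  -- placeholder path
    entrypoint 'AdbcDriverExampleInit',            -- placeholder entry point name
    use_manifests 'false'
);
```
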
+### Examples
+
+```sql
+-- Attach a SQLite database
+ATTACH '/path/to/mydb.sqlite' AS sqlite_db (TYPE adbc, driver 'sqlite');
+
+-- Attach PostgreSQL with credentials
+ATTACH 'postgresql://localhost/mydb' AS pg_db (
+    TYPE adbc,
+    driver 'postgresql',
+    username 'user',
+    password 'secret'
+);
+
+-- Attach with custom batch size (useful for network databases)
+ATTACH 'postgresql://localhost/mydb' AS pg_db (
+    TYPE adbc,
+    driver 'postgresql',
+    batch_size 65536
+);
+
+-- Query attached databases
+SELECT * FROM pg_db.public.users WHERE active = true;
+SELECT COUNT(*) FROM sqlite_db.main.orders;
+
+-- Join tables from different attached databases
+SELECT u.name, o.total
+FROM pg_db.public.users u
+JOIN sqlite_db.main.orders o ON u.id = o.user_id;
+```
+
+### Features
+
+When querying attached ADBC tables, the following optimizations are automatically applied:
+
+- **Projection pushdown**: Only requested columns are fetched from the remote database
+- **Filter pushdown**: WHERE clauses are pushed to the remote database with parameter binding
+- **Cardinality estimation**: Row count statistics are used for query planning
+- **Progress reporting**: Scan progress is reported based on estimated row counts
+
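
To make the first two bullets concrete, a sketch of a query that benefits from both, using the `pg_db` attachment from the examples above:

```sql
-- Only `name` is in the projection and the `id` predicate is pushable,
-- so the remote scan should transfer a single filtered column rather
-- than the whole users table.
SELECT name
FROM pg_db.public.users
WHERE id > 100;
```
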
+### Limitations
+
+- Attached ADBC databases are read-only; INSERT, UPDATE, and DELETE operations are not supported through the ATTACH interface (use `adbc_execute` instead)
+- Schema creation and modification are not supported
+- The connection remains open while the database is attached
+
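
For writes, the limitation above points at `adbc_execute`; a rough sketch of that path (the literal `1` stands in for a previously created ADBC connection handle, which ATTACH does not provide):

```sql
-- Runs the statement on the remote database and returns the number of
-- affected rows; replace 1 with a real connection handle.
SELECT adbc_execute(1, 'UPDATE orders SET status = ''shipped'' WHERE id = 42');
```
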
 ## ADBC Drivers
 
 ADBC drivers are available for many databases. When using driver manifests (see below), you can reference drivers by their short name:

src/adbc_catalog.cpp

Lines changed: 31 additions & 0 deletions
@@ -60,6 +60,11 @@ static unique_ptr<FunctionData> AdbcInfoBind(ClientContext &context, TableFuncti
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcInfoBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_info: Connection handle cannot be NULL");
+    }
+
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
     auto &registry = ConnectionRegistry::Get();
@@ -344,6 +349,11 @@ static unique_ptr<FunctionData> AdbcTablesBind(ClientContext &context, TableFunc
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcTablesBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_tables: Connection handle cannot be NULL");
+    }
+
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
     // Check for optional filter parameters
@@ -497,6 +507,11 @@ static unique_ptr<FunctionData> AdbcTableTypesBind(ClientContext &context, Table
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcTableTypesBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_table_types: Connection handle cannot be NULL");
+    }
+
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
     auto &registry = ConnectionRegistry::Get();
@@ -809,6 +824,11 @@ static unique_ptr<FunctionData> AdbcColumnsBind(ClientContext &context, TableFun
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcColumnsBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_columns: Connection handle cannot be NULL");
+    }
+
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
     // Check for optional filter parameters
@@ -1131,7 +1151,18 @@ static unique_ptr<FunctionData> AdbcSchemaBind(ClientContext &context, TableFunc
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcSchemaBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_schema: Connection handle cannot be NULL");
+    }
+
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
+
+    // Check for NULL table name
+    if (input.inputs[1].IsNull()) {
+        throw InvalidInputException("adbc_schema: Table name cannot be NULL");
+    }
+
     bind_data->table_name = input.inputs[1].GetValue<string>();
 
     // Check for optional filter parameters
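
The user-visible effect of these bind-time checks is an immediate error instead of a silent bind against a NULL handle; roughly:

```sql
-- Each catalog function now rejects a NULL connection handle at bind time
SELECT * FROM adbc_tables(NULL);
-- raises InvalidInputException: "adbc_tables: Connection handle cannot be NULL"
```
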

src/adbc_scan.cpp

Lines changed: 77 additions & 18 deletions
@@ -362,9 +362,19 @@ static unique_ptr<FunctionData> AdbcScanBind(ClientContext &context, TableFuncti
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcScanBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_scan: Connection handle cannot be NULL");
+    }
+
     // Get connection ID from first argument
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
+    // Check for NULL query
+    if (input.inputs[1].IsNull()) {
+        throw InvalidInputException("adbc_scan: Query cannot be NULL");
+    }
+
     // Get SQL query from second argument
     bind_data->query = input.inputs[1].GetValue<string>();
 
@@ -724,9 +734,19 @@ static unique_ptr<FunctionData> AdbcScanTableBind(ClientContext &context, TableF
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcScanBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_scan_table: Connection handle cannot be NULL");
+    }
+
     // Get connection ID from first argument
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
 
+    // Check for NULL table name
+    if (input.inputs[1].IsNull()) {
+        throw InvalidInputException("adbc_scan_table: Table name cannot be NULL");
+    }
+
     // Get table name from second argument
     bind_data->table_name = input.inputs[1].GetValue<string>();
 
@@ -861,17 +881,30 @@ static unique_ptr<GlobalTableFunctionState> AdbcScanTableInitGlobal(ClientContex
     // If we have column_ids and they're a subset of all columns, build a projected query
     string query;
     bool needs_projection = false;
-    if (!bind_data.table_name.empty() && !input.column_ids.empty()) {
+
+    // Count how many valid column IDs we have (excluding special IDs like COLUMN_IDENTIFIER_ROW_ID)
+    idx_t valid_column_count = 0;
+    for (auto col_id : input.column_ids) {
+        if (col_id < bind_data.all_column_names.size()) {
+            valid_column_count++;
+        }
+    }
+
+    if (!bind_data.table_name.empty() && valid_column_count > 0) {
         // Check if we need all columns or just a subset
-        needs_projection = input.column_ids.size() < bind_data.all_column_names.size();
+        needs_projection = valid_column_count < bind_data.all_column_names.size();
 
         // Also check if columns are in order and consecutive from 0
         // If column_ids = [0, 1, 2, ...] matching all_column_names size, no projection needed
        if (!needs_projection) {
-            for (idx_t i = 0; i < input.column_ids.size(); i++) {
-                if (input.column_ids[i] != i) {
-                    needs_projection = true;
-                    break;
+            idx_t expected_idx = 0;
+            for (auto col_id : input.column_ids) {
+                if (col_id < bind_data.all_column_names.size()) {
+                    if (col_id != expected_idx) {
+                        needs_projection = true;
+                        break;
+                    }
+                    expected_idx++;
                }
            }
        }
@@ -898,7 +931,8 @@ static unique_ptr<GlobalTableFunctionState> AdbcScanTableInitGlobal(ClientContex
             query = bind_data.query;
         }
     } else {
-        // Fall back to original query
+        // No valid columns requested (e.g., count(*)) or empty column_ids - use SELECT *
+        // We need to select something, so fall back to original query
         query = bind_data.query;
     }
 
@@ -1359,26 +1393,38 @@ static void AdbcExecuteFunction(DataChunk &args, ExpressionState &state, Vector
     // Handle constant input (for constant folding optimization)
     if (conn_vector.GetVectorType() == VectorType::CONSTANT_VECTOR &&
         query_vector.GetVectorType() == VectorType::CONSTANT_VECTOR) {
-        if (ConstantVector::IsNull(conn_vector) || ConstantVector::IsNull(query_vector)) {
-            result.SetVectorType(VectorType::CONSTANT_VECTOR);
-            ConstantVector::SetNull(result, true);
-        } else {
-            auto connection_id = conn_vector.GetValue(0).GetValue<int64_t>();
-            auto query = query_vector.GetValue(0).GetValue<string>();
-            auto rows_affected = ExecuteStatement(connection_id, query);
-            result.SetVectorType(VectorType::CONSTANT_VECTOR);
-            ConstantVector::GetData<int64_t>(result)[0] = rows_affected;
+        if (ConstantVector::IsNull(conn_vector)) {
+            throw InvalidInputException("adbc_execute: Connection handle cannot be NULL");
+        }
+        if (ConstantVector::IsNull(query_vector)) {
+            throw InvalidInputException("adbc_execute: Query cannot be NULL");
         }
+        auto connection_id = conn_vector.GetValue(0).GetValue<int64_t>();
+        auto query = query_vector.GetValue(0).GetValue<string>();
+        auto rows_affected = ExecuteStatement(connection_id, query);
+        result.SetVectorType(VectorType::CONSTANT_VECTOR);
+        ConstantVector::GetData<int64_t>(result)[0] = rows_affected;
        return;
    }
 
     // Handle flat/dictionary vectors
     result.SetVectorType(VectorType::FLAT_VECTOR);
     auto result_data = FlatVector::GetData<int64_t>(result);
+    auto &validity = FlatVector::Validity(result);
 
     for (idx_t row_idx = 0; row_idx < count; row_idx++) {
-        auto connection_id = conn_vector.GetValue(row_idx).GetValue<int64_t>();
-        auto query = query_vector.GetValue(row_idx).GetValue<string>();
+        auto conn_value = conn_vector.GetValue(row_idx);
+        auto query_value = query_vector.GetValue(row_idx);
+
+        if (conn_value.IsNull()) {
+            throw InvalidInputException("adbc_execute: Connection handle cannot be NULL");
+        }
+        if (query_value.IsNull()) {
+            throw InvalidInputException("adbc_execute: Query cannot be NULL");
+        }
+
+        auto connection_id = conn_value.GetValue<int64_t>();
+        auto query = query_value.GetValue<string>();
         result_data[row_idx] = ExecuteStatement(connection_id, query);
     }
 }
@@ -1394,6 +1440,8 @@ void RegisterAdbcExecuteFunction(DatabaseInstance &db) {
         AdbcExecuteFunction,
         AdbcExecuteBind
     );
+    // Disable automatic NULL propagation so we can throw a meaningful error
+    adbc_execute_function.null_handling = FunctionNullHandling::SPECIAL_HANDLING;
 
     CreateScalarFunctionInfo info(adbc_execute_function);
     FunctionDescription desc;
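
With `SPECIAL_HANDLING` set, a NULL argument no longer propagates to a NULL result; the function still runs and the explicit checks above throw, along the lines of:

```sql
-- Previously this quietly returned NULL; now it errors out
SELECT adbc_execute(NULL, 'DELETE FROM staging_orders');
-- raises InvalidInputException: "adbc_execute: Connection handle cannot be NULL"
```
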
@@ -1537,8 +1585,19 @@ static unique_ptr<FunctionData> AdbcInsertBind(ClientContext &context, TableFunc
         vector<LogicalType> &return_types, vector<string> &names) {
     auto bind_data = make_uniq<AdbcInsertBindData>();
 
+    // Check for NULL connection handle
+    if (input.inputs[0].IsNull()) {
+        throw InvalidInputException("adbc_insert: Connection handle cannot be NULL");
+    }
+
     // First argument is connection handle
     bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
+
+    // Check for NULL table name
+    if (input.inputs[1].IsNull()) {
+        throw InvalidInputException("adbc_insert: Target table name cannot be NULL");
+    }
+
     // Second argument is target table name
     bind_data->target_table = input.inputs[1].GetValue<string>();
 
src/adbc_scanner_extension.cpp

Lines changed: 9 additions & 0 deletions
@@ -3,8 +3,10 @@
 #include "adbc_scanner_extension.hpp"
 #include "adbc_functions.hpp"
 #include "adbc_secrets.hpp"
+#include "storage/adbc_storage.hpp"
 #include "duckdb.hpp"
 #include "duckdb/common/exception.hpp"
+#include "duckdb/main/config.hpp"
 #include "query_farm_telemetry.hpp"
 
 namespace duckdb {
@@ -28,6 +30,13 @@ static void LoadInternal(ExtensionLoader &loader) {
     // Register ADBC insert function (adbc_insert for bulk ingestion)
     adbc::RegisterAdbcInsertFunction(loader.GetDatabaseInstance());
 
+    // Register ADBC clear cache function
+    RegisterAdbcClearCacheFunction(loader.GetDatabaseInstance());
+
+    // Register ADBC storage extension for ATTACH support
+    auto &config = DBConfig::GetConfig(loader.GetDatabaseInstance());
+    config.storage_extensions["adbc"] = make_uniq<AdbcStorageExtension>();
+
     QueryFarmSendTelemetry(loader, "adbc", "2025120801");
 }
