Skip to content

Commit e5ed40b

Browse files
committed
feat: add adbc_insert for bulk ingestion
1 parent d4230d9 commit e5ed40b

File tree

7 files changed

+415
-0
lines changed

7 files changed

+415
-0
lines changed

CLAUDE.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ The extension provides the following functions:
2121
### Query Execution
2222
- `adbc_scan(handle, query, [params := row(...)])` - Execute a SELECT query and return results as a table. Supports parameterized queries via the optional `params` named parameter.
2323
- `adbc_execute(handle, query)` - Execute DDL/DML statements (CREATE, INSERT, UPDATE, DELETE). Returns affected row count.
24+
- `adbc_insert(handle, table_name, <table>, [mode := ...])` - Bulk insert data from a subquery. Modes: 'create', 'append', 'replace', 'create_append'.
2425

2526
### Catalog Functions
2627
- `adbc_info(handle)` - Returns driver/database information (vendor name, version, etc.).
@@ -45,6 +46,9 @@ SELECT * FROM adbc_scan(getvariable('conn')::BIGINT, 'SELECT ? AS value', params
4546
SELECT adbc_execute(getvariable('conn')::BIGINT, 'CREATE TABLE test (id INTEGER, name TEXT)');
4647
SELECT adbc_execute(getvariable('conn')::BIGINT, 'INSERT INTO test VALUES (1, ''hello'')');
4748

49+
-- Bulk insert from DuckDB query
50+
SELECT * FROM adbc_insert(getvariable('conn')::BIGINT, 'target', (SELECT * FROM local_table), mode := 'create');
51+
4852
-- Catalog functions
4953
SELECT * FROM adbc_info(getvariable('conn')::BIGINT);
5054
SELECT * FROM adbc_tables(getvariable('conn')::BIGINT);

docs/README.md

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,54 @@ SELECT adbc_execute(getvariable('conn')::BIGINT,
163163
'DELETE FROM products WHERE id = 2');
164164
```
165165

166+
### adbc_insert
167+
168+
Bulk insert data from a DuckDB query into a table via ADBC. This is more efficient than executing individual INSERT statements.
169+
170+
```sql
171+
adbc_insert(connection_id, table_name, <table>, [mode:=]) -> TABLE(rows_inserted BIGINT)
172+
```
173+
174+
**Parameters:**
175+
- `connection_id`: Connection handle from `adbc_connect`
176+
- `table_name`: Target table name in the remote database
177+
- `<table>`: A subquery providing the data to insert
178+
- `mode` (optional): Insert mode, one of:
179+
- `'create'`: Create the table; error if it exists (default for new tables)
180+
- `'append'`: Append to existing table; error if table doesn't exist
181+
- `'replace'`: Drop and recreate the table if it exists
182+
- `'create_append'`: Create if doesn't exist, append if it does
183+
184+
**Returns:** A table with a single row containing the number of rows inserted.
185+
186+
**Examples:**
187+
188+
```sql
189+
-- Create a new table and insert data
190+
SELECT * FROM adbc_insert(getvariable('conn')::BIGINT, 'users',
191+
(SELECT id, name, email FROM local_users),
192+
mode := 'create');
193+
194+
-- Append data to an existing table
195+
SELECT * FROM adbc_insert(getvariable('conn')::BIGINT, 'users',
196+
(SELECT id, name, email FROM new_users),
197+
mode := 'append');
198+
199+
-- Replace an existing table with new data
200+
SELECT * FROM adbc_insert(getvariable('conn')::BIGINT, 'users',
201+
(SELECT * FROM updated_users),
202+
mode := 'replace');
203+
```
204+
205+
Output:
206+
```
207+
┌───────────────┐
208+
│ rows_inserted │
209+
├───────────────┤
210+
│ 1000 │
211+
└───────────────┘
212+
```
213+
166214
### adbc_info
167215

168216
Returns driver and database information for a connection.

src/adbc_extension.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ static void LoadInternal(ExtensionLoader &loader) {
2121
// Register ADBC execute function (adbc_execute for DDL/DML)
2222
adbc::RegisterAdbcExecuteFunction(loader.GetDatabaseInstance());
2323

24+
// Register ADBC insert function (adbc_insert for bulk ingestion)
25+
adbc::RegisterAdbcInsertFunction(loader.GetDatabaseInstance());
26+
2427
QueryFarmSendTelemetry(loader, "adbc", "2025120801");
2528
}
2629

src/adbc_scan.cpp

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include "duckdb/common/insertion_order_preserving_map.hpp"
1111
#include "duckdb/main/client_context.hpp"
1212
#include <nanoarrow/nanoarrow.h>
13+
#include <queue>
1314

1415
namespace duckdb {
1516
namespace adbc {
@@ -578,5 +579,291 @@ void RegisterAdbcExecuteFunction(DatabaseInstance &db) {
578579
loader.RegisterFunction(adbc_execute_function);
579580
}
580581

582+
//===--------------------------------------------------------------------===//
583+
// adbc_insert - Bulk insert data into an ADBC table
584+
//===--------------------------------------------------------------------===//
585+
586+
struct AdbcInsertBindData : public TableFunctionData {
587+
int64_t connection_id;
588+
string target_table;
589+
string mode; // "create", "append", "replace", "create_append"
590+
shared_ptr<AdbcConnectionWrapper> connection;
591+
vector<LogicalType> input_types;
592+
vector<string> input_names;
593+
};
594+
595+
// Custom ArrowArrayStream that we can feed batches into
596+
// This allows us to use BindStream for proper streaming ingestion
597+
struct AdbcInsertStream {
598+
ArrowArrayStream stream;
599+
ArrowSchema schema;
600+
bool schema_set = false;
601+
queue<ArrowArray> pending_batches;
602+
mutex lock;
603+
bool finished = false;
604+
string last_error;
605+
606+
AdbcInsertStream() {
607+
memset(&stream, 0, sizeof(stream));
608+
memset(&schema, 0, sizeof(schema));
609+
stream.private_data = this;
610+
stream.get_schema = GetSchema;
611+
stream.get_next = GetNext;
612+
stream.get_last_error = GetLastError;
613+
stream.release = Release;
614+
}
615+
616+
~AdbcInsertStream() {
617+
if (schema.release) {
618+
schema.release(&schema);
619+
}
620+
// Release any pending batches
621+
while (!pending_batches.empty()) {
622+
auto &batch = pending_batches.front();
623+
if (batch.release) {
624+
batch.release(&batch);
625+
}
626+
pending_batches.pop();
627+
}
628+
}
629+
630+
void SetSchema(ArrowSchema *new_schema) {
631+
lock_guard<mutex> l(lock);
632+
if (schema.release) {
633+
schema.release(&schema);
634+
}
635+
schema = *new_schema;
636+
memset(new_schema, 0, sizeof(*new_schema)); // Transfer ownership
637+
schema_set = true;
638+
}
639+
640+
void AddBatch(ArrowArray *batch) {
641+
lock_guard<mutex> l(lock);
642+
pending_batches.push(*batch);
643+
memset(batch, 0, sizeof(*batch)); // Transfer ownership
644+
}
645+
646+
void Finish() {
647+
lock_guard<mutex> l(lock);
648+
finished = true;
649+
}
650+
651+
static int GetSchema(ArrowArrayStream *stream, ArrowSchema *out) {
652+
auto *self = static_cast<AdbcInsertStream *>(stream->private_data);
653+
lock_guard<mutex> l(self->lock);
654+
if (!self->schema_set) {
655+
self->last_error = "Schema not set";
656+
return EINVAL;
657+
}
658+
// Copy the schema (don't transfer ownership)
659+
return ArrowSchemaDeepCopy(&self->schema, out);
660+
}
661+
662+
static int GetNext(ArrowArrayStream *stream, ArrowArray *out) {
663+
auto *self = static_cast<AdbcInsertStream *>(stream->private_data);
664+
lock_guard<mutex> l(self->lock);
665+
666+
if (self->pending_batches.empty()) {
667+
if (self->finished) {
668+
// Signal end of stream
669+
memset(out, 0, sizeof(*out));
670+
return 0;
671+
}
672+
// No batches available yet - this shouldn't happen in our usage
673+
self->last_error = "No batches available";
674+
return EAGAIN;
675+
}
676+
677+
*out = self->pending_batches.front();
678+
self->pending_batches.pop();
679+
return 0;
680+
}
681+
682+
static const char *GetLastError(ArrowArrayStream *stream) {
683+
auto *self = static_cast<AdbcInsertStream *>(stream->private_data);
684+
return self->last_error.empty() ? nullptr : self->last_error.c_str();
685+
}
686+
687+
static void Release(ArrowArrayStream *stream) {
688+
// Don't delete - we manage lifetime externally
689+
stream->release = nullptr;
690+
}
691+
};
692+
693+
struct AdbcInsertGlobalState : public GlobalTableFunctionState {
694+
mutex lock;
695+
shared_ptr<AdbcStatementWrapper> statement;
696+
unique_ptr<AdbcInsertStream> insert_stream;
697+
int64_t rows_inserted = 0;
698+
bool stream_bound = false;
699+
bool executed = false;
700+
ClientProperties client_properties;
701+
702+
idx_t MaxThreads() const override {
703+
return 1;
704+
}
705+
};
706+
707+
static unique_ptr<FunctionData> AdbcInsertBind(ClientContext &context, TableFunctionBindInput &input,
708+
vector<LogicalType> &return_types, vector<string> &names) {
709+
auto bind_data = make_uniq<AdbcInsertBindData>();
710+
711+
// First argument is connection handle
712+
bind_data->connection_id = input.inputs[0].GetValue<int64_t>();
713+
// Second argument is target table name
714+
bind_data->target_table = input.inputs[1].GetValue<string>();
715+
716+
// Check for optional mode parameter (default is "append")
717+
auto mode_it = input.named_parameters.find("mode");
718+
if (mode_it != input.named_parameters.end() && !mode_it->second.IsNull()) {
719+
bind_data->mode = mode_it->second.GetValue<string>();
720+
// Validate mode
721+
if (bind_data->mode != "create" && bind_data->mode != "append" &&
722+
bind_data->mode != "replace" && bind_data->mode != "create_append") {
723+
throw InvalidInputException("adbc_insert: Invalid mode '" + bind_data->mode +
724+
"'. Must be one of: create, append, replace, create_append");
725+
}
726+
} else {
727+
bind_data->mode = "append"; // Default to append
728+
}
729+
730+
// Get connection from registry
731+
auto &registry = ConnectionRegistry::Get();
732+
bind_data->connection = registry.Get(bind_data->connection_id);
733+
if (!bind_data->connection) {
734+
throw InvalidInputException("adbc_insert: Invalid connection handle: " + to_string(bind_data->connection_id));
735+
}
736+
737+
if (!bind_data->connection->IsInitialized()) {
738+
throw InvalidInputException("adbc_insert: Connection has been closed");
739+
}
740+
741+
// Store input table types and names for Arrow conversion
742+
bind_data->input_types = input.input_table_types;
743+
bind_data->input_names = input.input_table_names;
744+
745+
// Return schema: rows_inserted (BIGINT)
746+
return_types = {LogicalType::BIGINT};
747+
names = {"rows_inserted"};
748+
749+
return std::move(bind_data);
750+
}
751+
752+
static unique_ptr<GlobalTableFunctionState> AdbcInsertInitGlobal(ClientContext &context, TableFunctionInitInput &input) {
753+
auto &bind_data = input.bind_data->Cast<AdbcInsertBindData>();
754+
auto global_state = make_uniq<AdbcInsertGlobalState>();
755+
756+
// Store client properties for Arrow conversion
757+
global_state->client_properties = context.GetClientProperties();
758+
759+
// Create the statement and set up for bulk ingestion
760+
global_state->statement = make_shared_ptr<AdbcStatementWrapper>(bind_data.connection);
761+
global_state->statement->Init();
762+
global_state->statement->SetOption("adbc.ingest.target_table", bind_data.target_table);
763+
764+
// Set mode
765+
string mode_value;
766+
if (bind_data.mode == "create") {
767+
mode_value = "adbc.ingest.mode.create";
768+
} else if (bind_data.mode == "append") {
769+
mode_value = "adbc.ingest.mode.append";
770+
} else if (bind_data.mode == "replace") {
771+
mode_value = "adbc.ingest.mode.replace";
772+
} else if (bind_data.mode == "create_append") {
773+
mode_value = "adbc.ingest.mode.create_append";
774+
}
775+
global_state->statement->SetOption("adbc.ingest.mode", mode_value);
776+
777+
// Create the insert stream
778+
global_state->insert_stream = make_uniq<AdbcInsertStream>();
779+
780+
// Set up the schema from the input types
781+
ArrowSchema schema;
782+
ArrowConverter::ToArrowSchema(&schema, bind_data.input_types, bind_data.input_names,
783+
global_state->client_properties);
784+
global_state->insert_stream->SetSchema(&schema);
785+
786+
// Bind the stream to the statement
787+
try {
788+
global_state->statement->BindStream(&global_state->insert_stream->stream);
789+
global_state->stream_bound = true;
790+
} catch (Exception &e) {
791+
throw IOException("adbc_insert: Failed to bind stream: " + string(e.what()));
792+
}
793+
794+
return std::move(global_state);
795+
}
796+
797+
static OperatorResultType AdbcInsertInOut(ExecutionContext &context, TableFunctionInput &data_p,
798+
DataChunk &input, DataChunk &output) {
799+
auto &bind_data = data_p.bind_data->Cast<AdbcInsertBindData>();
800+
auto &global_state = data_p.global_state->Cast<AdbcInsertGlobalState>();
801+
lock_guard<mutex> l(global_state.lock);
802+
803+
if (input.size() == 0) {
804+
output.SetCardinality(0);
805+
return OperatorResultType::NEED_MORE_INPUT;
806+
}
807+
808+
// Convert DuckDB DataChunk to Arrow
809+
ArrowAppender appender(bind_data.input_types, input.size(),
810+
global_state.client_properties,
811+
ArrowTypeExtensionData::GetExtensionTypes(context.client, bind_data.input_types));
812+
appender.Append(input, 0, input.size(), input.size());
813+
814+
ArrowArray arr = appender.Finalize();
815+
816+
// Add the batch to our stream
817+
global_state.insert_stream->AddBatch(&arr);
818+
global_state.rows_inserted += input.size();
819+
820+
// Don't output anything during processing - we output the total at the end
821+
output.SetCardinality(0);
822+
return OperatorResultType::NEED_MORE_INPUT;
823+
}
824+
825+
static OperatorFinalizeResultType AdbcInsertFinalize(ExecutionContext &context, TableFunctionInput &data_p,
826+
DataChunk &output) {
827+
auto &global_state = data_p.global_state->Cast<AdbcInsertGlobalState>();
828+
lock_guard<mutex> l(global_state.lock);
829+
830+
// Mark the stream as finished
831+
global_state.insert_stream->Finish();
832+
833+
// Execute the statement to perform the actual insert
834+
if (!global_state.executed && global_state.stream_bound) {
835+
int64_t rows_affected = -1;
836+
try {
837+
global_state.statement->ExecuteUpdate(&rows_affected);
838+
global_state.executed = true;
839+
} catch (Exception &e) {
840+
throw IOException("adbc_insert: Failed to execute insert: " + string(e.what()));
841+
}
842+
}
843+
844+
// Output the total rows inserted
845+
output.SetCardinality(1);
846+
output.SetValue(0, 0, Value::BIGINT(global_state.rows_inserted));
847+
848+
return OperatorFinalizeResultType::FINISHED;
849+
}
850+
851+
// Register adbc_insert table in-out function
852+
void RegisterAdbcInsertFunction(DatabaseInstance &db) {
853+
ExtensionLoader loader(db, "adbc");
854+
855+
// adbc_insert(connection_id, table_name, <table>) - Bulk insert data
856+
TableFunction adbc_insert_function("adbc_insert",
857+
{LogicalType::BIGINT, LogicalType::VARCHAR, LogicalType::TABLE},
858+
nullptr, // No regular function - use in_out
859+
AdbcInsertBind,
860+
AdbcInsertInitGlobal);
861+
adbc_insert_function.in_out_function = AdbcInsertInOut;
862+
adbc_insert_function.in_out_function_final = AdbcInsertFinalize;
863+
adbc_insert_function.named_parameters["mode"] = LogicalType::VARCHAR;
864+
865+
loader.RegisterFunction(adbc_insert_function);
866+
}
867+
581868
} // namespace adbc
582869
} // namespace duckdb

0 commit comments

Comments
 (0)