Skip to content

Commit fb5dadb

Browse files
committed
fix: fix crash, add transaction support
1 parent d151f8d commit fb5dadb

File tree

5 files changed

+174
-2
lines changed

5 files changed

+174
-2
lines changed

CLAUDE.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ The extension provides the following functions:
1818
- `adbc_connect(options)` - Connect to an ADBC data source. Returns a connection handle (BIGINT). Options can be passed as a STRUCT (preferred) or MAP. The `driver` option is required.
1919
- `adbc_disconnect(handle)` - Disconnect from an ADBC data source. Returns true on success.
2020

21+
### Transaction Control
22+
- `adbc_set_autocommit(handle, enabled)` - Enable or disable autocommit mode. When disabled, changes require explicit commit.
23+
- `adbc_commit(handle)` - Commit the current transaction.
24+
- `adbc_rollback(handle)` - Rollback the current transaction, discarding all uncommitted changes.
25+
2126
### Query Execution
2227
- `adbc_scan(handle, query, [params := row(...)])` - Execute a SELECT query and return results as a table. Supports parameterized queries via the optional `params` named parameter.
2328
- `adbc_execute(handle, query)` - Execute DDL/DML statements (CREATE, INSERT, UPDATE, DELETE). Returns affected row count.
@@ -56,6 +61,13 @@ SELECT * FROM adbc_table_types(getvariable('conn')::BIGINT);
5661
SELECT * FROM adbc_columns(getvariable('conn')::BIGINT, table_name := 'test');
5762
SELECT * FROM adbc_schema(getvariable('conn')::BIGINT, 'test');
5863

64+
-- Transaction control
65+
SELECT adbc_set_autocommit(getvariable('conn')::BIGINT, false); -- Start transaction
66+
SELECT adbc_execute(getvariable('conn')::BIGINT, 'INSERT INTO test VALUES (2, ''world'')');
67+
SELECT adbc_commit(getvariable('conn')::BIGINT); -- Commit changes
68+
-- Or: SELECT adbc_rollback(getvariable('conn')::BIGINT); -- Discard changes
69+
SELECT adbc_set_autocommit(getvariable('conn')::BIGINT, true); -- Back to autocommit
70+
5971
-- Disconnect
6072
SELECT adbc_disconnect(getvariable('conn')::BIGINT);
6173
```

src/adbc_functions.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,56 @@ static void AdbcDisconnectFunction(DataChunk &args, ExpressionState &state, Vect
149149
});
150150
}
151151

152+
// adbc_commit(connection_id BIGINT) -> BOOLEAN
153+
// Commits the current transaction
154+
static void AdbcCommitFunction(DataChunk &args, ExpressionState &state, Vector &result) {
155+
auto &connection_vector = args.data[0];
156+
157+
UnaryExecutor::Execute<int64_t, bool>(connection_vector, result, args.size(), [&](int64_t connection_id) {
158+
auto &registry = ConnectionRegistry::Get();
159+
auto connection = registry.Get(connection_id);
160+
if (!connection) {
161+
throw InvalidInputException("adbc_commit: Invalid connection handle: " + to_string(connection_id));
162+
}
163+
connection->Commit();
164+
return true;
165+
});
166+
}
167+
168+
// adbc_rollback(connection_id BIGINT) -> BOOLEAN
169+
// Rolls back the current transaction
170+
static void AdbcRollbackFunction(DataChunk &args, ExpressionState &state, Vector &result) {
171+
auto &connection_vector = args.data[0];
172+
173+
UnaryExecutor::Execute<int64_t, bool>(connection_vector, result, args.size(), [&](int64_t connection_id) {
174+
auto &registry = ConnectionRegistry::Get();
175+
auto connection = registry.Get(connection_id);
176+
if (!connection) {
177+
throw InvalidInputException("adbc_rollback: Invalid connection handle: " + to_string(connection_id));
178+
}
179+
connection->Rollback();
180+
return true;
181+
});
182+
}
183+
184+
// adbc_set_autocommit(connection_id BIGINT, enabled BOOLEAN) -> BOOLEAN
185+
// Sets the autocommit mode for the connection
186+
static void AdbcSetAutocommitFunction(DataChunk &args, ExpressionState &state, Vector &result) {
187+
auto &connection_vector = args.data[0];
188+
auto &enabled_vector = args.data[1];
189+
190+
BinaryExecutor::Execute<int64_t, bool, bool>(connection_vector, enabled_vector, result, args.size(),
191+
[&](int64_t connection_id, bool enabled) {
192+
auto &registry = ConnectionRegistry::Get();
193+
auto connection = registry.Get(connection_id);
194+
if (!connection) {
195+
throw InvalidInputException("adbc_set_autocommit: Invalid connection handle: " + to_string(connection_id));
196+
}
197+
connection->SetAutocommit(enabled);
198+
return true;
199+
});
200+
}
201+
152202
// Register the ADBC scalar functions using ExtensionLoader
153203
void RegisterAdbcScalarFunctions(DatabaseInstance &db) {
154204
ExtensionLoader loader(db, "adbc");
@@ -171,6 +221,33 @@ void RegisterAdbcScalarFunctions(DatabaseInstance &db) {
171221
AdbcDisconnectFunction
172222
);
173223
loader.RegisterFunction(adbc_disconnect_function);
224+
225+
// adbc_commit: Commit the current transaction
226+
auto adbc_commit_function = ScalarFunction(
227+
"adbc_commit",
228+
{LogicalType::BIGINT},
229+
LogicalType::BOOLEAN,
230+
AdbcCommitFunction
231+
);
232+
loader.RegisterFunction(adbc_commit_function);
233+
234+
// adbc_rollback: Rollback the current transaction
235+
auto adbc_rollback_function = ScalarFunction(
236+
"adbc_rollback",
237+
{LogicalType::BIGINT},
238+
LogicalType::BOOLEAN,
239+
AdbcRollbackFunction
240+
);
241+
loader.RegisterFunction(adbc_rollback_function);
242+
243+
// adbc_set_autocommit: Set autocommit mode
244+
auto adbc_set_autocommit_function = ScalarFunction(
245+
"adbc_set_autocommit",
246+
{LogicalType::BIGINT, LogicalType::BOOLEAN},
247+
LogicalType::BOOLEAN,
248+
AdbcSetAutocommitFunction
249+
);
250+
loader.RegisterFunction(adbc_set_autocommit_function);
174251
}
175252

176253
} // namespace adbc

src/adbc_scan.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,8 +369,7 @@ static void AdbcScanFunction(ClientContext &context, TableFunctionInput &data, D
369369
ArrowTableFunction::ArrowToDuckDB(local_state,
370370
bind_data.arrow_table.GetColumns(),
371371
output,
372-
0, // start
373-
false); // arrow_scan_is_projected = false
372+
false); // arrow_scan_is_projected = false (no projection pushdown)
374373
}
375374

376375
local_state.chunk_offset += output.size();

src/include/adbc_connection.hpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,26 @@ class AdbcConnectionWrapper {
179179
CheckAdbc(status, error.Get(), "Failed to get table schema", GetDriverName());
180180
}
181181

182+
// Transaction support
183+
void Commit() {
184+
AdbcErrorGuard error;
185+
auto status = AdbcConnectionCommit(&connection, error.Get());
186+
CheckAdbc(status, error.Get(), "Failed to commit transaction", GetDriverName());
187+
}
188+
189+
void Rollback() {
190+
AdbcErrorGuard error;
191+
auto status = AdbcConnectionRollback(&connection, error.Get());
192+
CheckAdbc(status, error.Get(), "Failed to rollback transaction", GetDriverName());
193+
}
194+
195+
void SetAutocommit(bool enabled) {
196+
AdbcErrorGuard error;
197+
const char *value = enabled ? ADBC_OPTION_VALUE_ENABLED : ADBC_OPTION_VALUE_DISABLED;
198+
auto status = AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT, value, error.Get());
199+
CheckAdbc(status, error.Get(), "Failed to set autocommit", GetDriverName());
200+
}
201+
182202
// Non-copyable
183203
AdbcConnectionWrapper(const AdbcConnectionWrapper &) = delete;
184204
AdbcConnectionWrapper &operator=(const AdbcConnectionWrapper &) = delete;

test/sql/adbc.test

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,70 @@ Invalid Input Error: adbc_execute: Invalid connection handle: 12345
360360
statement ok
361361
SELECT adbc_disconnect(getvariable('exec_conn')::BIGINT);
362362

363+
# ============================================
364+
# Transaction Tests
365+
# ============================================
366+
367+
# Test adbc_commit with invalid connection handle
368+
statement error
369+
SELECT adbc_commit(12345);
370+
----
371+
Invalid Input Error: adbc_commit: Invalid connection handle: 12345
372+
373+
# Test adbc_rollback with invalid connection handle
374+
statement error
375+
SELECT adbc_rollback(12345);
376+
----
377+
Invalid Input Error: adbc_rollback: Invalid connection handle: 12345
378+
379+
# Test adbc_set_autocommit with invalid connection handle
380+
statement error
381+
SELECT adbc_set_autocommit(12345, false);
382+
----
383+
Invalid Input Error: adbc_set_autocommit: Invalid connection handle: 12345
384+
385+
# Create a connection for transaction tests
386+
statement ok
387+
SET VARIABLE tx_conn = (SELECT adbc_connect({'driver': getvariable('sqlite_driver'), 'uri': ':memory:'}));
388+
389+
# Create a test table
390+
statement ok
391+
SELECT adbc_execute(getvariable('tx_conn')::BIGINT, 'CREATE TABLE tx_test (id INTEGER, value TEXT)');
392+
393+
# Test rollback - disable autocommit, insert, rollback
394+
query I
395+
SELECT adbc_set_autocommit(getvariable('tx_conn')::BIGINT, false);
396+
----
397+
true
398+
399+
statement ok
400+
SELECT adbc_execute(getvariable('tx_conn')::BIGINT, 'INSERT INTO tx_test VALUES (1, ''will rollback'')');
401+
402+
# Rollback the transaction
403+
query I
404+
SELECT adbc_rollback(getvariable('tx_conn')::BIGINT);
405+
----
406+
true
407+
408+
# Test commit - insert data and commit
409+
statement ok
410+
SELECT adbc_execute(getvariable('tx_conn')::BIGINT, 'INSERT INTO tx_test VALUES (2, ''committed'')');
411+
412+
query I
413+
SELECT adbc_commit(getvariable('tx_conn')::BIGINT);
414+
----
415+
true
416+
417+
# Re-enable autocommit
418+
query I
419+
SELECT adbc_set_autocommit(getvariable('tx_conn')::BIGINT, true);
420+
----
421+
true
422+
423+
# Clean up transaction connection
424+
statement ok
425+
SELECT adbc_disconnect(getvariable('tx_conn')::BIGINT);
426+
363427
# Test disconnect
364428
query I
365429
SELECT adbc_disconnect(getvariable('conn_id')::BIGINT);

0 commit comments

Comments
 (0)