11/* *
22 *
33 * @file DuckdbConnection.cc
4- * @author An Tao
4+ * @author Dq Wei
55 *
66 * Copyright 2018, An Tao. All rights reserved.
77 * https://github.com/an-tao/drogon
@@ -35,21 +35,30 @@ void DuckdbConnection::onError(
3535 const char *errorMessage)
3636{
3737 // DuckDB 错误处理
38+ // 创建 SqlError 异常并通过回调传递
3839 auto exceptPtr = std::make_exception_ptr (
3940 SqlError (errorMessage ? errorMessage : " Unknown DuckDB error" ,
4041 std::string{sql}));
4142 exceptCallback (exceptPtr);
4243}
4344
45+ /* *
46+ * @brief Construct a new Duckdb Connection:: Duckdb Connection object
47+ *
48+ * @param loop
49+ * @param connInfo 连接字符串
50+ * @param sharedMutex
51+ * @param configOptions 配置参数
52+ */
4453DuckdbConnection::DuckdbConnection (
4554 trantor::EventLoop *loop,
4655 const std::string &connInfo,
4756 const std::shared_ptr<SharedMutex> &sharedMutex,
48- const std::unordered_map<std::string, std::string> &configOptions) // 新增配置参数支持 [dq 2025-11-19 ]
57+ const std::unordered_map<std::string, std::string> &configOptions) // 新增配置参数支持 [dq 2025-11-21 ]
4958 : DbConnection(loop),
5059 sharedMutexPtr_(sharedMutex),
5160 connInfo_(connInfo),
52- configOptions_(configOptions) // 存储配置选项 [dq 2025-11-19 ]
61+ configOptions_(configOptions) // 存储配置选项 [dq 2025-11-21 ]
5362{
5463}
5564
@@ -76,6 +85,7 @@ void DuckdbConnection::init()
7685 }
7786 }
7887
88+ // 在事件循环线程中初始化 DuckDB 数据库连接
7989 loop_->runInLoop ([this , filename = std::move (filename)]() {
8090 duckdb_database db;
8191 duckdb_config config;
@@ -166,6 +176,17 @@ void DuckdbConnection::init()
166176 });
167177}
168178
179+ /* *
180+ * @brief 执行 SQL 语句(异步)
181+ *
182+ * @param sql SQL 语句
183+ * @param paraNum 参数数量
184+ * @param parameters 参数值
185+ * @param length
186+ * @param format
187+ * @param rcb
188+ * @param exceptCallback
189+ */
169190void DuckdbConnection::execSql (
170191 std::string_view &&sql,
171192 size_t paraNum,
@@ -190,6 +211,17 @@ void DuckdbConnection::execSql(
190211 });
191212}
192213
214+ /* *
215+ * @brief
216+ *
217+ * @param sql
218+ * @param paraNum
219+ * @param parameters
220+ * @param length
221+ * @param format
222+ * @param rcb
223+ * @param exceptCallback
224+ */
193225void DuckdbConnection::execSqlInQueue (
194226 const std::string_view &sql,
195227 size_t paraNum,
@@ -325,7 +357,7 @@ std::shared_ptr<DuckdbResultImpl> DuckdbConnection::stmtExecute(
325357 return nullptr ;
326358 }
327359
328- // 创建智能指针,自定义删除器调用 duckdb_destroy_result
360+ // 创建智能指针,删除器调用 duckdb_destroy_result must call!
329361 auto resultShared = std::shared_ptr<duckdb_result>(
330362 rawResult,
331363 [](duckdb_result *ptr) {
@@ -347,6 +379,142 @@ std::shared_ptr<DuckdbResultImpl> DuckdbConnection::stmtExecute(
347379 return std::make_shared<DuckdbResultImpl>(resultShared, rowCount, insertId);
348380}
349381
382+
383+
384+ /* *
385+ * @brief 批量执行 SQL 语句(入口方法)
386+ *
387+ * @param sqlCommands SQL 命令队列
388+ */
389+ void DuckdbConnection::batchSql (std::deque<std::shared_ptr<SqlCmd>> &&sqlCommands)
390+ {
391+ auto thisPtr = shared_from_this ();
392+ loopThread_.getLoop ()->queueInLoop (
393+ [thisPtr, sqlCommands = std::move (sqlCommands)]() mutable {
394+ thisPtr->batchSqlCommands_ = std::move (sqlCommands);
395+ thisPtr->executeBatchSql ();
396+ });
397+ }
398+
399+ /* *
400+ * @brief 执行批量 SQL 的主逻辑
401+ */
402+ void DuckdbConnection::executeBatchSql ()
403+ {
404+ loop_->assertInLoopThread ();
405+
406+ if (batchSqlCommands_.empty ())
407+ {
408+ idleCb_ ();
409+ return ;
410+ }
411+
412+ // 标记为工作中
413+ isWorking_ = true ;
414+
415+ // 逐个处理每个命令
416+ while (!batchSqlCommands_.empty ())
417+ {
418+ auto cmd = batchSqlCommands_.front ();
419+ executeSingleBatchCommand (cmd);
420+ batchSqlCommands_.pop_front ();
421+ }
422+
423+ // 完成所有批处理
424+ isWorking_ = false ;
425+ idleCb_ ();
426+ }
427+
428+ /* *
429+ * @brief 执行单个批量 SQL 命令
430+ *
431+ * @param cmd SQL 命令对象
432+ */
433+ void DuckdbConnection::executeSingleBatchCommand (
434+ const std::shared_ptr<SqlCmd> &cmd)
435+ {
436+ LOG_TRACE << " DuckDB batch sql:" << cmd->sql_ ;
437+
438+ // case1:无参数的 SQL,使用 duckdb_extract_statements
439+ if (cmd->parametersNumber_ == 0 )
440+ {
441+ duckdb_extracted_statements extracted;
442+ idx_t count = duckdb_extract_statements (
443+ *connectionPtr_,
444+ cmd->sql_ .data (),
445+ &extracted);
446+
447+ if (count == 0 )
448+ {
449+ // 提取失败
450+ const char *error = duckdb_extract_statements_error (extracted);
451+ onError (cmd->sql_ , cmd->exceptionCallback_ , error);
452+ duckdb_destroy_extracted (&extracted);
453+ return ;
454+ }
455+
456+ // 执行提取的语句
457+ for (idx_t i = 0 ; i < count; ++i)
458+ {
459+ duckdb_prepared_statement stmt;
460+ if (duckdb_prepare_extracted_statement (
461+ *connectionPtr_, extracted, i, &stmt) == DuckDBError)
462+ {
463+ const char *error = duckdb_prepare_error (stmt);
464+ onError (cmd->sql_ , cmd->exceptionCallback_ , error);
465+ duckdb_destroy_prepare (&stmt);
466+ continue ;
467+ }
468+
469+ // 执行语句
470+ auto rawResult = new duckdb_result ();
471+ auto state = duckdb_execute_prepared (stmt, rawResult);
472+
473+ if (state == DuckDBError)
474+ {
475+ const char *error = duckdb_result_error (rawResult);
476+ onError (cmd->sql_ , cmd->exceptionCallback_ ,
477+ error ? error : " Unknown execution error" );
478+ duckdb_destroy_result (rawResult);
479+ delete rawResult;
480+ duckdb_destroy_prepare (&stmt);
481+ continue ;
482+ }
483+
484+ // 成功,创建结果并调用回调
485+ auto resultShared = std::shared_ptr<duckdb_result>(
486+ rawResult,
487+ [](duckdb_result *ptr) {
488+ if (ptr)
489+ {
490+ duckdb_destroy_result (ptr);
491+ delete ptr;
492+ }
493+ });
494+
495+ idx_t rowCount = duckdb_row_count (rawResult);
496+ auto result = std::make_shared<DuckdbResultImpl>(
497+ resultShared, rowCount, 0 );
498+ cmd->callback_ (Result (result));
499+
500+ duckdb_destroy_prepare (&stmt);
501+ }
502+
503+ duckdb_destroy_extracted (&extracted);
504+ }
505+ else
506+ {
507+ // case 2:有参数的 SQL,使用现有的预编译逻辑
508+ execSqlInQueue (cmd->sql_ ,
509+ cmd->parametersNumber_ ,
510+ cmd->parameters_ ,
511+ cmd->lengths_ ,
512+ cmd->formats_ ,
513+ cmd->callback_ ,
514+ cmd->exceptionCallback_ );
515+ }
516+ }
517+
350518void DuckdbConnection::disconnect ()
351519{
352520 std::promise<int > pro;
0 commit comments