@@ -66,16 +66,7 @@ static unique_ptr<FunctionData> AdbcInfoBind(ClientContext &context, TableFuncti
6666 }
6767
6868 bind_data->connection_id = input.inputs [0 ].GetValue <int64_t >();
69-
70- auto ®istry = ConnectionRegistry::Get ();
71- bind_data->connection = registry.Get (bind_data->connection_id );
72- if (!bind_data->connection ) {
73- throw InvalidInputException (" adbc_info: Invalid connection handle: " + to_string (bind_data->connection_id ));
74- }
75-
76- if (!bind_data->connection ->IsInitialized ()) {
77- throw InvalidInputException (" adbc_info: Connection has been closed" );
78- }
69+ bind_data->connection = GetValidatedConnection (bind_data->connection_id , " adbc_info" );
7970
8071 // Return simple key-value schema
8172 names = {" info_name" , " info_value" };
@@ -132,10 +123,6 @@ static unique_ptr<GlobalTableFunctionState> AdbcInfoInitGlobal(ClientContext &co
132123 auto &bind_data = input.bind_data ->Cast <AdbcInfoBindData>();
133124 auto global_state = make_uniq<AdbcInfoGlobalState>();
134125
135- if (!bind_data.connection ->IsInitialized ()) {
136- throw InvalidInputException (" adbc_info: Connection has been closed" );
137- }
138-
139126 memset (&global_state->stream , 0 , sizeof (global_state->stream ));
140127 try {
141128 bind_data.connection ->GetInfo (nullptr , 0 , &global_state->stream );
@@ -145,44 +132,23 @@ static unique_ptr<GlobalTableFunctionState> AdbcInfoInitGlobal(ClientContext &co
145132 global_state->stream_initialized = true ;
146133
147134 // Pre-extract all info into simple key-value pairs
148- // This avoids the union type issue by converting to strings
149- ArrowArray batch;
150- while (true ) {
151- memset (&batch, 0 , sizeof (batch));
152- int ret = global_state->stream .get_next (&global_state->stream , &batch);
153- if (ret != 0 ) {
154- const char *error_msg = global_state->stream .get_last_error (&global_state->stream );
155- string msg = " adbc_info: Failed to get next batch" ;
156- if (error_msg) {
157- msg += " : " ;
158- msg += error_msg;
159- }
160- throw IOException (msg);
161- }
162-
163- if (!batch.release ) {
164- break ; // End of stream
165- }
166-
135+ ForEachArrowBatch (global_state->stream , " adbc_info" , [&](ArrowArray *batch) {
167136 // batch has 2 children: info_name (uint32) and info_value (union)
168- if (batch. n_children >= 2 ) {
169- ArrowArray *info_name_array = batch. children [0 ];
170- ArrowArray *info_value_array = batch. children [1 ];
137+ if (batch-> n_children >= 2 ) {
138+ ArrowArray *info_name_array = batch-> children [0 ];
139+ ArrowArray *info_value_array = batch-> children [1 ];
171140
172141 auto info_codes = static_cast <const uint32_t *>(info_name_array->buffers [1 ]);
173142
174- for (int64_t i = 0 ; i < batch. length ; i++) {
143+ for (int64_t i = 0 ; i < batch-> length ; i++) {
175144 uint32_t info_code = info_codes[i];
176145 string name = GetInfoName (info_code);
177146 string value = ExtractUnionValue (info_value_array, i);
178147 global_state->info_rows .emplace_back (name, value);
179148 }
180149 }
181-
182- if (batch.release ) {
183- batch.release (&batch);
184- }
185- }
150+ return true ; // continue
151+ });
186152
187153 return std::move (global_state);
188154}
@@ -375,15 +341,7 @@ static unique_ptr<FunctionData> AdbcTablesBind(ClientContext &context, TableFunc
375341 bind_data->has_table_filter = true ;
376342 }
377343
378- auto ®istry = ConnectionRegistry::Get ();
379- bind_data->connection = registry.Get (bind_data->connection_id );
380- if (!bind_data->connection ) {
381- throw InvalidInputException (" adbc_tables: Invalid connection handle: " + to_string (bind_data->connection_id ));
382- }
383-
384- if (!bind_data->connection ->IsInitialized ()) {
385- throw InvalidInputException (" adbc_tables: Connection has been closed" );
386- }
344+ bind_data->connection = GetValidatedConnection (bind_data->connection_id , " adbc_tables" );
387345
388346 // Return a simple schema for tables: catalog, schema, table_name, table_type
389347 names = {" catalog_name" , " schema_name" , " table_name" , " table_type" };
@@ -396,10 +354,6 @@ static unique_ptr<GlobalTableFunctionState> AdbcTablesInitGlobal(ClientContext &
396354 auto &bind_data = input.bind_data ->Cast <AdbcTablesBindData>();
397355 auto global_state = make_uniq<AdbcTablesGlobalState>();
398356
399- if (!bind_data.connection ->IsInitialized ()) {
400- throw InvalidInputException (" adbc_tables: Connection has been closed" );
401- }
402-
403357 memset (&global_state->stream , 0 , sizeof (global_state->stream ));
404358
405359 const char *catalog = bind_data.has_catalog_filter ? bind_data.catalog_filter .c_str () : nullptr ;
@@ -415,30 +369,10 @@ static unique_ptr<GlobalTableFunctionState> AdbcTablesInitGlobal(ClientContext &
415369 global_state->stream_initialized = true ;
416370
417371 // Pre-extract all tables by flattening the hierarchical structure
418- ArrowArray batch;
419- while (true ) {
420- memset (&batch, 0 , sizeof (batch));
421- int ret = global_state->stream .get_next (&global_state->stream , &batch);
422- if (ret != 0 ) {
423- const char *error_msg = global_state->stream .get_last_error (&global_state->stream );
424- string msg = " adbc_tables: Failed to get next batch" ;
425- if (error_msg) {
426- msg += " : " ;
427- msg += error_msg;
428- }
429- throw IOException (msg);
430- }
431-
432- if (!batch.release ) {
433- break ; // End of stream
434- }
435-
436- ExtractTables (&batch, global_state->table_rows );
437-
438- if (batch.release ) {
439- batch.release (&batch);
440- }
441- }
372+ ForEachArrowBatch (global_state->stream , " adbc_tables" , [&](ArrowArray *batch) {
373+ ExtractTables (batch, global_state->table_rows );
374+ return true ;
375+ });
442376
443377 return std::move (global_state);
444378}
@@ -513,16 +447,7 @@ static unique_ptr<FunctionData> AdbcTableTypesBind(ClientContext &context, Table
513447 }
514448
515449 bind_data->connection_id = input.inputs [0 ].GetValue <int64_t >();
516-
517- auto ®istry = ConnectionRegistry::Get ();
518- bind_data->connection = registry.Get (bind_data->connection_id );
519- if (!bind_data->connection ) {
520- throw InvalidInputException (" adbc_table_types: Invalid connection handle: " + to_string (bind_data->connection_id ));
521- }
522-
523- if (!bind_data->connection ->IsInitialized ()) {
524- throw InvalidInputException (" adbc_table_types: Connection has been closed" );
525- }
450+ bind_data->connection = GetValidatedConnection (bind_data->connection_id , " adbc_table_types" );
526451
527452 // Return single column schema
528453 names = {" table_type" };
@@ -535,10 +460,6 @@ static unique_ptr<GlobalTableFunctionState> AdbcTableTypesInitGlobal(ClientConte
535460 auto &bind_data = input.bind_data ->Cast <AdbcTableTypesBindData>();
536461 auto global_state = make_uniq<AdbcTableTypesGlobalState>();
537462
538- if (!bind_data.connection ->IsInitialized ()) {
539- throw InvalidInputException (" adbc_table_types: Connection has been closed" );
540- }
541-
542463 memset (&global_state->stream , 0 , sizeof (global_state->stream ));
543464 try {
544465 bind_data.connection ->GetTableTypes (&global_state->stream );
@@ -548,37 +469,17 @@ static unique_ptr<GlobalTableFunctionState> AdbcTableTypesInitGlobal(ClientConte
548469 global_state->stream_initialized = true ;
549470
550471 // Extract all table types from the Arrow stream
551- ArrowArray batch;
552- while (true ) {
553- memset (&batch, 0 , sizeof (batch));
554- int ret = global_state->stream .get_next (&global_state->stream , &batch);
555- if (ret != 0 ) {
556- const char *error_msg = global_state->stream .get_last_error (&global_state->stream );
557- string msg = " adbc_table_types: Failed to get next batch" ;
558- if (error_msg) {
559- msg += " : " ;
560- msg += error_msg;
561- }
562- throw IOException (msg);
563- }
564-
565- if (!batch.release ) {
566- break ; // End of stream
567- }
568-
472+ ForEachArrowBatch (global_state->stream , " adbc_table_types" , [&](ArrowArray *batch) {
569473 // The result has a single column: table_type (utf8)
570- if (batch. n_children >= 1 ) {
571- ArrowArray *table_type_array = batch. children [0 ];
572- for (int64_t i = 0 ; i < batch. length ; i++) {
474+ if (batch-> n_children >= 1 ) {
475+ ArrowArray *table_type_array = batch-> children [0 ];
476+ for (int64_t i = 0 ; i < batch-> length ; i++) {
573477 string table_type = ExtractString (table_type_array, i);
574478 global_state->table_types .push_back (table_type);
575479 }
576480 }
577-
578- if (batch.release ) {
579- batch.release (&batch);
580- }
581- }
481+ return true ;
482+ });
582483
583484 return std::move (global_state);
584485}
@@ -856,15 +757,7 @@ static unique_ptr<FunctionData> AdbcColumnsBind(ClientContext &context, TableFun
856757 bind_data->has_column_filter = true ;
857758 }
858759
859- auto ®istry = ConnectionRegistry::Get ();
860- bind_data->connection = registry.Get (bind_data->connection_id );
861- if (!bind_data->connection ) {
862- throw InvalidInputException (" adbc_columns: Invalid connection handle: " + to_string (bind_data->connection_id ));
863- }
864-
865- if (!bind_data->connection ->IsInitialized ()) {
866- throw InvalidInputException (" adbc_columns: Connection has been closed" );
867- }
760+ bind_data->connection = GetValidatedConnection (bind_data->connection_id , " adbc_columns" );
868761
869762 // Return schema for columns
870763 names = {" catalog_name" , " schema_name" , " table_name" , " column_name" , " ordinal_position" , " remarks" , " type_name" , " is_nullable" };
@@ -878,10 +771,6 @@ static unique_ptr<GlobalTableFunctionState> AdbcColumnsInitGlobal(ClientContext
878771 auto &bind_data = input.bind_data ->Cast <AdbcColumnsBindData>();
879772 auto global_state = make_uniq<AdbcColumnsGlobalState>();
880773
881- if (!bind_data.connection ->IsInitialized ()) {
882- throw InvalidInputException (" adbc_columns: Connection has been closed" );
883- }
884-
885774 memset (&global_state->stream , 0 , sizeof (global_state->stream ));
886775
887776 const char *catalog = bind_data.has_catalog_filter ? bind_data.catalog_filter .c_str () : nullptr ;
@@ -898,30 +787,10 @@ static unique_ptr<GlobalTableFunctionState> AdbcColumnsInitGlobal(ClientContext
898787 global_state->stream_initialized = true ;
899788
900789 // Pre-extract all columns by flattening the hierarchical structure
901- ArrowArray batch;
902- while (true ) {
903- memset (&batch, 0 , sizeof (batch));
904- int ret = global_state->stream .get_next (&global_state->stream , &batch);
905- if (ret != 0 ) {
906- const char *error_msg = global_state->stream .get_last_error (&global_state->stream );
907- string msg = " adbc_columns: Failed to get next batch" ;
908- if (error_msg) {
909- msg += " : " ;
910- msg += error_msg;
911- }
912- throw IOException (msg);
913- }
914-
915- if (!batch.release ) {
916- break ; // End of stream
917- }
918-
919- ExtractColumns (&batch, global_state->column_rows );
920-
921- if (batch.release ) {
922- batch.release (&batch);
923- }
924- }
790+ ForEachArrowBatch (global_state->stream , " adbc_columns" , [&](ArrowArray *batch) {
791+ ExtractColumns (batch, global_state->column_rows );
792+ return true ;
793+ });
925794
926795 return std::move (global_state);
927796}
@@ -1178,15 +1047,7 @@ static unique_ptr<FunctionData> AdbcSchemaBind(ClientContext &context, TableFunc
11781047 bind_data->has_schema_filter = true ;
11791048 }
11801049
1181- auto ®istry = ConnectionRegistry::Get ();
1182- bind_data->connection = registry.Get (bind_data->connection_id );
1183- if (!bind_data->connection ) {
1184- throw InvalidInputException (" adbc_schema: Invalid connection handle: " + to_string (bind_data->connection_id ));
1185- }
1186-
1187- if (!bind_data->connection ->IsInitialized ()) {
1188- throw InvalidInputException (" adbc_schema: Connection has been closed" );
1189- }
1050+ bind_data->connection = GetValidatedConnection (bind_data->connection_id , " adbc_schema" );
11901051
11911052 // Return schema for fields
11921053 names = {" field_name" , " field_type" , " nullable" };
@@ -1199,10 +1060,6 @@ static unique_ptr<GlobalTableFunctionState> AdbcSchemaInitGlobal(ClientContext &
11991060 auto &bind_data = input.bind_data ->Cast <AdbcSchemaBindData>();
12001061 auto global_state = make_uniq<AdbcSchemaGlobalState>();
12011062
1202- if (!bind_data.connection ->IsInitialized ()) {
1203- throw InvalidInputException (" adbc_schema: Connection has been closed" );
1204- }
1205-
12061063 const char *catalog = bind_data.has_catalog_filter ? bind_data.catalog_filter .c_str () : nullptr ;
12071064 const char *db_schema = bind_data.has_schema_filter ? bind_data.schema_filter .c_str () : nullptr ;
12081065
0 commit comments