22#include " duckdb/common/arrow/arrow.hpp"
33#include " duckdb/common/arrow/arrow_wrapper.hpp"
44#include " duckdb/common/atomic.hpp"
5+ #include " duckdb/common/limits.hpp"
56#include " duckdb/function/table/arrow.hpp"
67#include " duckdb/main/attached_database.hpp"
78#include " duckdb/main/database_manager.hpp"
1415
1516#include < arrow/c/bridge.h>
1617#include < arrow/util/iterator.h>
17- #include < iostream>
18- #include < limits>
18+ #include < unordered_map>
1919
2020namespace duckdb {
2121namespace bigquery {
@@ -58,7 +58,6 @@ unique_ptr<FunctionData> BigqueryArrowScanFunction::BigqueryArrowScanBind(Client
5858 if (!status.ok ()) {
5959 throw BinderException (" Arrow schema export failed: " + status.ToString ());
6060 }
61-
6261 // Convert Arrow schema to DuckDB types and names
6362 vector<LogicalType> mapped_bq_types; // original physical types if casts required
6463 BigqueryUtils::PopulateAndMapArrowTableTypes (context,
@@ -161,30 +160,68 @@ unique_ptr<GlobalTableFunctionState> BigqueryArrowScanFunction::BigqueryArrowSca
161160 gstate->scanned_types .emplace_back (bind_data.mapped_bq_types [col_id]);
162161 }
163162 }
164-
165- bool needs_projection = false ;
166- for (idx_t out_idx = 0 ; out_idx < input.projection_ids .size (); ++out_idx) {
167- idx_t proj_id = input.projection_ids [out_idx];
168- idx_t col_id = input.column_ids [proj_id];
163+ // Map each requested column (in column_ids order) to its physical index in the Arrow schema
164+ vector<idx_t > column_physical_positions;
165+ column_physical_positions.reserve (input.column_ids .size ());
166+ const idx_t invalid_phys_idx = NumericLimits<idx_t >::Maximum ();
167+ bool contains_rowid = false ;
168+ for (idx_t col_pos = 0 ; col_pos < input.column_ids .size (); ++col_pos) {
169+ idx_t col_id = input.column_ids [col_pos];
169170 if (col_id == COLUMN_IDENTIFIER_ROW_ID || col_id < 0 ) {
171+ contains_rowid = true ;
172+ column_physical_positions.push_back (invalid_phys_idx);
170173 continue ;
171174 }
172-
173175 const string &name = bind_data.names [col_id];
174176 idx_t phys_idx = static_cast <idx_t >(arrow_schema->GetFieldIndex (name));
175177 if (phys_idx == static_cast <idx_t >(-1 )) {
176178 throw InternalException (" Column '" + name + " ' not found in Arrow schema" );
177179 }
178-
179- gstate->projection_ids .push_back (phys_idx);
180- if (phys_idx != out_idx) {
181- needs_projection = true ;
182- }
180+ column_physical_positions.push_back (phys_idx);
183181 }
184182
185- // Clear projection IDs if no reordering is needed
186- if (!needs_projection) {
187- gstate->projection_ids .clear ();
183+ bool requires_physical_reorder = false ;
184+ vector<idx_t > projection_mapping;
185+ if (!input.projection_ids .empty ()) {
186+ projection_mapping.reserve (input.projection_ids .size ());
187+ for (idx_t out_idx = 0 ; out_idx < input.projection_ids .size (); ++out_idx) {
188+ idx_t col_pos = input.projection_ids [out_idx];
189+ D_ASSERT (col_pos < column_physical_positions.size ());
190+ idx_t phys_idx = column_physical_positions[col_pos];
191+ if (phys_idx == invalid_phys_idx) {
192+ contains_rowid = true ;
193+ requires_physical_reorder = true ;
194+ break ;
195+ }
196+ projection_mapping.push_back (phys_idx);
197+ if (phys_idx != col_pos) {
198+ requires_physical_reorder = true ;
199+ }
200+ }
201+ if (!contains_rowid) {
202+ gstate->projection_ids = std::move (projection_mapping);
203+ } else {
204+ gstate->projection_ids .clear ();
205+ }
206+ } else {
207+ projection_mapping.reserve (column_physical_positions.size ());
208+ for (idx_t out_idx = 0 ; out_idx < column_physical_positions.size (); ++out_idx) {
209+ idx_t phys_idx = column_physical_positions[out_idx];
210+ if (phys_idx == invalid_phys_idx) {
211+ contains_rowid = true ;
212+ requires_physical_reorder = true ;
213+ break ;
214+ }
215+ projection_mapping.push_back (phys_idx);
216+ if (phys_idx != out_idx && phys_idx != COLUMN_IDENTIFIER_ROW_ID && phys_idx >= 0 ) {
217+ requires_physical_reorder = true ;
218+ }
219+ }
220+ if (!contains_rowid && requires_physical_reorder) {
221+ gstate->projection_ids = std::move (projection_mapping);
222+ } else {
223+ gstate->projection_ids .clear ();
224+ }
188225 }
189226
190227 // Create the Arrow scan stream
@@ -208,10 +245,9 @@ unique_ptr<LocalTableFunctionState> BigqueryArrowScanFunction::BigqueryArrowScan
208245
209246 result->column_ids = sorted_column_ids;
210247 result->filters = input.filters .get ();
211-
212248 if (!bind_data.projection_pushdown_enabled ) {
213249 result->column_ids .clear ();
214- } else if (!input.projection_ids .empty () || bind_data.requires_cast ) {
250+ } else if (!input.projection_ids .empty () || bind_data.requires_cast || !global_state. projection_ids . empty () ) {
215251 auto &asgs = global_state_p->Cast <ArrowScanGlobalState>();
216252 result->all_columns .Initialize (client_context, asgs.scanned_types );
217253 }
0 commit comments