1818#include " concurrentqueue.h"
1919
2020#include " vortex.hpp"
21- #include " vortex_extension.hpp"
22- #include " vortex_layout_reader.hpp"
2321#include " vortex_scan.hpp"
2422#include " vortex_common.hpp"
2523#include " vortex_expr.hpp"
@@ -42,7 +40,7 @@ struct BindData : public TableFunctionData {
4240
4341 // Used to read the schema during the bind phase and cached here to
4442 // avoid having to open the same file again during the scan phase.
45- unique_ptr <FileReader> initial_file;
43+ shared_ptr <FileReader> initial_file;
4644
4745 // Used to create an arena for protobuf exprs, need a ptr since the bind arg is const.
4846 unique_ptr<google::protobuf::Arena> arena;
@@ -107,7 +105,7 @@ struct ScanGlobalState : public GlobalTableFunctionState {
107105 // Multi producer, multi consumer lockfree queue.
108106 duckdb_moodycamel::ConcurrentQueue<ScanPartition> scan_partitions {8192 };
109107
110- std::vector<std:: shared_ptr<LayoutReader >> layout_readers ;
108+ std::vector<shared_ptr<FileReader >> file_readers ;
111109
112110 // The column idx that must be returned by the scan.
113111 vector<idx_t > column_ids;
@@ -252,9 +250,9 @@ static bool PinFileToThread(ScanGlobalState &global_state) {
252250}
253251
254252static void CreateScanPartitions (ClientContext &context, const BindData &bind, ScanGlobalState &global_state,
255- ScanLocalState &local_state, uint64_t file_idx, unique_ptr< FileReader> &file_reader) {
253+ ScanLocalState &local_state, uint64_t file_idx, FileReader &file_reader) {
256254 const auto file_name = global_state.expanded_files [file_idx];
257- const auto row_count = Try ([&](auto err) { return vx_file_row_count (file_reader-> file , err); });
255+ const auto row_count = Try ([&](auto err) { return vx_file_row_count (file_reader. file , err); });
258256
259257 const auto thread_count = std::thread::hardware_concurrency ();
260258 const auto file_count = global_state.expanded_files .size ();
@@ -292,8 +290,7 @@ static void CreateScanPartitions(ClientContext &context, const BindData &bind, S
292290 D_ASSERT (global_state.files_partitioned <= global_state.expanded_files .size ());
293291}
294292
295- static unique_ptr<ArrayIterator> OpenArrayIter (ScanGlobalState &global_state,
296- std::shared_ptr<LayoutReader> &layout_reader,
293+ static unique_ptr<ArrayIterator> OpenArrayIter (ScanGlobalState &global_state, shared_ptr<FileReader> &file_reader,
297294 ScanPartition row_range_partition) {
298295 const auto options = vx_file_scan_options {
299296 .projection = global_state.projected_column_names .data (),
@@ -305,7 +302,7 @@ static unique_ptr<ArrayIterator> OpenArrayIter(ScanGlobalState &global_state,
305302 .row_range_end = row_range_partition.end_row ,
306303 };
307304
308- return make_uniq<ArrayIterator>(layout_reader ->Scan (&options));
305+ return make_uniq<ArrayIterator>(file_reader ->Scan (&options));
309306}
310307
311308// Assigns the next array from the array stream.
@@ -348,8 +345,8 @@ static bool GetNextArray(ClientContext &context, const BindData &bind_data, Scan
348345
349346 // Layout readers are safe to share across threads for reading. Further, they
350347 // are created before pushing partitions of the corresponding files into a queue.
351- auto layout_reader = global_state.layout_readers [partition.file_idx ];
352- local_state.array_iterator = OpenArrayIter (global_state, layout_reader , partition);
348+ auto file_reader = global_state.file_readers [partition.file_idx ];
349+ local_state.array_iterator = OpenArrayIter (global_state, file_reader , partition);
353350 }
354351
355352 local_state.currently_scanned_array = local_state.array_iterator ->NextArray ();
@@ -378,20 +375,25 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
378375 return ;
379376 }
380377
381- // Free layout readers as long as we pin files to threads .
378+ // Free file readers when owned by the thread .
382379 if (local_state.scan_partitions .empty () && local_state.thread_local_file_idx .has_value ()) {
383- global_state.layout_readers [local_state.thread_local_file_idx .value ()] = nullptr ;
380+ global_state.file_readers [local_state.thread_local_file_idx .value ()] = nullptr ;
384381 local_state.thread_local_file_idx .reset ();
385382 }
386383
387384 // Create new scan partitions in case the queue is empty.
388385 if (auto file_idx = global_state.next_file_idx .fetch_add (1 );
389386 file_idx < global_state.expanded_files .size ()) {
390- auto file_name = global_state.expanded_files [file_idx];
391- auto vortex_file =
392- OpenFileAndVerify (FileSystem::GetFileSystem (context), *bind_data.session , file_name, bind_data);
393- global_state.layout_readers [file_idx] = LayoutReader::CreateFromFile (vortex_file.get ());
394- CreateScanPartitions (context, bind_data, global_state, local_state, file_idx, vortex_file);
387+ if (file_idx == 0 ) {
388+ global_state.file_readers [0 ] = bind_data.initial_file ;
389+ } else {
390+ auto file_name = global_state.expanded_files [file_idx];
391+ global_state.file_readers [file_idx] =
392+ OpenFileAndVerify (FileSystem::GetFileSystem (context), *bind_data.session , file_name, bind_data);
393+ }
394+
395+ CreateScanPartitions (context, bind_data, global_state, local_state, file_idx,
396+ *global_state.file_readers [file_idx]);
395397 }
396398 }
397399 }
@@ -491,7 +493,7 @@ void RegisterScanFunction(DatabaseInstance &instance) {
491493 }
492494
493495 // Resizing the empty vector default-constructs a null shared pointer at every index.
494- global_state->layout_readers .resize (global_state->expanded_files .size ());
496+ global_state->file_readers .resize (global_state->expanded_files .size ());
495497
496498 bind.arena ->Reset ();
497499 return std::move (global_state);
0 commit comments