99#include " duckdb/main/extension_util.hpp"
1010#include " vortex_extension.hpp"
1111
12- #include " vortex.h"
13-
12+ #include " vortex_common.hpp"
1413#include " expr/expr.hpp"
1514
1615#ifndef DUCKDB_EXTENSION_MAIN
@@ -31,7 +30,7 @@ struct VortexBindData : public TableFunctionData {
3130 vector<LogicalType> columns_types;
3231 vector<string> column_names;
3332 uint64_t num_columns;
34- File * initial_file;
33+ unique_ptr<VortexFile> initial_file;
3534
3635 shared_ptr<MultiFileList> file_list;
3736
@@ -57,24 +56,27 @@ struct VortexBindData : public TableFunctionData {
5756struct VortexScanLocalState : public LocalTableFunctionState {
5857 idx_t current_row;
5958 bool finished;
60- Array *array;
59+ unique_ptr<VortexArray> array;
60+ unique_ptr<VortexConversionCache> cache;
6161 uint32_t thread_id;
6262
6363 explicit VortexScanLocalState (uint32_t thread_id)
64- : current_row(0 ), finished(false ), array(nullptr ), thread_id(thread_id) {
64+ : current_row(0 ), finished(false ), array(nullptr ), cache( nullptr ), thread_id(thread_id) {
6565 }
6666};
6767
6868struct FileSlot {
6969 std::mutex slot_lock;
70- ArrayStream * array_stream;
70+ unique_ptr<VortexArrayStream> array_stream;
7171};
7272
7373struct VortexScanGlobalState : public GlobalTableFunctionState {
7474 // Must be <= MAX_THREAD_COUNT.
7575 std::atomic_uint32_t thread_id_counter;
7676 std::atomic_bool finished;
7777
78+ std::uint64_t cache_id;
79+
7880 // Each thread owns a file slot and is the thing only one allowed to modify the slot itself.
7981 // Other threads can work-steal array batches from the slot, by taking out the mutex in the FileSlot.
8082 // We allocate MAX_THREAD_COUNT threads, the max number threads allowed by this extension.
@@ -100,7 +102,7 @@ struct VortexScanGlobalState : public GlobalTableFunctionState {
100102 }
101103
102104 explicit VortexScanGlobalState ()
103- : thread_id_counter(0 ), finished(false ), file_slots(), next_file(0 ), filter(nullptr ) {
105+ : thread_id_counter(0 ), finished(false ), cache_id( 0 ), file_slots(), next_file(0 ), filter(nullptr ) {
104106 }
105107};
106108
@@ -152,22 +154,23 @@ std::string EnsureFileProtocol(const std::string &path) {
152154 return prefix + path;
153155}
154156
155- static File *OpenFile (const std::string &filename, vector<LogicalType> &column_types, vector<string> &column_names) {
157+ static unique_ptr<VortexFile> OpenFile (const std::string &filename, vector<LogicalType> &column_types,
158+ vector<string> &column_names) {
156159 FileOpenOptions options;
157160 options.uri = filename.c_str ();
158161 options.property_keys = nullptr ;
159162 options.property_vals = nullptr ;
160163 options.property_len = 0 ;
161164
162- File * file = File_open (&options);
165+ auto file = VortexFile::Open (&options);
163166 if (!file) {
164167 throw IOException (" Failed to open Vortex file: " + filename);
165168 }
166169
167170 // This Ptr is owned by the file
168- const DType *file_dtype = File_dtype (file);
171+ const DType *file_dtype = File_dtype (file-> file );
169172 if (DType_get (file_dtype) != DTYPE_STRUCT) {
170- File_free (file);
173+ File_free (file-> file );
171174 throw FatalException (" Vortex file does not contain a struct array as a top-level dtype" );
172175 }
173176
@@ -193,7 +196,7 @@ static void VerifyNewFile(const VortexBindData &bind_data, vector<LogicalType> &
193196 }
194197}
195198
196- static File * OpenFileAndVerify (const std::string &filename, const VortexBindData &bind_data) {
199+ static unique_ptr<VortexFile> OpenFileAndVerify (const std::string &filename, const VortexBindData &bind_data) {
197200 auto new_column_names = vector<string>();
198201 new_column_names.reserve (bind_data.column_names .size ());
199202 auto new_column_types = vector<LogicalType>();
@@ -204,7 +207,8 @@ static File *OpenFileAndVerify(const std::string &filename, const VortexBindData
204207 return file;
205208}
206209
207- static ArrayStream *OpenArrayStream (const VortexBindData &bind_data, VortexScanGlobalState &global_state, File *file) {
210+ static unique_ptr<VortexArrayStream> OpenArrayStream (const VortexBindData &bind_data,
211+ VortexScanGlobalState &global_state, VortexFile *file) {
208212 auto options = FileScanOptions {
209213 .projection = global_state.projected_column_names .data (),
210214 .projection_len = static_cast <int >(global_state.projected_column_names .size ()),
@@ -214,10 +218,10 @@ static ArrayStream *OpenArrayStream(const VortexBindData &bind_data, VortexScanG
214218 // This has a few factor effecting it:
215219 // 1. A smaller value means for work for the vortex file reader.
216220 // 2. A larger value reduces the parallelism available to the scanner
217- .split_by_row_count = 2048 * 8 ,
221+ .split_by_row_count = 2048 * 32 ,
218222 };
219223
220- return File_scan (file, &options);
224+ return make_uniq<VortexArrayStream>( File_scan (file-> file , &options) );
221225}
222226
223227static void VortexScanFunction (ClientContext &context, TableFunctionInput &data, DataChunk &output) {
@@ -239,10 +243,9 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
239243 // todo: 3. check if we can work steal from another thread
240244 // 4. we are done
241245
242- auto next = slot.array_stream != nullptr ? FFIArrayStream_next ( slot.array_stream ) : false ;
246+ auto next = slot.array_stream != nullptr ? slot.array_stream -> NextArray ( ) : false ;
243247 while (!next) {
244248 if (slot.array_stream != nullptr ) {
245- FFIArrayStream_free (slot.array_stream );
246249 slot.array_stream = nullptr ;
247250 }
248251
@@ -258,19 +261,24 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
258261 auto file_name = global_state.expanded_files [file_idx];
259262 auto file = OpenFileAndVerify (file_name, bind_data);
260263
261- slot.array_stream = OpenArrayStream (bind_data, global_state, file);
262- next = FFIArrayStream_next ( slot.array_stream );
264+ slot.array_stream = OpenArrayStream (bind_data, global_state, file. get () );
265+ next = slot.array_stream -> NextArray ( );
263266 }
264- local_state.array = FFIArrayStream_current ( slot.array_stream );
267+ local_state.array = slot.array_stream -> CurrentArray ( );
265268 local_state.current_row = 0 ;
266269 }
267270
268- local_state.current_row = FFIArray_to_duckdb_chunk (local_state.array , local_state.current_row ,
269- reinterpret_cast <duckdb_data_chunk>(&output));
271+ if (local_state.cache == nullptr ) {
272+ // Create a unique value so each cache can be differentiated.
273+ local_state.cache = make_uniq<VortexConversionCache>(global_state.cache_id ++);
274+ }
275+
276+ local_state.current_row = local_state.array ->ToDuckDBVector (
277+ local_state.current_row , reinterpret_cast <duckdb_data_chunk>(&output), local_state.cache .get ());
270278
271279 if (local_state.current_row == 0 ) {
272- FFIArray_free (local_state.array );
273280 local_state.array = nullptr ;
281+ local_state.cache = nullptr ;
274282 }
275283}
276284
@@ -297,7 +305,7 @@ static unique_ptr<FunctionData> VortexBind(ClientContext &context, TableFunction
297305}
298306
299307unique_ptr<NodeStatistics> VortexCardinality (ClientContext &context, const FunctionData *bind_data) {
300- auto data = bind_data->Cast <VortexBindData>();
308+ auto & data = bind_data->Cast <VortexBindData>();
301309
302310 return make_uniq<NodeStatistics>(data.num_columns , data.num_columns );
303311}
@@ -336,7 +344,7 @@ void VortexExtension::Load(DuckDB &db) {
336344 state->projected_column_names = column_names;
337345
338346 // Can ignore mutex since no other threads are running now.
339- state->file_slots [0 ].array_stream = OpenArrayStream (bind, *state, bind.initial_file );
347+ state->file_slots [0 ].array_stream = OpenArrayStream (bind, *state, bind.initial_file . get () );
340348 state->next_file = 1 ;
341349
342350 return std::move (state);
0 commit comments