diff --git a/src/include/vortex_common.hpp b/src/include/vortex_common.hpp index 4910527..306571f 100644 --- a/src/include/vortex_common.hpp +++ b/src/include/vortex_common.hpp @@ -82,6 +82,7 @@ struct FileReader { vx_file_reader *file; }; + struct Array { explicit Array(vx_array *array) : array(array) { } @@ -107,12 +108,25 @@ struct Array { vx_array *array; }; + struct ArrayIterator { explicit ArrayIterator(vx_array_iterator *array_iter) : array_iter(array_iter) { } + /// Releases ownership of the native array iterator ptr to the caller. The caller is then responsible for + /// eventually calling vx_array_iter_free. + /// + /// This ArrayIterator is useless after this call. + vx_array_iterator* release() { + auto* ptr = array_iter; + array_iter = nullptr; // Give up ownership + return ptr; + } + ~ArrayIterator() { - vx_array_iter_free(array_iter); + if (array_iter) { + vx_array_iter_free(array_iter); + } } duckdb::unique_ptr NextArray() const { @@ -128,6 +142,32 @@ struct ArrayIterator { vx_array_iterator *array_iter; }; + +struct ArrayExporter { + explicit ArrayExporter(vx_duckdb_exporter *exporter) : exporter(exporter) { + } + + ~ArrayExporter() { + if (exporter != nullptr) { + vx_duckdb_exporter_free(exporter); + } + } + + static duckdb::unique_ptr FromArrayIterator(duckdb::unique_ptr array_iter) { + auto exporter = Try([&](auto err) { + return vx_duckdb_exporter_create(array_iter->release(), err); + }); + return duckdb::make_uniq(exporter); + } + + bool ExportNext(duckdb_data_chunk output) const { + return Try([&](auto err) { return vx_duckdb_exporter_next(exporter, output, err); }); + } + + vx_duckdb_exporter *exporter; +}; + + struct ArrayStreamSink { explicit ArrayStreamSink(vx_array_sink *sink, duckdb::unique_ptr dtype) : sink(sink), dtype(std::move(dtype)) { diff --git a/src/include/vortex_expr.hpp b/src/include/vortex_expr.hpp index cdf831f..191c689 100644 --- a/src/include/vortex_expr.hpp +++ b/src/include/vortex_expr.hpp @@ -6,6 +6,15 @@ #include "expr.pb.h" namespace vortex { +// vortex expr proto ids. +const std::string BETWEEN_ID = "between"; +const std::string BINARY_ID = "binary"; +const std::string GET_ITEM_ID = "get_item"; +const std::string IDENTITY_ID = "identity"; +const std::string LIKE_ID = "like"; +const std::string LITERAL_ID = "literal"; +const std::string NOT_ID = "not"; + vortex::expr::Expr *table_expression_into_expr(google::protobuf::Arena &arena, duckdb::TableFilter &filter, const std::string &column_name); vortex::expr::Expr *expression_into_vortex_expr(google::protobuf::Arena &arena, const duckdb::Expression &expr); diff --git a/src/vortex_expr.cpp b/src/vortex_expr.cpp index fae6f17..62a4393 100644 --- a/src/vortex_expr.cpp +++ b/src/vortex_expr.cpp @@ -33,15 +33,6 @@ using std::string; namespace vortex { -// vortex expr proto ids. -const string BETWEEN_ID = "between"; -const string BINARY_ID = "binary"; -const string GET_ITEM_ID = "get_item"; -const string IDENTITY_ID = "identity"; -const string LIKE_ID = "like"; -const string LITERAL_ID = "literal"; -const string NOT_ID = "not"; - // Temporal ids const string VORTEX_DATE_ID = "vortex.date"; const string VORTEX_TIME_ID = "vortex.time"; diff --git a/src/vortex_scan.cpp b/src/vortex_scan.cpp index 019eeba..f55a722 100644 --- a/src/vortex_scan.cpp +++ b/src/vortex_scan.cpp @@ -22,6 +22,9 @@ #include "vortex_common.hpp" #include "vortex_expr.hpp" #include "vortex_session.hpp" +#include "duckdb/function/table/table_scan.hpp" +#include "duckdb/planner/filter/dynamic_filter.hpp" +#include "duckdb/planner/filter/optional_filter.hpp" using namespace duckdb; @@ -30,7 +33,7 @@ namespace vortex { /// Bind data for the Vortex table function that holds information about the /// file and its schema. This data is populated during the bind phase, which /// happens during the query planning phase. -struct BindData : public TableFunctionData { +struct ScanBindData : public TableFunctionData { // Session used to caching shared_ptr session; @@ -47,19 +50,19 @@ struct BindData : public TableFunctionData { vector conjuncts; bool Equals(const FunctionData &other_p) const override { - auto &other = other_p.Cast(); + auto &other = other_p.Cast(); return file_list == other.file_list && column_names == other.column_names && columns_types == other.columns_types; } unique_ptr Copy() const override { - auto result = make_uniq(); + auto result = make_uniq(); result->session = session; result->file_list = file_list; result->columns_types = columns_types; result->column_names = column_names; result->initial_file = initial_file; - return std::move(result); + return std::move(result); } }; @@ -73,10 +76,9 @@ struct ScanPartition { /// operation. In DuckDB's execution model, a query reading from a file can be /// parallelized by dividing it into ranges, each handled by a different scan. struct ScanLocalState : public LocalTableFunctionState { - idx_t array_row_offset; - unique_ptr currently_scanned_array; - unique_ptr array_iterator; - unique_ptr conversion_cache; + bool finished; + + unique_ptr array_exporter; std::queue scan_partitions; @@ -85,14 +87,17 @@ struct ScanLocalState : public LocalTableFunctionState { }; struct ScanGlobalState : public GlobalTableFunctionState { - std::atomic_bool finished; std::atomic_uint64_t cache_id; vector expanded_files; optional_ptr filter; // The precomputed filter string used in the query - std::string filter_str; + std::string static_filter_str; + // Any dynamic filter contained in the query + map> dynamic_filters; + // Static filter expression, owned by the arena + expr::Expr *static_filter_expr; // Limited to indicate progress in `table_scan_progress`. std::atomic_uint32_t partitions_processed; @@ -120,20 +125,61 @@ struct ScanGlobalState : public GlobalTableFunctionState { constexpr uint32_t MAX_THREAD_COUNT = 192; return MAX_THREAD_COUNT; } + + // Return the conjunction of the static and dynamic filters, if either exist. + // The dynamic filter can be updated and so we recompute the filter if there is an + // active dyn filter. + // TODO(joe): cache the dyn filter expr if the dynamic filters have not changed. + std::string filter_expression_string(google::protobuf::Arena &arena) { + if (dynamic_filters.empty()) { + return static_filter_str; + } + vector conjs; + for (auto &[col_name, filter] : dynamic_filters) { + auto g = lock_guard(filter->lock); + if (!filter->initialized) { + continue; + } + auto conj = table_expression_into_expr(arena, *filter->filter, col_name); + if (conj) { + conjs.push_back(conj); + } + } + if (conjs.empty()) { + return static_filter_expr->SerializeAsString(); + } + auto dynamic_expr = flatten_exprs(arena, conjs); + auto expr = arena.Create(&arena); + expr->set_id(BINARY_ID); + expr->mutable_kind()->set_binary_op(expr::Kind::And); + expr->add_children()->Swap(dynamic_expr); + expr->add_children()->CopyFrom(*static_filter_expr); + return expr->SerializeAsString(); + } }; // Use to create vortex expressions from `TableFilterSet` filter. -void CreateFilterExpression(google::protobuf::Arena &arena, vector column_names, - optional_ptr filter, vector column_ids, - vector &conjuncts) { - if (filter == nullptr) { +void ExtractFilterExpression(google::protobuf::Arena &arena, vector column_names, + optional_ptr filter_set, vector column_ids, + vector &conjuncts, map> &dyn_filters) { + if (filter_set == nullptr) { return; } - for (const auto &[col_id, value] : filter->filters) { + for (const auto &[col_id, value] : filter_set->filters) { auto column_name = column_names[column_ids[col_id]]; - auto conj = table_expression_into_expr(arena, *value, column_name); - conjuncts.push_back(conj); + + // Extract the optional dynamic filter, this seems like the only way that + // duckdb will use dynamic filters. + if (value->filter_type == TableFilterType::OPTIONAL_FILTER) { + auto &opt_filter = value->Cast().child_filter; + if (opt_filter->filter_type == TableFilterType::DYNAMIC_FILTER) { + dyn_filters.emplace(column_name, opt_filter->Cast().filter_data); + } + } else { + auto conj = table_expression_into_expr(arena, *value, column_name); + conjuncts.push_back(conj); + } } } @@ -214,7 +260,8 @@ static unique_ptr OpenFile(const std::string &filename, VortexSessio // This function ensures schema consistency across all the files in a multi-file query. // It compares the column types and names extracted from a new file against the schema // obtained from the first file (stored in bind_data). -static void VerifyNewFile(const BindData &bind_data, vector &column_types, vector &column_names) { +static void VerifyNewFile(const ScanBindData &bind_data, vector &column_types, + vector &column_names) { if (column_types.size() != bind_data.columns_types.size() || column_names != bind_data.column_names) { throw FatalException("Vortex file does not contain the same number of columns as the first"); } @@ -230,7 +277,7 @@ static void VerifyNewFile(const BindData &bind_data, vector &column } static unique_ptr OpenFileAndVerify(FileSystem &fs, VortexSession &session, const std::string &filename, - const BindData &bind_data) { + const ScanBindData &bind_data) { auto new_column_names = vector(); new_column_names.reserve(bind_data.column_names.size()); @@ -251,11 +298,10 @@ static bool PinFileToThread(ScanGlobalState &global_state) { return (file_count - global_state.files_partitioned) > thread_count; } -static void CreateScanPartitions(ClientContext &context, const BindData &bind, ScanGlobalState &global_state, +static void CreateScanPartitions(ClientContext &context, const ScanBindData &bind, ScanGlobalState &global_state, ScanLocalState &local_state, uint64_t file_idx, FileReader &file_reader) { - - if (global_state.file_readers[file_idx]->CanPrune(global_state.filter_str.data(), - static_cast(global_state.filter_str.length()))) { + auto filter_str = global_state.filter_expression_string(*bind.arena); + if (global_state.file_readers[file_idx]->CanPrune(filter_str.data(), static_cast(filter_str.length()))) { global_state.files_partitioned += 1; return; } @@ -298,13 +344,15 @@ static void CreateScanPartitions(ClientContext &context, const BindData &bind, S D_ASSERT(global_state.files_partitioned <= global_state.expanded_files.size()); } -static unique_ptr OpenArrayIter(ScanGlobalState &global_state, shared_ptr &file_reader, - ScanPartition row_range_partition) { +static unique_ptr OpenArrayIter(const ScanBindData &bind, ScanGlobalState &global_state, + shared_ptr &file_reader, ScanPartition row_range_partition) { + auto filter_str = global_state.filter_expression_string(*bind.arena); + const auto options = vx_file_scan_options { .projection = global_state.projected_column_names.data(), .projection_len = static_cast(global_state.projected_column_names.size()), - .filter_expression = global_state.filter_str.data(), - .filter_expression_len = static_cast(global_state.filter_str.length()), + .filter_expression = filter_str.data(), + .filter_expression_len = static_cast(filter_str.length()), .split_by_row_count = 0, .row_range_start = row_range_partition.start_row, .row_range_end = row_range_partition.end_row, @@ -313,11 +361,10 @@ static unique_ptr OpenArrayIter(ScanGlobalState &global_state, sh return make_uniq(file_reader->Scan(&options)); } -// Assigns the next array from the array stream. +// Assigns the next array exporter. // -// Returns true if a new array was assigned, false otherwise. -static bool GetNextArray(ClientContext &context, const BindData &bind_data, ScanGlobalState &global_state, - ScanLocalState &local_state, DataChunk &output) { +// Returns true if a new exporter was assigned, false otherwise. +static bool GetNextExporter(ClientContext &context, const ScanBindData &bind_data, ScanGlobalState &global_state, ScanLocalState &local_state) { // Try to deque a partition off the thread local queue. auto try_dequeue = [&](ScanPartition &scan_partition) { @@ -330,7 +377,7 @@ static bool GetNextArray(ClientContext &context, const BindData &bind_data, Scan return true; }; - if (local_state.array_iterator == nullptr) { + if (local_state.array_exporter == nullptr) { ScanPartition partition; if (bool success = (try_dequeue(partition) || global_state.scan_partitions.try_dequeue(partition)); !success) { @@ -341,7 +388,7 @@ static bool GetNextArray(ClientContext &context, const BindData &bind_data, Scan // A new partition might have been created after the first pop. Therefore, // one more pop is necessary to ensure no more partitions are left to process. if (success = global_state.scan_partitions.try_dequeue(partition); !success) { - global_state.finished = true; + local_state.finished = true; return false; } } @@ -354,68 +401,60 @@ static bool GetNextArray(ClientContext &context, const BindData &bind_data, Scan // Layout readers are safe to share across threads for reading. Further, they // are created before pushing partitions of the corresponing files into a queue. auto file_reader = global_state.file_readers[partition.file_idx]; - local_state.array_iterator = OpenArrayIter(global_state, file_reader, partition); - } - - local_state.currently_scanned_array = local_state.array_iterator->NextArray(); - local_state.array_row_offset = 0; - - if (local_state.currently_scanned_array == nullptr) { - local_state.array_iterator = nullptr; - global_state.partitions_processed += 1; - - return false; + auto array_iter = OpenArrayIter(bind_data, global_state, file_reader, partition); + local_state.array_exporter = ArrayExporter::FromArrayIterator(std::move(array_iter)); } return true; } static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) { - auto &bind_data = data.bind_data->Cast(); + auto &bind_data = data.bind_data->Cast(); auto &global_state = data.global_state->Cast(); auto &local_state = data.local_state->Cast(); - if (local_state.currently_scanned_array == nullptr) { - while (!GetNextArray(context, bind_data, global_state, local_state, output)) { - if (global_state.finished) { - output.Reset(); - output.SetCardinality(0); + while (true) { + if (local_state.array_exporter != nullptr) { + if (local_state.array_exporter->ExportNext(reinterpret_cast(&output))) { + // Successfully exported a chunk return; + } else { + // Otherwise, reset the exporter and try the next one. + global_state.partitions_processed += 1; + local_state.array_exporter = nullptr; } + } - // Free file readers when owned by the thread. - if (local_state.scan_partitions.empty() && local_state.thread_local_file_idx.has_value()) { - global_state.file_readers[local_state.thread_local_file_idx.value()] = nullptr; - local_state.thread_local_file_idx.reset(); - } - - // Create new scan partitions in case the queue is empty. - if (auto file_idx = global_state.next_file_idx.fetch_add(1); - file_idx < global_state.expanded_files.size()) { - if (file_idx == 0) { - global_state.file_readers[0] = bind_data.initial_file; - } else { - auto file_name = global_state.expanded_files[file_idx]; - global_state.file_readers[file_idx] = - OpenFileAndVerify(FileSystem::GetFileSystem(context), *bind_data.session, file_name, bind_data); + if (!local_state.finished) { + // Try to get the next exporter, if we fail, make progress on partitions and then loop. + if (!GetNextExporter(context, bind_data, global_state, local_state)) { + // Free file readers when owned by the thread. + if (local_state.scan_partitions.empty() && local_state.thread_local_file_idx.has_value()) { + global_state.file_readers[local_state.thread_local_file_idx.value()] = nullptr; + local_state.thread_local_file_idx.reset(); } - CreateScanPartitions(context, bind_data, global_state, local_state, file_idx, - *global_state.file_readers[file_idx]); + // Create new scan partitions in case the queue is empty. + if (auto file_idx = global_state.next_file_idx.fetch_add(1); + file_idx < global_state.expanded_files.size()) { + if (file_idx == 0) { + global_state.file_readers[0] = bind_data.initial_file; + } else { + auto file_name = global_state.expanded_files[file_idx]; + global_state.file_readers[file_idx] = + OpenFileAndVerify(FileSystem::GetFileSystem(context), *bind_data.session, file_name, bind_data); + } + + CreateScanPartitions(context, bind_data, global_state, local_state, file_idx, + *global_state.file_readers[file_idx]); + } } + continue; } - } - - if (local_state.conversion_cache == nullptr) { - local_state.conversion_cache = make_uniq(global_state.cache_id++); - } - - local_state.array_row_offset = local_state.currently_scanned_array->ToDuckDBVector( - local_state.array_row_offset, reinterpret_cast(&output), local_state.conversion_cache.get()); - if (local_state.array_row_offset == 0) { - local_state.currently_scanned_array = nullptr; - local_state.conversion_cache = nullptr; + // Otherwise, we're truly done. + output.Reset(); + return; } } @@ -425,7 +464,7 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, /// like projection pushdown and predicate pushdown. static unique_ptr VortexBind(ClientContext &context, TableFunctionBindInput &input, vector &column_types, vector &column_names) { - auto result = make_uniq(); + auto result = make_uniq(); result->arena = make_uniq(); const static string VortexExtensionKey = std::string("vortex_extension:vortex_session"); @@ -452,7 +491,7 @@ static unique_ptr VortexBind(ClientContext &context, TableFunction } unique_ptr VortexCardinality(ClientContext &context, const FunctionData *bind_data) { - auto &data = bind_data->Cast(); + auto &data = bind_data->Cast(); auto row_count = data.initial_file->FileRowCount(); if (data.file_list->GetTotalFileCount() == 1) { @@ -469,12 +508,18 @@ void PushdownComplexFilter(ClientContext &context, LogicalGet &get, FunctionData return; } - auto &bind = bind_data->Cast(); + auto &bind = bind_data->Cast(); bind.conjuncts.reserve(filters.size()); - for (auto &filter : filters) { - if (auto expr = expression_into_vortex_expr(*bind.arena, *filter); expr != nullptr) { + // Delete filters here so they are not given to used the create global state callback. + for (auto iter = filters.begin(); iter != filters.end();) { + auto expr = expression_into_vortex_expr(*bind.arena, *iter->get()); + if (expr != nullptr) { bind.conjuncts.push_back(expr); + + iter = filters.erase(iter); + } else { + ++iter; } } } @@ -485,7 +530,7 @@ void RegisterScanFunction(DatabaseInstance &instance) { vortex_scan.init_global = [](ClientContext &context, TableFunctionInitInput &input) -> unique_ptr { - auto &bind = input.bind_data->CastNoConst(); + auto &bind = input.bind_data->CastNoConst(); auto global_state = make_uniq(); // TODO(joe): do this expansion gradually in the scan to avoid a slower start. @@ -500,15 +545,17 @@ void RegisterScanFunction(DatabaseInstance &instance) { PopulateProjection(*global_state, bind.column_names, input); // Most expressions are extracted from `PushdownComplexFilter`, the final filters come from `input.filters`. - CreateFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, bind.conjuncts); - if (auto exprs = flatten_exprs(*bind.arena, bind.conjuncts); exprs != nullptr) { - global_state->filter_str = exprs->SerializeAsString(); + ExtractFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, bind.conjuncts, + global_state->dynamic_filters); + // Create the static filter expression + global_state->static_filter_expr = flatten_exprs(*bind.arena, bind.conjuncts); + if (global_state->static_filter_expr != nullptr) { + global_state->static_filter_str = global_state->static_filter_expr->SerializeAsString(); } // Resizing the empty vector default constructs std::shared pointers at all indices with nullptr. global_state->file_readers.resize(global_state->expanded_files.size()); - bind.arena->Reset(); return std::move(global_state); }; diff --git a/vortex b/vortex index b948d5f..c0d54aa 160000 --- a/vortex +++ b/vortex @@ -1 +1 @@ -Subproject commit b948d5f0ef49a561e0c0aa50265f19dc5e001840 +Subproject commit c0d54aaf75c3a44ea26fcd760516d1069e277240