diff --git a/Makefile b/Makefile index b23fabf..d53836c 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ EXT_NAME=vortex_duckdb EXT_CONFIG=${PROJ_DIR}extension_config.cmake EXT_FLAGS=-DCMAKE_OSX_DEPLOYMENT_TARGET=12.0 -export OVERRIDE_GIT_DESCRIBE=v1.3.0 +export OVERRIDE_GIT_DESCRIBE=v1.3.1 export MACOSX_DEPLOYMENT_TARGET=12.0 export VCPKG_FEATURE_FLAGS=-binarycaching export VCPKG_OSX_DEPLOYMENT_TARGET=12.0 diff --git a/duckdb b/duckdb index ad12732..082f0c6 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit ad1273222186d28b4b351736ed88101044bbe97b +Subproject commit 082f0c603979b31fead03536047487999ba820bc diff --git a/extension-ci-tools b/extension-ci-tools index 1f00107..8010fb5 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 1f00107ca1eaf1691049907296a8aa796d054e6b +Subproject commit 8010fb516deeafca07b88d2bd109cbe7a528dea5 diff --git a/src/include/vortex_common.hpp b/src/include/vortex_common.hpp index 306571f..67a55e8 100644 --- a/src/include/vortex_common.hpp +++ b/src/include/vortex_common.hpp @@ -11,7 +11,7 @@ namespace vortex { struct DType { - explicit DType(vx_dtype *dtype) : dtype(dtype) { + explicit DType(const vx_dtype *dtype) : dtype(dtype) { } static duckdb::unique_ptr FromDuckDBTable(const std::vector &column_types, @@ -34,57 +34,46 @@ struct DType { } } - vx_dtype *dtype; + const vx_dtype *dtype; }; -struct ConversionCache { - explicit ConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) { +struct VortexFile { + explicit VortexFile(const vx_file *file) : file(file) { } - ~ConversionCache() { - vx_conversion_cache_free(cache); + ~VortexFile() { + vx_file_free(file); } - vx_conversion_cache *cache; -}; - -struct FileReader { - explicit FileReader(vx_file_reader *file) : file(file) { - } - - ~FileReader() { - vx_file_reader_free(file); - } - - static duckdb::unique_ptr Open(const vx_file_open_options *options, VortexSession &session) { + static duckdb::unique_ptr Open(const vx_file_open_options *options, VortexSession &session) { auto file = Try([&](auto err) { return vx_file_open_reader(options, session.session, err); }); - return duckdb::make_uniq(file); + return duckdb::make_uniq(file); } vx_array_iterator *Scan(const vx_file_scan_options *options) { - return Try([&](auto err) { return vx_file_reader_scan(this->file, options, err); }); + return Try([&](auto err) { return vx_file_scan(this->file, options, err); }); } - bool CanPrune(const char *filter_expression, unsigned int filter_expression_len) { + bool CanPrune(const char *filter_expression, unsigned int filter_expression_len, unsigned long file_idx) { return Try([&](auto err) { - return vx_file_reader_can_prune(this->file, filter_expression, filter_expression_len, err); + return vx_file_can_prune(this->file, filter_expression, filter_expression_len, file_idx, err); }); } - uint64_t FileRowCount() { - return Try([&](auto err) { return vx_file_row_count(file, err); }); + uint64_t RowCount() { + return vx_file_row_count(file); } struct DType DType() { - return vortex::DType(vx_file_dtype(file)); + return vortex::DType(vx_dtype_clone(vx_file_dtype(file))); } - vx_file_reader *file; + const vx_file *file; }; struct Array { - explicit Array(vx_array *array) : array(array) { + explicit Array(const vx_array *array) : array(array) { } ~Array() { @@ -101,11 +90,7 @@ struct Array { return duckdb::make_uniq(array); } - idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const ConversionCache *cache) const { - return Try([&](auto err) { return vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, err); }); - } - - vx_array *array; + const vx_array *array; }; @@ -125,12 +110,12 @@ struct ArrayIterator { ~ArrayIterator() { if (array_iter) { - vx_array_iter_free(array_iter); + vx_array_iterator_free(array_iter); } } duckdb::unique_ptr NextArray() const { - auto array = Try([&](auto err) { return vx_array_iter_next(array_iter, err); }); + auto array = Try([&](auto err) { return vx_array_iterator_next(array_iter, err); }); if (array == nullptr) { return nullptr; @@ -154,9 +139,7 @@ struct ArrayExporter { } static duckdb::unique_ptr FromArrayIterator(duckdb::unique_ptr array_iter) { - auto exporter = Try([&](auto err) { - return vx_duckdb_exporter_create(array_iter->release(), err); - }); + auto exporter = vx_duckdb_exporter_new(array_iter->release()); return duckdb::make_uniq(exporter); } diff --git a/src/include/vortex_error.hpp b/src/include/vortex_error.hpp index b41565d..4d8e7ae 100644 --- a/src/include/vortex_error.hpp +++ b/src/include/vortex_error.hpp @@ -10,7 +10,8 @@ namespace vortex { inline void HandleError(vx_error *error) { if (error != nullptr) { - auto msg = std::string(vx_error_get_message(error)); + auto msg_str = vx_error_get_message(error); + auto msg = std::string(vx_string_ptr(msg_str), vx_string_len(msg_str)); vx_error_free(error); throw duckdb::InvalidInputException(msg); } diff --git a/src/include/vortex_expr.hpp b/src/include/vortex_expr.hpp index 191c689..a46336c 100644 --- a/src/include/vortex_expr.hpp +++ b/src/include/vortex_expr.hpp @@ -10,14 +10,17 @@ namespace vortex { const std::string BETWEEN_ID = "between"; const std::string BINARY_ID = "binary"; const std::string GET_ITEM_ID = "get_item"; -const std::string IDENTITY_ID = "identity"; +const std::string PACK_ID = "pack"; +const std::string VAR_ID = "var"; const std::string LIKE_ID = "like"; const std::string LITERAL_ID = "literal"; const std::string NOT_ID = "not"; +const std::string LIST_CONTAINS_ID = "list_contains"; vortex::expr::Expr *table_expression_into_expr(google::protobuf::Arena &arena, duckdb::TableFilter &filter, const std::string &column_name); vortex::expr::Expr *expression_into_vortex_expr(google::protobuf::Arena &arena, const duckdb::Expression &expr); vortex::expr::Expr *flatten_exprs(google::protobuf::Arena &arena, const duckdb::vector &child_filters); +vortex::expr::Expr *pack_projection_columns(google::protobuf::Arena &arena, duckdb::vector columns); } // namespace vortex diff --git a/src/include/vortex_session.hpp b/src/include/vortex_session.hpp index cea5c7d..c8ead2e 100644 --- a/src/include/vortex_session.hpp +++ b/src/include/vortex_session.hpp @@ -7,7 +7,7 @@ namespace vortex { class VortexSession : public duckdb::ObjectCacheEntry { public: - VortexSession() : session(vx_session_create()) { + VortexSession() : session(vx_session_new()) { } ~VortexSession() override { diff --git a/src/vortex_expr.cpp b/src/vortex_expr.cpp index 62a4393..bd0874f 100644 --- a/src/vortex_expr.cpp +++ b/src/vortex_expr.cpp @@ -18,7 +18,10 @@ #include "vortex_expr.hpp" +#include "duckdb/planner/filter/in_filter.hpp" + using duckdb::ConjunctionAndFilter; +using duckdb::ConjunctionOrFilter; using duckdb::ConstantFilter; using duckdb::Exception; using duckdb::ExceptionType; @@ -38,6 +41,8 @@ const string VORTEX_DATE_ID = "vortex.date"; const string VORTEX_TIME_ID = "vortex.time"; const string VORTEX_TIMESTAMP_ID = "vortex.timestamp"; +const string VX_ROW_ID_COL_ID = "$vx.row_id"; + const string DUCKDB_FUNCTION_NAME_CONTAINS = "contains"; enum TimeUnit : uint8_t { @@ -194,50 +199,48 @@ scalar::Scalar *into_null_scalar(Arena &arena, LogicalType &logical_type) { return scalar; } -scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) { - auto scalar = Arena::Create(&arena); - auto dtype = into_vortex_dtype(arena, value.type(), nullable); - scalar->set_allocated_dtype(dtype); +scalar::ScalarValue *into_vortex_scalar_value(Arena &arena, const Value &value) { + auto scalar = Arena::Create(&arena); switch (value.type().id()) { case LogicalTypeId::INVALID: case LogicalTypeId::SQLNULL: { - scalar->mutable_value()->set_null_value(google::protobuf::NULL_VALUE); + scalar->set_null_value(google::protobuf::NULL_VALUE); return scalar; } case LogicalTypeId::BOOLEAN: { - scalar->mutable_value()->set_bool_value(value.GetValue()); + scalar->set_bool_value(value.GetValue()); return scalar; } case LogicalTypeId::TINYINT: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::SMALLINT: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::INTEGER: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::BIGINT: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::UTINYINT: - scalar->mutable_value()->set_uint64_value(value.GetValue()); + scalar->set_uint64_value(value.GetValue()); return scalar; case LogicalTypeId::USMALLINT: - scalar->mutable_value()->set_uint64_value(value.GetValue()); + scalar->set_uint64_value(value.GetValue()); return scalar; case LogicalTypeId::UINTEGER: - scalar->mutable_value()->set_uint64_value(value.GetValue()); + scalar->set_uint64_value(value.GetValue()); return scalar; case LogicalTypeId::UBIGINT: - scalar->mutable_value()->set_uint64_value(value.GetValue()); + scalar->set_uint64_value(value.GetValue()); return scalar; case LogicalTypeId::FLOAT: - scalar->mutable_value()->set_f32_value(value.GetValue()); + scalar->set_f32_value(value.GetValue()); return scalar; case LogicalTypeId::DOUBLE: - scalar->mutable_value()->set_f64_value(value.GetValue()); + scalar->set_f64_value(value.GetValue()); return scalar; case LogicalTypeId::DECIMAL: { auto huge = value.GetValue(); @@ -246,44 +249,59 @@ scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullab out[1] = static_cast(huge >> 32); out[2] = static_cast(huge >> 64); out[3] = static_cast(huge >> 96); - scalar->mutable_value()->set_bytes_value(std::string(reinterpret_cast(out), 8)); + scalar->set_bytes_value(std::string(reinterpret_cast(out), 8)); return scalar; } case LogicalTypeId::VARCHAR: - scalar->mutable_value()->set_string_value(value.GetValue()); + scalar->set_string_value(value.GetValue()); return scalar; case LogicalTypeId::DATE: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::TIME: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::TIMESTAMP_SEC: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::TIMESTAMP_MS: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::TIMESTAMP: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; case LogicalTypeId::TIMESTAMP_NS: - scalar->mutable_value()->set_int64_value(value.GetValue()); + scalar->set_int64_value(value.GetValue()); return scalar; default: throw Exception(ExceptionType::NOT_IMPLEMENTED, "into_vortex_scalar", {{"id", value.ToString()}}); } } +scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) { + auto scalar = Arena::Create(&arena); + auto dtype = into_vortex_dtype(arena, value.type(), nullable); + scalar->set_allocated_dtype(dtype); + + auto scalar_value = into_vortex_scalar_value(arena, value); + scalar->mutable_value()->Swap(scalar_value); + return scalar; +} + void set_column(const string &s, expr::Expr *column) { + column->set_id(GET_ITEM_ID); auto kind = column->mutable_kind(); auto get_item = kind->mutable_get_item(); get_item->mutable_path()->assign(s); auto id = column->add_children(); - id->mutable_kind()->mutable_identity(); - id->set_id(IDENTITY_ID); + id->set_id(VAR_ID); + if (s == "file_row_number" || s == "file_index") { + id->mutable_kind()->mutable_var()->set_var(VX_ROW_ID_COL_ID); + } else { + id->mutable_kind()->mutable_var()->set_var(""); + } } void set_literal(Arena &arena, const Value &value, bool nullable, expr::Expr *constant) { @@ -294,7 +312,8 @@ void set_literal(Arena &arena, const Value &value, bool nullable, expr::Expr *co } expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector> &child_filters, - const string &column_name) { + expr::Kind_BinaryOp operation, const string &column_name) { + D_ASSERT(!child_filters.empty()); if (child_filters.size() == 1) { @@ -305,11 +324,11 @@ expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector(nullptr); auto hd = Arena::Create(&arena); - // Flatten the list of children into a linked list of AND values. + // Flatten the list of children into a linked list of operation values. for (size_t i = 0; i < child_filters.size() - 1; i++) { expr::Expr *new_and = !tail ? hd : tail->add_children(); new_and->set_id(BINARY_ID); - new_and->mutable_kind()->set_binary_op(expr::Kind::And); + new_and->mutable_kind()->set_binary_op(operation); new_and->add_children()->Swap(table_expression_into_expr(arena, *child_filters[i], column_name)); tail = new_and; @@ -443,6 +462,20 @@ expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expression & } } +void set_list_element(Arena &arena, expr::Expr *list, duckdb::vector &values) { + list->set_id(LITERAL_ID); + auto ll = list->mutable_kind()->mutable_literal(); + auto scalar = ll->mutable_value(); + auto elem_type = into_vortex_dtype(arena, values[0].GetTypeMutable(), true); + auto list_type = scalar->mutable_dtype()->mutable_list(); + list_type->mutable_element_type()->Swap(elem_type); + list_type->set_nullable(true); + auto list_scalar_value = scalar->mutable_value()->mutable_list_value(); + for (auto &elem : values) { + list_scalar_value->mutable_values()->Add(std::move(*into_vortex_scalar_value(arena, elem))); + } +} + expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const string &column_name) { auto expr = Arena::Create(&arena); switch (filter.filter_type) { @@ -457,27 +490,59 @@ expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const expr->set_id(BINARY_ID); return expr; } + case TableFilterType::CONJUNCTION_OR: { + auto &disjuncts = filter.Cast(); + return flatten_table_filters(arena, disjuncts.child_filters, expr::Kind_BinaryOp_Or, column_name); + } case TableFilterType::CONJUNCTION_AND: { auto &conjucts = filter.Cast(); - return flatten_table_filters(arena, conjucts.child_filters, column_name); + return flatten_table_filters(arena, conjucts.child_filters, expr::Kind_BinaryOp_And, column_name); } case TableFilterType::IS_NULL: case TableFilterType::IS_NOT_NULL: { throw Exception(ExceptionType::NOT_IMPLEMENTED, "null checks"); } case TableFilterType::OPTIONAL_FILTER: { + auto *expr_o = + table_expression_into_expr(arena, *filter.Cast().child_filter, column_name); + if (expr_o != nullptr) { + return expr_o; + } expr->set_id(LITERAL_ID); auto lit = expr->mutable_kind()->mutable_literal(); lit->mutable_value()->mutable_value()->set_bool_value(true); lit->mutable_value()->mutable_dtype()->mutable_bool_()->set_nullable(false); return expr; } + case TableFilterType::IN_FILTER: { + auto &in_list_filter = filter.Cast(); + expr->set_id(LIST_CONTAINS_ID); + expr->mutable_kind()->mutable_list_contains(); + auto list = expr->add_children(); + set_list_element(arena, list, in_list_filter.values); + set_column(column_name, expr->add_children()); + return expr; + } default: break; } + std::cout << "table expr: " << std::to_string(static_cast(filter.filter_type)) << filter.DebugToString() + << std::endl; throw Exception(ExceptionType::NOT_IMPLEMENTED, "table_expression_into_expr", {{"filter_type_id", std::to_string(static_cast(filter.filter_type))}}); } +vortex::expr::Expr *pack_projection_columns(google::protobuf::Arena &arena, duckdb::vector columns) { + auto expr = arena.Create(&arena); + expr->set_id(PACK_ID); + auto pack_paths = expr->mutable_kind()->mutable_pack()->mutable_paths(); + for (auto &columnn : columns) { + set_column(columnn, expr->add_children()); + pack_paths->Add(std::string(columnn)); + } + + return expr; +} + } // namespace vortex diff --git a/src/vortex_scan.cpp b/src/vortex_scan.cpp index f55a722..8eae203 100644 --- a/src/vortex_scan.cpp +++ b/src/vortex_scan.cpp @@ -22,6 +22,7 @@ #include "vortex_common.hpp" #include "vortex_expr.hpp" #include "vortex_session.hpp" +#include "duckdb/common/multi_file/multi_file_reader.hpp" #include "duckdb/function/table/table_scan.hpp" #include "duckdb/planner/filter/dynamic_filter.hpp" #include "duckdb/planner/filter/optional_filter.hpp" @@ -30,6 +31,9 @@ using namespace duckdb; namespace vortex { +static constexpr column_t _COLUMN_IDENTIFIER_FILE_ROW_NUMBER = UINT64_C(9223372036854775809); +static constexpr column_t _COLUMN_IDENTIFIER_FILE_INDEX = UINT64_C(9223372036854775810); + /// Bind data for the Vortex table function that holds information about the /// file and its schema. This data is populated during the bind phase, which /// happens during the query planning phase. @@ -40,10 +44,11 @@ struct ScanBindData : public TableFunctionData { shared_ptr file_list; vector columns_types; vector column_names; + map virtual_col; // Used to read the schema during the bind phase and cached here to // avoid having to open the same file again during the scan phase. - shared_ptr initial_file; + shared_ptr initial_file; // Used to create an arena for protobuf exprs, need a ptr since the bind arg is const. unique_ptr arena; @@ -57,13 +62,23 @@ struct ScanBindData : public TableFunctionData { unique_ptr Copy() const override { auto result = make_uniq(); + result->arena = make_uniq(); result->session = session; result->file_list = file_list; result->columns_types = columns_types; result->column_names = column_names; result->initial_file = initial_file; + result->virtual_col = virtual_col; return std::move(result); } + + std::string &ColumnName(column_t col) { + if (col < column_names.size()) { + return column_names[col]; + } + + return virtual_col[col]; + } }; struct ScanPartition { @@ -112,13 +127,13 @@ struct ScanGlobalState : public GlobalTableFunctionState { // Multi producer, multi consumer lockfree queue. duckdb_moodycamel::ConcurrentQueue scan_partitions {8192}; - std::vector> file_readers; + std::vector> files; // The column idx that must be returned by the scan. vector column_ids; vector projection_ids; // The precomputed column names used in the query. - std::vector projected_column_names; + std::string projection; // This is the max number threads that the extension might use. idx_t MaxThreads() const override { @@ -159,7 +174,7 @@ struct ScanGlobalState : public GlobalTableFunctionState { }; // Use to create vortex expressions from `TableFilterSet` filter. -void ExtractFilterExpression(google::protobuf::Arena &arena, vector column_names, +void ExtractFilterExpression(google::protobuf::Arena &arena, ScanBindData &data, optional_ptr filter_set, vector column_ids, vector &conjuncts, map> &dyn_filters) { if (filter_set == nullptr) { @@ -167,7 +182,7 @@ void ExtractFilterExpression(google::protobuf::Arena &arena, vector } for (const auto &[col_id, value] : filter_set->filters) { - auto column_name = column_names[column_ids[col_id]]; + auto column_name = data.ColumnName(column_ids[col_id]); // Extract the optional dynamic filter, this seems like the only way that // duckdb will use dynamic filters. @@ -175,39 +190,37 @@ void ExtractFilterExpression(google::protobuf::Arena &arena, vector auto &opt_filter = value->Cast().child_filter; if (opt_filter->filter_type == TableFilterType::DYNAMIC_FILTER) { dyn_filters.emplace(column_name, opt_filter->Cast().filter_data); + continue; } - } else { - auto conj = table_expression_into_expr(arena, *value, column_name); - conjuncts.push_back(conj); } + auto conj = table_expression_into_expr(arena, *value, column_name); + conjuncts.push_back(conj); } } -static void PopulateProjection(ScanGlobalState &global_state, const vector &column_names, - TableFunctionInitInput &input) { +static void PopulateProjection(ScanBindData &bind_data, ScanGlobalState &global_state, TableFunctionInitInput &input) { global_state.projection_ids.reserve(input.projection_ids.size()); for (auto proj_id : input.projection_ids) { global_state.projection_ids.push_back(input.column_ids[proj_id]); } - global_state.projected_column_names.reserve(input.projection_ids.size()); + auto vec = duckdb::vector(); for (auto column_id : global_state.projection_ids) { - assert(column_id < column_names.size()); - global_state.projected_column_names.push_back(column_names[column_id].c_str()); + vec.push_back(bind_data.ColumnName(column_id)); } + auto expr = pack_projection_columns(*bind_data.arena, vec); + global_state.projection = expr->SerializeAsString(); } /// Extracts schema information from a Vortex file's data type. static void ExtractVortexSchema(DType &file_dtype, vector &column_types, vector &column_names) { - uint32_t field_count = vx_dtype_field_count(file_dtype.dtype); + auto struct_dtype = vx_dtype_struct_dtype(file_dtype.dtype); + uint32_t field_count = vx_struct_fields_nfields(struct_dtype); for (uint32_t idx = 0; idx < field_count; idx++) { - char name_buffer[512]; - int name_len = 0; - - vx_dtype_field_name(file_dtype.dtype, idx, name_buffer, &name_len); - std::string field_name(name_buffer, name_len); + auto vx_field_name = vx_struct_fields_field_name(struct_dtype, idx); + std::string field_name(vx_string_ptr(vx_field_name), vx_string_len(vx_field_name)); - vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype.dtype, idx); + const vx_dtype *field_dtype = vx_struct_fields_field_dtype(struct_dtype, idx); auto duckdb_type = Try([&](auto err) { return vx_dtype_to_duckdb_logical_type(field_dtype, err); }); column_names.push_back(field_name); @@ -233,20 +246,20 @@ std::string EnsureFileProtocol(FileSystem &fs, const std::string &path) { return prefix + absolute_path; } -static unique_ptr OpenFile(const std::string &filename, VortexSession &session, +static unique_ptr OpenFile(const std::string &filename, VortexSession &session, vector &column_types, vector &column_names) { vx_file_open_options options { .uri = filename.c_str(), .property_keys = nullptr, .property_vals = nullptr, .property_len = 0}; - auto file = FileReader::Open(&options, session); + auto file = VortexFile::Open(&options, session); if (!file) { throw IOException("Failed to open Vortex file: " + filename); } // This pointer is owned by the file. auto file_dtype = file->DType(); - if (vx_dtype_get(file_dtype.dtype) != DTYPE_STRUCT) { - vx_file_reader_free(file->file); + if (vx_dtype_get_variant(file_dtype.dtype) != DTYPE_STRUCT) { + vx_file_free(file->file); throw FatalException("Vortex file does not contain a struct array as a top-level dtype"); } @@ -276,7 +289,7 @@ static void VerifyNewFile(const ScanBindData &bind_data, vector &co } } -static unique_ptr OpenFileAndVerify(FileSystem &fs, VortexSession &session, const std::string &filename, +static unique_ptr OpenFileAndVerify(FileSystem &fs, VortexSession &session, const std::string &filename, const ScanBindData &bind_data) { auto new_column_names = vector(); new_column_names.reserve(bind_data.column_names.size()); @@ -299,14 +312,15 @@ static bool PinFileToThread(ScanGlobalState &global_state) { } static void CreateScanPartitions(ClientContext &context, const ScanBindData &bind, ScanGlobalState &global_state, - ScanLocalState &local_state, uint64_t file_idx, FileReader &file_reader) { + ScanLocalState &local_state, uint64_t file_idx, VortexFile &file) { auto filter_str = global_state.filter_expression_string(*bind.arena); - if (global_state.file_readers[file_idx]->CanPrune(filter_str.data(), static_cast(filter_str.length()))) { + if (global_state.files[file_idx]->CanPrune(filter_str.data(), static_cast(filter_str.length()), + file_idx)) { global_state.files_partitioned += 1; return; } - const auto row_count = Try([&](auto err) { return vx_file_row_count(file_reader.file, err); }); + const auto row_count = vx_file_row_count(file.file); const auto thread_count = std::thread::hardware_concurrency(); const auto file_count = global_state.expanded_files.size(); @@ -345,26 +359,27 @@ static void CreateScanPartitions(ClientContext &context, const ScanBindData &bin } static unique_ptr OpenArrayIter(const ScanBindData &bind, ScanGlobalState &global_state, - shared_ptr &file_reader, ScanPartition row_range_partition) { + shared_ptr &file, ScanPartition row_range_partition) { auto filter_str = global_state.filter_expression_string(*bind.arena); - const auto options = vx_file_scan_options { - .projection = global_state.projected_column_names.data(), - .projection_len = static_cast(global_state.projected_column_names.size()), - .filter_expression = filter_str.data(), - .filter_expression_len = static_cast(filter_str.length()), - .split_by_row_count = 0, - .row_range_start = row_range_partition.start_row, - .row_range_end = row_range_partition.end_row, - }; - - return make_uniq(file_reader->Scan(&options)); + const auto options = + vx_file_scan_options {.projection_expression = global_state.projection.data(), + .projection_expr_len = static_cast(global_state.projection.length()), + .filter_expression = filter_str.data(), + .filter_expression_len = static_cast(filter_str.length()), + .split_by_row_count = 0, + .row_range_start = row_range_partition.start_row, + .row_range_end = row_range_partition.end_row, + .file_index = row_range_partition.file_idx}; + + return make_uniq(file->Scan(&options)); } // Assigns the next array exporter. // // Returns true if a new exporter was assigned, false otherwise. -static bool GetNextExporter(ClientContext &context, const ScanBindData &bind_data, ScanGlobalState &global_state, ScanLocalState &local_state) { +static bool GetNextExporter(ClientContext &context, const ScanBindData &bind_data, ScanGlobalState &global_state, + ScanLocalState &local_state) { // Try to deque a partition off the thread local queue. auto try_dequeue = [&](ScanPartition &scan_partition) { @@ -400,8 +415,8 @@ static bool GetNextExporter(ClientContext &context, const ScanBindData &bind_dat // Layout readers are safe to share across threads for reading. Further, they // are created before pushing partitions of the corresponing files into a queue. - auto file_reader = global_state.file_readers[partition.file_idx]; - auto array_iter = OpenArrayIter(bind_data, global_state, file_reader, partition); + auto file = global_state.files[partition.file_idx]; + auto array_iter = OpenArrayIter(bind_data, global_state, file, partition); local_state.array_exporter = ArrayExporter::FromArrayIterator(std::move(array_iter)); } @@ -430,23 +445,23 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, if (!GetNextExporter(context, bind_data, global_state, local_state)) { // Free file readers when owned by the thread. if (local_state.scan_partitions.empty() && local_state.thread_local_file_idx.has_value()) { - global_state.file_readers[local_state.thread_local_file_idx.value()] = nullptr; + global_state.files[local_state.thread_local_file_idx.value()] = nullptr; local_state.thread_local_file_idx.reset(); } // Create new scan partitions in case the queue is empty. if (auto file_idx = global_state.next_file_idx.fetch_add(1); - file_idx < global_state.expanded_files.size()) { + file_idx < global_state.expanded_files.size()) { if (file_idx == 0) { - global_state.file_readers[0] = bind_data.initial_file; + global_state.files[0] = bind_data.initial_file; } else { auto file_name = global_state.expanded_files[file_idx]; - global_state.file_readers[file_idx] = - OpenFileAndVerify(FileSystem::GetFileSystem(context), *bind_data.session, file_name, bind_data); + global_state.files[file_idx] = OpenFileAndVerify(FileSystem::GetFileSystem(context), + *bind_data.session, file_name, bind_data); } CreateScanPartitions(context, bind_data, global_state, local_state, file_idx, - *global_state.file_readers[file_idx]); + *global_state.files[file_idx]); } } continue; @@ -493,7 +508,7 @@ static unique_ptr VortexBind(ClientContext &context, TableFunction unique_ptr VortexCardinality(ClientContext &context, const FunctionData *bind_data) { auto &data = bind_data->Cast(); - auto row_count = data.initial_file->FileRowCount(); + auto row_count = data.initial_file->RowCount(); if (data.file_list->GetTotalFileCount() == 1) { return make_uniq(row_count, row_count); } else { @@ -542,11 +557,12 @@ void RegisterScanFunction(DatabaseInstance &instance) { global_state->filter = input.filters; global_state->column_ids = input.column_ids; - PopulateProjection(*global_state, bind.column_names, input); + PopulateProjection(bind, *global_state, input); // Most expressions are extracted from `PushdownComplexFilter`, the final filters come from `input.filters`. - ExtractFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, bind.conjuncts, + ExtractFilterExpression(*bind.arena, bind, input.filters, input.column_ids, bind.conjuncts, global_state->dynamic_filters); + // Create the static filter expression global_state->static_filter_expr = flatten_exprs(*bind.arena, bind.conjuncts); if (global_state->static_filter_expr != nullptr) { @@ -554,7 +570,7 @@ void RegisterScanFunction(DatabaseInstance &instance) { } // Resizing the empty vector default constructs std::shared pointers at all indices with nullptr. - global_state->file_readers.resize(global_state->expanded_files.size()); + global_state->files.resize(global_state->expanded_files.size()); return std::move(global_state); }; @@ -575,6 +591,29 @@ void RegisterScanFunction(DatabaseInstance &instance) { vortex_scan.cardinality = VortexCardinality; vortex_scan.filter_pushdown = true; vortex_scan.filter_prune = true; + vortex_scan.late_materialization = true; + + vortex_scan.get_row_id_columns = [](ClientContext &context, + optional_ptr bind_data) -> vector { + vector result; + result.emplace_back(_COLUMN_IDENTIFIER_FILE_ROW_NUMBER); + result.emplace_back(_COLUMN_IDENTIFIER_FILE_INDEX); + return result; + }; + vortex_scan.get_virtual_columns = [](ClientContext &context, + optional_ptr bind_data) -> virtual_column_map_t { + auto &scan_bind_data = bind_data->Cast(); + virtual_column_map_t result; + + result.insert( + make_pair(_COLUMN_IDENTIFIER_FILE_ROW_NUMBER, TableColumn("file_row_number", LogicalType::UBIGINT))); + result.insert(make_pair(_COLUMN_IDENTIFIER_FILE_INDEX, TableColumn("file_index", LogicalType::UBIGINT))); + + scan_bind_data.virtual_col[_COLUMN_IDENTIFIER_FILE_ROW_NUMBER] = "file_row_number"; + scan_bind_data.virtual_col[_COLUMN_IDENTIFIER_FILE_INDEX] = "file_index"; + + return result; + }; ExtensionUtil::RegisterFunction(instance, vortex_scan); } diff --git a/vortex b/vortex index c0d54aa..eb872e3 160000 --- a/vortex +++ b/vortex @@ -1 +1 @@ -Subproject commit c0d54aaf75c3a44ea26fcd760516d1069e277240 +Subproject commit eb872e3da140051b4929267b1b72b548cd3bb6a8