From eff35b8dc6a2c572ca01d40cf03d894a2960a308 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 28 May 2025 09:37:01 +0100 Subject: [PATCH 1/5] update to v0.36.0 --- .gitignore | 7 +- CMakeLists.txt | 11 +- Makefile | 20 +- duckdb | 2 +- extension_config.cmake | 5 +- src/include/expr/expr.hpp | 14 - src/include/vortex_common.hpp | 145 +++++-- src/include/vortex_error.hpp | 22 ++ src/include/vortex_extension.hpp | 8 +- src/include/vortex_scan.hpp | 9 +- src/include/vortex_write.hpp | 6 +- src/{expr/expr.cpp => vortex_expr.cpp} | 127 ++++--- src/vortex_extension.cpp | 20 +- src/vortex_scan.cpp | 499 +++++++++++++++---------- src/vortex_write.cpp | 80 ++-- test/CMakeLists.txt | 2 +- test/sql/nulls.test | 40 +- test/sql/table.test | 4 +- test/sql/table.test_slow | 4 +- test/src/expr.cpp | 30 -- vortex | 2 +- 21 files changed, 626 insertions(+), 431 deletions(-) delete mode 100644 src/include/expr/expr.hpp rename src/{expr/expr.cpp => vortex_expr.cpp} (77%) delete mode 100644 test/src/expr.cpp diff --git a/.gitignore b/.gitignore index 6f2e2ec..95da29d 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,9 @@ testext test/python/__pycache__/ .Rhistory compile_commands.json +corrosion +CMakeFiles -#proto gen folder -gen/ \ No newline at end of file +# Keep CMAKE + +!CMakeLists.txt diff --git a/CMakeLists.txt b/CMakeLists.txt index a136c1e..5f95281 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,9 @@ project(${TARGET_NAME}_project) set(CMAKE_EXPORT_COMPILE_COMMANDS ON) set(CMAKE_CXX_STANDARD 17) +# Allow C++20 designator syntax in C++17 +add_compile_options(-Wno-c++20-designator) + include(FetchContent) FetchContent_Declare( Corrosion @@ -17,19 +20,20 @@ FetchContent_MakeAvailable(Corrosion) find_package(Catch2 CONFIG REQUIRED) find_package(Protobuf CONFIG REQUIRED) + if (APPLE) find_library(SECURITY_FRAMEWORK Security) endif () corrosion_import_crate(MANIFEST_PATH vortex/Cargo.toml CRATES vortex-ffi - FEATURES duckdb + FEATURES duckdb mimalloc CRATE_TYPES staticlib FLAGS --crate-type=staticlib ) set(EXTENSION_NAME ${TARGET_NAME}_extension) -set(EXTENSION_SOURCES src/vortex_extension.cpp src/expr/expr.cpp src/vortex_write.cpp src/vortex_scan.cpp) +set(EXTENSION_SOURCES src/vortex_extension.cpp src/vortex_expr.cpp src/vortex_write.cpp src/vortex_scan.cpp) set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension) # Generate C++ code from .proto files. @@ -41,7 +45,8 @@ protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_FILES} PROTOC_OUT_DIR ${PROT include_directories(src/include ${PROTO_GEN_DIR} vortex/vortex-ffi/cinclude) build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES} ${PROTO_SRCS}) -build_loadable_extension(${TARGET_NAME} ${EXTENSION_SOURCES} ${PROTO_SRCS}) +set(PARAMETERS "-warnings") +build_loadable_extension(${TARGET_NAME} ${PARAMETERS} ${EXTENSION_SOURCES} ${PROTO_SRCS}) target_link_libraries(${EXTENSION_NAME} vortex_ffi-static diff --git a/Makefile b/Makefile index d8b80f9..82abf20 100644 --- a/Makefile +++ b/Makefile @@ -2,11 +2,27 @@ PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST)))) EXT_NAME=vortex_duckdb EXT_CONFIG=${PROJ_DIR}extension_config.cmake -EXT_FLAGS=-DCMAKE_OSX_DEPLOYMENT_TARGET=12.0 -DOVERRIDE_GIT_DESCRIBE=v1.2.2 +EXT_FLAGS=-DCMAKE_OSX_DEPLOYMENT_TARGET=12.0 export MACOSX_DEPLOYMENT_TARGET=12.0 -export VCPKG_OSX_DEPLOYMENT_TARGET=12.0 + +# The version of DuckDB and its Vortex extension is either implicitly set by Git tag, e.g. v1.2.2, or commit +# SHA if the current commit does not have a tag. 
The implicitly set version can be overridden by defining the
# `OVERRIDE_GIT_DESCRIBE` environment variable. In the context of the DuckDB community extension build, we have to
# rely on the Git tag, as DuckDB's CI performs a checkout by Git tag. Therefore, the version can't be explicitly
# set via an environment variable for the community extension build.
+
+export OVERRIDE_GIT_DESCRIBE=v1.3.0
 export VCPKG_FEATURE_FLAGS=-binarycaching
+export VCPKG_OSX_DEPLOYMENT_TARGET=12.0
 export VCPKG_TOOLCHAIN_PATH := ${PROJ_DIR}vcpkg/scripts/buildsystems/vcpkg.cmake
+export BUILD_MAIN_DUCKDB_LIBRARY=0
+export DISABLE_BUILTIN_EXTENSIONS=1
+
+# This is not needed on macOS; we don't see a TLS error on load there.
+ifeq ($(shell uname), Linux)
+	export CFLAGS=-ftls-model=global-dynamic
+endif
+
 include extension-ci-tools/makefiles/duckdb_extension.Makefile
diff --git a/duckdb b/duckdb
index 7c0cc5d..71c5c07 160000
--- a/duckdb
+++ b/duckdb
@@ -1 +1 @@
-Subproject commit 7c0cc5d4943dd4fe2176a43818f7dfcc9a541b91
+Subproject commit 71c5c07cdd295e9409c0505885033ae9eb6b5ddd
diff --git a/extension_config.cmake b/extension_config.cmake
index 63d03a5..0a32fd1 100644
--- a/extension_config.cmake
+++ b/extension_config.cmake
@@ -4,7 +4,4 @@ duckdb_extension_load(vortex
     SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR}
     LOAD_TESTS
-)
-
-# Any extra extensions that should be built
-# e.g.: duckdb_extension_load(json)
+)
\ No newline at end of file
diff --git a/src/include/expr/expr.hpp b/src/include/expr/expr.hpp
deleted file mode 100644
index 47dab3b..0000000
--- a/src/include/expr/expr.hpp
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include "expr.pb.h"
-#include "duckdb/planner/expression.hpp"
-
-#include <string>
-
-vortex::expr::Expr *table_expression_into_expr(google::protobuf::Arena &arena, duckdb::TableFilter &filter,
-                                               const std::string &column_name);
-
-vortex::expr::Expr *expression_into_vortex_expr(google::protobuf::Arena &arena, const duckdb::Expression &expr);
-
-vortex::expr::Expr *flatten_exprs(google::protobuf::Arena &arena,
-                                  const duckdb::vector<vortex::expr::Expr *> &child_filters);
\ No newline at end of file
diff --git a/src/include/vortex_common.hpp b/src/include/vortex_common.hpp
index d817401..4910527 100644
--- a/src/include/vortex_common.hpp
+++ b/src/include/vortex_common.hpp
@@ -1,77 +1,162 @@
 #pragma once
 
+#define ENABLE_DUCKDB_FFI
 #include "duckdb.hpp"
+#include "duckdb/common/unique_ptr.hpp"
+
 #include "vortex.hpp"
 #include "vortex_error.hpp"
+#include "vortex_session.hpp"
+
+namespace vortex {
+
+struct DType {
+	explicit DType(vx_dtype *dtype) : dtype(dtype) {
+	}
+
+	static duckdb::unique_ptr<DType> FromDuckDBTable(const std::vector<duckdb_logical_type> &column_types,
+	                                                 const std::vector &column_nullable,
+	                                                 const std::vector<const char *> &column_names) {
+		D_ASSERT(column_names.size() == column_nullable.size());
+		D_ASSERT(column_names.size() == column_types.size());
-#include
+		auto dtype = Try([&](auto err) {
+			return vx_duckdb_logical_type_to_dtype(column_types.data(), column_nullable.data(), column_names.data(),
+			                                       column_names.size(), err);
+		});
-struct VortexConversionCache {
-	explicit VortexConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) {
+		return duckdb::make_uniq<DType>(dtype);
 	}
 
-	~VortexConversionCache() {
+	~DType() {
+		if (dtype != nullptr) {
+			vx_dtype_free(dtype);
+		}
+	}
+
+	vx_dtype *dtype;
+};
+
+struct ConversionCache {
+	explicit ConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) {
+	}
+
+	~ConversionCache() {
 		vx_conversion_cache_free(cache);
 	}
 
 	vx_conversion_cache *cache;
 };
 
-struct VortexFileReader {
-	explicit VortexFileReader(vx_file_reader *file) : file(file) {
+struct FileReader {
+	explicit FileReader(vx_file_reader *file) : file(file) {
 	}
 
-	~VortexFileReader() {
+	~FileReader() {
 		vx_file_reader_free(file);
 	}
 
-	static duckdb::unique_ptr<VortexFileReader> Open(const vx_file_open_options *options) {
-		vx_error *error;
-		auto file = vx_file_open_reader(options, &error);
-		HandleError(error);
-		return duckdb::make_uniq<VortexFileReader>(file);
+	static duckdb::unique_ptr<FileReader> Open(const vx_file_open_options *options, VortexSession &session) {
+		auto file = Try([&](auto err) { return vx_file_open_reader(options, session.session, err); });
+		return duckdb::make_uniq<FileReader>(file);
+	}
+
+	vx_array_iterator *Scan(const vx_file_scan_options *options) {
+		return Try([&](auto err) { return vx_file_reader_scan(this->file, options, err); });
+	}
+
+	bool CanPrune(const char *filter_expression, unsigned int filter_expression_len) {
+		return Try([&](auto err) {
+			return vx_file_reader_can_prune(this->file, filter_expression, filter_expression_len, err);
+		});
+	}
+
+	uint64_t FileRowCount() {
+		return Try([&](auto err) { return vx_file_row_count(file, err); });
+	}
+
+	struct DType DType() {
+		return vortex::DType(vx_file_dtype(file));
 	}
 
 	vx_file_reader *file;
 };
 
-struct VortexArray {
-	explicit VortexArray(vx_array *array) : array(array) {
+struct Array {
+	explicit Array(vx_array *array) : array(array) {
 	}
 
-	~VortexArray() {
+	~Array() {
 		if (array != nullptr) {
 			vx_array_free(array);
 		}
 	}
 
-	idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const VortexConversionCache *cache) const {
-		vx_error *error;
-		auto idx = vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, &error);
-		HandleError(error);
-		return idx;
+	static duckdb::unique_ptr<Array> FromDuckDBChunk(DType &dtype, duckdb::DataChunk &chunk) {
+		auto array = Try([&](auto err) {
+			return vx_duckdb_chunk_to_array(reinterpret_cast<duckdb_data_chunk>(&chunk), dtype.dtype, err);
+		});
+
+		return duckdb::make_uniq<Array>(array);
+	}
+
+	idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const ConversionCache *cache) const {
+		return Try([&](auto err) { return vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, err); });
 	}
 
 	vx_array *array;
 };
 
-struct VortexArrayStream {
-	explicit VortexArrayStream(vx_array_stream *array_stream) : array_stream(array_stream) {
+struct ArrayIterator {
+	explicit ArrayIterator(vx_array_iterator *array_iter) : array_iter(array_iter) {
 	}
 
-	~VortexArrayStream() {
-		vx_array_stream_free(array_stream);
+	~ArrayIterator() {
+		vx_array_iter_free(array_iter);
 	}
 
-	duckdb::unique_ptr<VortexArray> NextArray() const {
-		vx_error *error;
-		auto array = vx_array_stream_next(array_stream, &error);
-		HandleError(error);
+	duckdb::unique_ptr<Array> NextArray() const {
+		auto array = Try([&](auto err) { return vx_array_iter_next(array_iter, err); });
+
 		if (array == nullptr) {
 			return nullptr;
 		}
-		return duckdb::make_uniq<VortexArray>(array);
+
+		return duckdb::make_uniq<Array>(array);
 	}
 
-	vx_array_stream *array_stream;
+	vx_array_iterator *array_iter;
 };
+
+struct ArrayStreamSink {
+	explicit ArrayStreamSink(vx_array_sink *sink, duckdb::unique_ptr<DType> dtype)
+	    : sink(sink), dtype(std::move(dtype)) {
+	}
+
+	static duckdb::unique_ptr<ArrayStreamSink> Create(std::string file_path, duckdb::unique_ptr<DType> &&dtype) {
+		auto sink = Try([&](auto err) { return vx_array_sink_open_file(file_path.c_str(), dtype->dtype, err); });
+		return duckdb::make_uniq<ArrayStreamSink>(sink, std::move(dtype));
+	}
+
+	void PushChunk(duckdb::DataChunk &chunk) {
+		auto array = Array::FromDuckDBChunk(*dtype, chunk);
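+		// The chunk is first converted into a Vortex array matching the sink's dtype before being pushed through the FFI.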
+		Try([&](auto err) { vx_array_sink_push(sink, array->array, err); });
+	}
+
+	void Close() {
+		Try([&](auto err) { vx_array_sink_close(sink, err); });
+		this->sink = nullptr;
+	}
+
+	~ArrayStreamSink() {
+		// One should close a sink before destructing it. However, if a write throws, the stack
+		// unwinds and the destructor runs before `Close` is invoked, which would trigger the
+		// following assertion and clobber the exception being printed:
+		// D_ASSERT(sink == nullptr);
+	}
+
+	vx_array_sink *sink;
+	duckdb::unique_ptr<DType> dtype;
+};
+
+} // namespace vortex
diff --git a/src/include/vortex_error.hpp b/src/include/vortex_error.hpp
index 9600285..b41565d 100644
--- a/src/include/vortex_error.hpp
+++ b/src/include/vortex_error.hpp
@@ -1,7 +1,13 @@
 #pragma once
 
+#include <string>
+#include <type_traits>
+
+#include "duckdb.hpp"
 #include "vortex.hpp"
 
+namespace vortex {
+
 inline void HandleError(vx_error *error) {
 	if (error != nullptr) {
 		auto msg = std::string(vx_error_get_message(error));
@@ -9,3 +15,19 @@ inline void HandleError(vx_error *error) {
 		throw duckdb::InvalidInputException(msg);
 	}
 }
+
+template <typename Func>
+auto Try(Func func) {
+	vx_error *error = nullptr;
+	// Handle both void and non-void return types.
+	if constexpr (std::is_void_v<std::invoke_result_t<Func, vx_error **>>) {
+		func(&error);
+		HandleError(error);
+	} else {
+		auto result = func(&error);
+		HandleError(error);
+		return result;
+	}
+}
+
+} // namespace vortex
diff --git a/src/include/vortex_extension.hpp b/src/include/vortex_extension.hpp
index eade158..fc34e42 100644
--- a/src/include/vortex_extension.hpp
+++ b/src/include/vortex_extension.hpp
@@ -2,13 +2,11 @@
 
 #include "duckdb.hpp"
 
-namespace duckdb {
+// The entry point class API can't be scoped to the vortex namespace.
 
-class VortexExtension : public Extension {
+class VortexExtension : public duckdb::Extension {
 public:
-	void Load(DuckDB &db) override;
+	void Load(duckdb::DuckDB &db) override;
 	std::string Name() override;
 	std::string Version() const override;
 };
-
-} // namespace duckdb
diff --git a/src/include/vortex_scan.hpp b/src/include/vortex_scan.hpp
index 809b40f..66383ba 100644
--- a/src/include/vortex_scan.hpp
+++ b/src/include/vortex_scan.hpp
@@ -2,9 +2,6 @@
 
 #include "duckdb/main/extension_util.hpp"
 
-
-namespace duckdb {
-
-void RegisterVortexScanFunction(DatabaseInstance &instance);
-
-}
\ No newline at end of file
+namespace vortex {
+void RegisterScanFunction(duckdb::DatabaseInstance &instance);
+}
diff --git a/src/include/vortex_write.hpp b/src/include/vortex_write.hpp
index 5e0b162..c8a0503 100644
--- a/src/include/vortex_write.hpp
+++ b/src/include/vortex_write.hpp
@@ -2,6 +2,6 @@
 
 #include "duckdb/main/extension_util.hpp"
 
-namespace duckdb {
-	void RegisterVortexWriteFunction(duckdb::DatabaseInstance &instance);
-}
\ No newline at end of file
+namespace vortex {
+void RegisterWriteFunction(duckdb::DatabaseInstance &instance);
+}
diff --git a/src/expr/expr.cpp b/src/vortex_expr.cpp
similarity index 77%
rename from src/expr/expr.cpp
rename to src/vortex_expr.cpp
index eeb2917..fae6f17 100644
--- a/src/expr/expr.cpp
+++ b/src/vortex_expr.cpp
@@ -1,21 +1,22 @@
-#include "expr/expr.hpp"
+#include <string>
+
 #include "duckdb/planner/expression.hpp"
 #include "duckdb/planner/table_filter.hpp"
 #include "duckdb/planner/filter/constant_filter.hpp"
 #include "duckdb/common/exception.hpp"
-
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include "duckdb/parser/expression/columnref_expression.hpp"
"duckdb/parser/expression/comparison_expression.hpp" +#include "duckdb/parser/expression/constant_expression.hpp" +#include "duckdb/planner/expression/bound_between_expression.hpp" +#include "duckdb/planner/expression/bound_columnref_expression.hpp" +#include "duckdb/planner/expression/bound_comparison_expression.hpp" +#include "duckdb/planner/expression/bound_constant_expression.hpp" +#include "duckdb/planner/expression/bound_function_expression.hpp" +#include "duckdb/planner/expression/bound_operator_expression.hpp" +#include "duckdb/planner/filter/conjunction_filter.hpp" +#include "duckdb/planner/filter/optional_filter.hpp" + +#include "vortex_expr.hpp" using duckdb::ConjunctionAndFilter; using duckdb::ConstantFilter; @@ -30,6 +31,8 @@ using duckdb::Value; using google::protobuf::Arena; using std::string; +namespace vortex { + // vortex expr proto ids. const string BETWEEN_ID = "between"; const string BINARY_ID = "binary"; @@ -59,16 +62,16 @@ enum TimeUnit : uint8_t { D = 4, }; -vortex::expr::Kind_BinaryOp into_binary_operation(ExpressionType type) { - static const std::unordered_map op_map = { - {ExpressionType::COMPARE_EQUAL, vortex::expr::Kind_BinaryOp_Eq}, - {ExpressionType::COMPARE_NOTEQUAL, vortex::expr::Kind_BinaryOp_NotEq}, - {ExpressionType::COMPARE_LESSTHAN, vortex::expr::Kind_BinaryOp_Lt}, - {ExpressionType::COMPARE_GREATERTHAN, vortex::expr::Kind_BinaryOp_Gt}, - {ExpressionType::COMPARE_LESSTHANOREQUALTO, vortex::expr::Kind_BinaryOp_Lte}, - {ExpressionType::COMPARE_GREATERTHANOREQUALTO, vortex::expr::Kind_BinaryOp_Gte}, - {ExpressionType::CONJUNCTION_AND, vortex::expr::Kind_BinaryOp_And}, - {ExpressionType::CONJUNCTION_OR, vortex::expr::Kind_BinaryOp_Or}}; +expr::Kind_BinaryOp into_binary_operation(ExpressionType type) { + static const std::unordered_map op_map = { + {ExpressionType::COMPARE_EQUAL, expr::Kind_BinaryOp_Eq}, + {ExpressionType::COMPARE_NOTEQUAL, expr::Kind_BinaryOp_NotEq}, + {ExpressionType::COMPARE_LESSTHAN, expr::Kind_BinaryOp_Lt}, + {ExpressionType::COMPARE_GREATERTHAN, expr::Kind_BinaryOp_Gt}, + {ExpressionType::COMPARE_LESSTHANOREQUALTO, expr::Kind_BinaryOp_Lte}, + {ExpressionType::COMPARE_GREATERTHANOREQUALTO, expr::Kind_BinaryOp_Gte}, + {ExpressionType::CONJUNCTION_AND, expr::Kind_BinaryOp_And}, + {ExpressionType::CONJUNCTION_OR, expr::Kind_BinaryOp_Or}}; auto value = op_map.find(type); if (value == op_map.end()) { @@ -94,8 +97,8 @@ TimeUnit timestamp_to_time_unit(const LogicalType &type) { } } -vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_, bool nullable) { - auto *dtype = Arena::Create(&arena); +dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_, bool nullable) { + auto *dtype = Arena::Create(&arena); switch (type_.id()) { case LogicalTypeId::INVALID: case LogicalTypeId::SQLNULL: @@ -106,43 +109,43 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_, return dtype; case LogicalTypeId::TINYINT: dtype->mutable_primitive()->set_nullable(nullable); - dtype->mutable_primitive()->set_type(vortex::dtype::I8); + dtype->mutable_primitive()->set_type(dtype::I8); return dtype; case LogicalTypeId::SMALLINT: dtype->mutable_primitive()->set_nullable(nullable); - dtype->mutable_primitive()->set_type(vortex::dtype::I16); + dtype->mutable_primitive()->set_type(dtype::I16); return dtype; case LogicalTypeId::INTEGER: dtype->mutable_primitive()->set_nullable(nullable); - dtype->mutable_primitive()->set_type(vortex::dtype::I32); + dtype->mutable_primitive()->set_type(dtype::I32); return 
 	case LogicalTypeId::BIGINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::I64);
+		dtype->mutable_primitive()->set_type(dtype::I64);
 		return dtype;
 	case LogicalTypeId::UTINYINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U8);
+		dtype->mutable_primitive()->set_type(dtype::U8);
 		return dtype;
 	case LogicalTypeId::USMALLINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U16);
+		dtype->mutable_primitive()->set_type(dtype::U16);
 		return dtype;
 	case LogicalTypeId::UINTEGER:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U32);
+		dtype->mutable_primitive()->set_type(dtype::U32);
 		return dtype;
 	case LogicalTypeId::UBIGINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U64);
+		dtype->mutable_primitive()->set_type(dtype::U64);
 		return dtype;
 	case LogicalTypeId::FLOAT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::F32);
+		dtype->mutable_primitive()->set_type(dtype::F32);
 		return dtype;
 	case LogicalTypeId::DOUBLE:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::F64);
+		dtype->mutable_primitive()->set_type(dtype::F64);
 		return dtype;
 	case LogicalTypeId::DECIMAL: {
 		dtype->mutable_decimal()->set_nullable(nullable);
@@ -162,7 +165,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_DATE_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I32);
+		storage->mutable_primitive()->set_type(dtype::I32);
 		dtype->mutable_extension()->set_metadata(std::string({static_cast<char>(TimeUnit::D)}));
 		return dtype;
 	}
@@ -170,7 +173,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_TIME_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I32);
+		storage->mutable_primitive()->set_type(dtype::I32);
 		dtype->mutable_extension()->set_metadata(std::string({static_cast<char>(TimeUnit::Us)}));
 		return dtype;
 	}
@@ -181,7 +184,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_TIMESTAMP_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I64);
+		storage->mutable_primitive()->set_type(dtype::I64);
 		auto time_unit = static_cast<char>(timestamp_to_time_unit(type_));
 		// This signifies a timestamp without a timezone
 		// TODO(joe): support timezones
@@ -193,15 +196,15 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 	}
 }
 
-vortex::scalar::Scalar *into_null_scalar(Arena &arena, LogicalType &logical_type) {
-	auto scalar = Arena::Create<vortex::scalar::Scalar>(&arena);
+scalar::Scalar *into_null_scalar(Arena &arena, LogicalType &logical_type) {
+	auto scalar = Arena::Create<scalar::Scalar>(&arena);
 	scalar->set_allocated_dtype(into_vortex_dtype(arena, logical_type, true));
 	scalar->mutable_value()->set_null_value(google::protobuf::NULL_VALUE);
 	return scalar;
 }
 
-vortex::scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) {
-	auto scalar = Arena::Create<vortex::scalar::Scalar>(&arena);
+scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) {
+	auto scalar = Arena::Create<scalar::Scalar>(&arena);
 	auto dtype = into_vortex_dtype(arena, value.type(), nullable);
 	scalar->set_allocated_dtype(dtype);
@@ -281,7 +284,7 @@ vortex::scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, boo
 	}
 }
 
-void set_column(const string &s, vortex::expr::Expr *column) {
+void set_column(const string &s, expr::Expr *column) {
 	column->set_id(GET_ITEM_ID);
 	auto kind = column->mutable_kind();
 	auto get_item = kind->mutable_get_item();
@@ -292,15 +295,15 @@ void set_column(const string &s, vortex::expr::Expr *column) {
 	id->set_id(IDENTITY_ID);
 }
 
-void set_literal(Arena &arena, const Value &value, bool nullable, vortex::expr::Expr *constant) {
+void set_literal(Arena &arena, const Value &value, bool nullable, expr::Expr *constant) {
 	auto literal = constant->mutable_kind()->mutable_literal();
 	auto dvalue = into_vortex_scalar(arena, value, nullable);
 	literal->set_allocated_value(dvalue);
 	constant->set_id(LITERAL_ID);
 }
 
-vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_ptr<TableFilter>> &child_filters,
-                                          const string &column_name) {
+expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_ptr<TableFilter>> &child_filters,
+                                  const string &column_name) {
 	D_ASSERT(!child_filters.empty());
 
 	if (child_filters.size() == 1) {
@@ -308,14 +311,14 @@ vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector
 		return table_expression_into_expr(arena, *child_filters[0], column_name);
 	}
 
-	auto tail = static_cast<vortex::expr::Expr *>(nullptr);
-	auto hd = Arena::Create<vortex::expr::Expr>(&arena);
+	auto tail = static_cast<expr::Expr *>(nullptr);
+	auto hd = Arena::Create<expr::Expr>(&arena);
 
 	// Flatten the list of children into a linked list of AND values.
 	for (size_t i = 0; i < child_filters.size() - 1; i++) {
-		vortex::expr::Expr *new_and = !tail ? hd : tail->add_children();
+		expr::Expr *new_and = !tail ? hd : tail->add_children();
 		new_and->set_id(BINARY_ID);
-		new_and->mutable_kind()->set_binary_op(vortex::expr::Kind::And);
+		new_and->mutable_kind()->set_binary_op(expr::Kind::And);
 
 		new_and->add_children()->Swap(table_expression_into_expr(arena, *child_filters[i], column_name));
 		tail = new_and;
@@ -324,10 +327,10 @@ vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector
 
-vortex::expr::Expr *flatten_exprs(Arena &arena, const duckdb::vector<vortex::expr::Expr *> &child_filters) {
+expr::Expr *flatten_exprs(Arena &arena, const duckdb::vector<expr::Expr *> &child_filters) {
 	if (child_filters.empty()) {
-		auto expr = arena.Create<vortex::expr::Expr>(&arena);
+		auto expr = arena.Create<expr::Expr>(&arena);
 		set_literal(arena, Value(true), true, expr);
 		return expr;
 	}
@@ -337,14 +340,14 @@ vortex::expr::Expr *flatten_exprs(Arena &arena, const duckdb::vector
 
-	auto tail = static_cast<vortex::expr::Expr *>(nullptr);
-	auto hd = Arena::Create<vortex::expr::Expr>(&arena);
+	auto tail = static_cast<expr::Expr *>(nullptr);
+	auto hd = Arena::Create<expr::Expr>(&arena);
 
 	// Flatten the list of children into a linked list of AND values.
 	for (size_t i = 0; i < child_filters.size() - 1; i++) {
-		vortex::expr::Expr *new_and = !tail ? hd : tail->add_children();
+		expr::Expr *new_and = !tail ? hd : tail->add_children();
 		new_and->set_id(BINARY_ID);
-		new_and->mutable_kind()->set_binary_op(vortex::expr::Kind::And);
+		new_and->mutable_kind()->set_binary_op(expr::Kind::And);
 
 		new_and->add_children()->Swap(child_filters[i]);
 		tail = new_and;
@@ -366,8 +369,8 @@ std::optional<string> expr_to_like_pattern(const duckdb::Expression &dexpr) {
 	}
 }
 
-vortex::expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expression &dexpr) {
-	auto expr = Arena::Create<vortex::expr::Expr>(&arena);
+expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expression &dexpr) {
+	auto expr = Arena::Create<expr::Expr>(&arena);
 	switch (dexpr.expression_class) {
 	case duckdb::ExpressionClass::BOUND_COLUMN_REF: {
 		auto &dcol_ref = dexpr.Cast<duckdb::BoundColumnRefExpression>();
@@ -449,8 +452,8 @@ vortex::expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expr
 	}
 }
 
-vortex::expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const string &column_name) {
-	auto expr = Arena::Create<vortex::expr::Expr>(&arena);
+expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const string &column_name) {
+	auto expr = Arena::Create<expr::Expr>(&arena);
 	switch (filter.filter_type) {
 	case TableFilterType::CONSTANT_COMPARISON: {
 		auto &constant_filter = filter.Cast<ConstantFilter>();
@@ -485,3 +488,5 @@ vortex::expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter
 	throw Exception(ExceptionType::NOT_IMPLEMENTED, "table_expression_into_expr",
 	                {{"filter_type_id", std::to_string(static_cast<uint8_t>(filter.filter_type))}});
 }
+
+} // namespace vortex
diff --git a/src/vortex_extension.cpp b/src/vortex_extension.cpp
index 121647b..b6d120a 100644
--- a/src/vortex_extension.cpp
+++ b/src/vortex_extension.cpp
@@ -1,18 +1,14 @@
 #define DUCKDB_EXTENSION_MAIN
 
-#define ENABLE_DUCKDB_FFI
-
 #include "duckdb/main/extension_util.hpp"
 
-#include "vortex_extension.hpp"
+#include "vortex_extension.hpp"
 #include "vortex_write.hpp"
 #include "vortex_scan.hpp"
 
-#ifndef DUCKDB_EXTENSION_MAIN
-#error DUCKDB_EXTENSION_MAIN not defined
-#endif
+using namespace duckdb;
 
-namespace duckdb {
+// The entry point class API can't be scoped to the vortex namespace.
 
 /// Called when the extension is loaded by DuckDB.
 /// It is responsible for registering functions and initializing state.
@@ -22,8 +18,8 @@ namespace duckdb {
 void VortexExtension::Load(DuckDB &db) {
 	DatabaseInstance &instance = *db.instance;
 
-	RegisterVortexWriteFunction(instance);
-	RegisterVortexScanFunction(instance);
+	vortex::RegisterWriteFunction(instance);
+	vortex::RegisterScanFunction(instance);
 }
 
 /// Returns the name of the Vortex extension.
@@ -43,14 +39,14 @@ std::string VortexExtension::Version() const {
 	return "0.1.0";
 }
 
-} // namespace duckdb
-
 extern "C" {
 
+__attribute__((__visibility__("default")))
 DUCKDB_EXTENSION_API void vortex_init(duckdb::DatabaseInstance &db) {
 	duckdb::DuckDB db_wrapper(db);
-	db_wrapper.LoadExtension<duckdb::VortexExtension>();
+	db_wrapper.LoadExtension<VortexExtension>();
 }
 
+__attribute__((__visibility__("default")))
 DUCKDB_EXTENSION_API const char *vortex_version() {
 	return duckdb::DuckDB::LibraryVersion();
 }
diff --git a/src/vortex_scan.cpp b/src/vortex_scan.cpp
index be6fbac..019eeba 100644
--- a/src/vortex_scan.cpp
+++ b/src/vortex_scan.cpp
@@ -1,148 +1,168 @@
-#include "vortex_scan.hpp"
+#define ENABLE_DUCKDB_FFI
+
+#include <atomic>
+#include <cstdint>
+#include <optional>
+#include <queue>
+#include <regex>
+#include <thread>
 
 #include "duckdb/common/exception.hpp"
 #include "duckdb/common/helper.hpp"
-#include "duckdb/common/multi_file_reader.hpp"
 #include "duckdb/function/table_function.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include "duckdb/common/file_system.hpp"
-#include "vortex_extension.hpp"
+#include "duckdb/common/multi_file/multi_file_list.hpp"
+#include "duckdb/storage/object_cache.hpp"
 
-#include <concurrentqueue.h>
+#include "concurrentqueue.h"
 
+#include "vortex.hpp"
+#include "vortex_scan.hpp"
 #include "vortex_common.hpp"
-#include "expr/expr.hpp"
-
-namespace duckdb {
+#include "vortex_expr.hpp"
+#include "vortex_session.hpp"
 
-// A value large enough that most systems can use all their threads.
-// This is used to allocate `file_slots`, we could remove this later by having the init_local method increase the
-// file slots size for each running.
-constexpr uint32_t MAX_THREAD_COUNT = 192;
+using namespace duckdb;
 
-// This is a multiple of the 2048 duckdb vector size, it needs tuning
-// This has a few factor effecting it:
-// 1. A smaller value means for work for the vortex file reader.
-// 2. A larger value reduces the parallelism available to the scanner
-constexpr uint32_t ROW_SPLIT_COUNT = 2048 * 32;
+namespace vortex {
 
 /// Bind data for the Vortex table function that holds information about the
 /// file and its schema. This data is populated during the bind phase, which
 /// happens during the query planning phase.
-struct VortexBindData : public TableFunctionData {
+struct BindData : public TableFunctionData {
+	// Session used for caching
+	shared_ptr<VortexSession> session;
+
+	shared_ptr<MultiFileList> file_list;
 	vector<LogicalType> columns_types;
 	vector<string> column_names;
-	uint64_t num_columns;
 
-	unique_ptr<VortexFileReader> initial_file;
-	shared_ptr<MultiFileList> file_list;
+	// Used to read the schema during the bind phase and cached here to
+	// avoid having to open the same file again during the scan phase.
+	shared_ptr<FileReader> initial_file;
 
 	// Used to create an arena for protobuf exprs, need a ptr since the bind arg is const.
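+	// The arena also owns the `expr::Expr` nodes that `conjuncts` points at, so the two share a lifetime.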
 	unique_ptr<google::protobuf::Arena> arena;
-	vector<vortex::expr::Expr *> conjuncts;
+	vector<expr::Expr *> conjuncts;
 
 	bool Equals(const FunctionData &other_p) const override {
-		auto &other = other_p.Cast<VortexBindData>();
+		auto &other = other_p.Cast<BindData>();
 		return file_list == other.file_list && column_names == other.column_names &&
-		       columns_types == other.columns_types && num_columns == other.num_columns;
+		       columns_types == other.columns_types;
 	}
 
 	unique_ptr<FunctionData> Copy() const override {
-		auto result = make_uniq<VortexBindData>();
+		auto result = make_uniq<BindData>();
+		result->session = session;
 		result->file_list = file_list;
 		result->columns_types = columns_types;
 		result->column_names = column_names;
-		result->num_columns = num_columns;
-		return std::move(result);
+		result->initial_file = initial_file;
+		return std::move(result);
 	}
 };
 
+struct ScanPartition {
+	uint64_t file_idx;
+	uint64_t start_row;
+	uint64_t end_row;
+};
+
 /// Local state for the Vortex table function that tracks the progress of a scan
 /// operation. In DuckDB's execution model, a query reading from a file can be
 /// parallelized by dividing it into ranges, each handled by a different scan.
-struct VortexScanLocalState : public LocalTableFunctionState {
-	idx_t current_row;
-	bool finished;
-	unique_ptr<VortexArray> array;
-	unique_ptr<VortexConversionCache> cache;
-	uint32_t thread_id;
-
-	explicit VortexScanLocalState(uint32_t thread_id)
-	    : current_row(0), finished(false), array(nullptr), cache(nullptr), thread_id(thread_id) {
-	}
-};
+struct ScanLocalState : public LocalTableFunctionState {
+	idx_t array_row_offset;
+	unique_ptr<Array> currently_scanned_array;
+	unique_ptr<ArrayIterator> array_iterator;
+	unique_ptr<ConversionCache> conversion_cache;
+
+	std::queue<ScanPartition> scan_partitions;
 
-struct FileSlot {
-	std::mutex slot_lock;
-	unique_ptr<VortexArrayStream> array_stream;
+	// Thread local file.
+	std::optional<uint64_t> thread_local_file_idx;
 };
 
-struct VortexScanGlobalState : public GlobalTableFunctionState {
-	// Must be <= MAX_THREAD_COUNT.
-	std::atomic_uint32_t thread_id_counter;
+struct ScanGlobalState : public GlobalTableFunctionState {
 	std::atomic_bool finished;
+	std::atomic_uint64_t cache_id;
 
-	std::uint64_t cache_id;
-
-	// Each thread owns a file slot and is the thing only one allowed to modify the slot itself.
-	// Other threads can work-steal array batches from the slot, by taking out the mutex in the FileSlot.
-	// We allocate MAX_THREAD_COUNT threads, the max number threads allowed by this extension.
-	std::array<FileSlot, MAX_THREAD_COUNT> file_slots;
-
-	std::atomic_uint32_t next_file;
 	vector<string> expanded_files;
 	optional_ptr<TableFilterSet> filter;
 
-	// The precomputed filter string used in the query
 	std::string filter_str;
-	// The precomputed column names used in the query
-	std::vector<const char *> projected_column_names;
+
+	// Used only to report progress in `table_scan_progress`.
+	std::atomic_uint32_t partitions_processed;
+	std::atomic_uint32_t partitons_total;
+
+	// Number of files that have been fully partitioned.
+	std::atomic_uint32_t files_partitioned;
+
+	// Next file to partition.
+	std::atomic_uint32_t next_file_idx;
+
+	// Multi-producer, multi-consumer lock-free queue.
+	duckdb_moodycamel::ConcurrentQueue<ScanPartition> scan_partitions {8192};
+
+	std::vector<shared_ptr<FileReader>> file_readers;
 
 	// The column idx that must be returned by the scan.
 	vector<column_t> column_ids;
 	vector<idx_t> projection_ids;
+	// The precomputed column names used in the query.
+	std::vector<const char *> projected_column_names;
 
 	// This is the max number of threads that the extension might use.
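+	// DuckDB schedules at most this many threads for the scan; since partitions are pulled from queues on demand, a generous upper bound is harmless.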
 	idx_t MaxThreads() const override {
+		constexpr uint32_t MAX_THREAD_COUNT = 192;
 		return MAX_THREAD_COUNT;
 	}
-
-	explicit VortexScanGlobalState()
-	    : thread_id_counter(0), finished(false), cache_id(0), file_slots(), next_file(0), filter(nullptr) {
-	}
 };
 
-// Use to create vortex expressions from `TableFilterSet` filter.
+// Used to create vortex expressions from a `TableFilterSet` filter.
 void CreateFilterExpression(google::protobuf::Arena &arena, vector<string> column_names,
                             optional_ptr<TableFilterSet> filter, vector<column_t> column_ids,
-                            vector<vortex::expr::Expr *> &conjuncts) {
+                            vector<expr::Expr *> &conjuncts) {
 	if (filter == nullptr) {
 		return;
 	}
 
 	for (const auto &[col_id, value] : filter->filters) {
-		auto col_name = column_names[column_ids[col_id]];
-		auto conj = table_expression_into_expr(arena, *value, col_name);
+		auto column_name = column_names[column_ids[col_id]];
+		auto conj = table_expression_into_expr(arena, *value, column_name);
 		conjuncts.push_back(conj);
 	}
 }
 
+static void PopulateProjection(ScanGlobalState &global_state, const vector<string> &column_names,
+                               TableFunctionInitInput &input) {
+	global_state.projection_ids.reserve(input.projection_ids.size());
+	for (auto proj_id : input.projection_ids) {
+		global_state.projection_ids.push_back(input.column_ids[proj_id]);
+	}
+
+	global_state.projected_column_names.reserve(input.projection_ids.size());
+	for (auto column_id : global_state.projection_ids) {
+		assert(column_id < column_names.size());
+		global_state.projected_column_names.push_back(column_names[column_id].c_str());
+	}
+}
+
 /// Extracts schema information from a Vortex file's data type.
-static void ExtractVortexSchema(const vx_dtype *file_dtype, vector<LogicalType> &column_types,
-                                vector<string> &column_names) {
-	uint32_t field_count = vx_dtype_field_count(file_dtype);
+static void ExtractVortexSchema(DType &file_dtype, vector<LogicalType> &column_types, vector<string> &column_names) {
+	uint32_t field_count = vx_dtype_field_count(file_dtype.dtype);
 
 	for (uint32_t idx = 0; idx < field_count; idx++) {
 		char name_buffer[512];
 		int name_len = 0;
-		vx_dtype_field_name(file_dtype, idx, name_buffer, &name_len);
+		vx_dtype_field_name(file_dtype.dtype, idx, name_buffer, &name_len);
 		std::string field_name(name_buffer, name_len);
 
-		vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype, idx);
-		vx_error *error = nullptr;
-		auto duckdb_type = vx_dtype_to_duckdb_logical_type(field_dtype, &error);
-		HandleError(error);
+		vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype.dtype, idx);
+		auto duckdb_type = Try([&](auto err) { return vx_dtype_to_duckdb_logical_type(field_dtype, err); });
 
 		column_names.push_back(field_name);
 		column_types.push_back(LogicalType(*reinterpret_cast<LogicalType *>(duckdb_type)));
@@ -151,10 +171,9 @@ static void ExtractVortexSchema(const vx_dtype *file_dtype, vector
 	}
 }
 
-const std::regex schema_prefix = std::regex("^[^/]*:\\/\\/.*$");
-
 std::string EnsureFileProtocol(FileSystem &fs, const std::string &path) {
 	// If the path is a URL then don't change it, otherwise try to make the path an absolute path
+	static const std::regex schema_prefix = std::regex("^[^/]*:\\/\\/.*$");
 	if (std::regex_match(path, schema_prefix)) {
 		return path;
 	}
@@ -168,19 +187,19 @@ std::string EnsureFileProtocol(FileSystem &fs, const std::string &path) {
 	return prefix + absolute_path;
 }
 
-static unique_ptr<VortexFileReader> OpenFile(const std::string &filename, vector<LogicalType> &column_types,
-                                             vector<string> &column_names) {
+static unique_ptr<FileReader> OpenFile(const std::string &filename, VortexSession &session,
+                                       vector<LogicalType> &column_types, vector<string> &column_names) {
 	vx_file_open_options options {
 	    .uri = filename.c_str(), .property_keys = nullptr, .property_vals = nullptr, .property_len = 0};
 
-	auto file = VortexFileReader::Open(&options);
+	auto file = FileReader::Open(&options, session);
 	if (!file) {
 		throw IOException("Failed to open Vortex file: " + filename);
 	}
 
-	// This Ptr is owned by the file
-	const vx_dtype *file_dtype = vx_file_dtype(file->file);
-	if (vx_dtype_get(file_dtype) != DTYPE_STRUCT) {
+	// This pointer is owned by the file.
+	auto file_dtype = file->DType();
+	if (vx_dtype_get(file_dtype.dtype) != DTYPE_STRUCT) {
 		vx_file_reader_free(file->file);
 		throw FatalException("Vortex file does not contain a struct array as a top-level dtype");
 	}
@@ -190,107 +209,213 @@ static unique_ptr OpenFile(const std::string &filename, vector
 	return file;
 }
 
-static void VerifyNewFile(const VortexBindData &bind_data, vector<LogicalType> &column_types,
-                          vector<string> &column_names) {
+// Verifies that a new Vortex file's schema matches the expected schema from the bind phase.
+//
+// This function ensures schema consistency across all the files in a multi-file query.
+// It compares the column types and names extracted from a new file against the schema
+// obtained from the first file (stored in bind_data).
+static void VerifyNewFile(const BindData &bind_data, vector<LogicalType> &column_types, vector<string> &column_names) {
 	if (column_types.size() != bind_data.columns_types.size() || column_names != bind_data.column_names) {
 		throw FatalException("Vortex file does not contain the same number of columns as the first");
 	}
-	for (auto idx = 0u; idx < bind_data.columns_types.size(); idx++) {
-		auto col_name = bind_data.column_names[idx];
-		auto col_type = bind_data.columns_types[idx];
-		if (col_name != column_names[idx]) {
+
+	for (size_t idx = 0; idx < bind_data.columns_types.size(); ++idx) {
+		if (bind_data.column_names[idx] != column_names[idx]) {
 			throw FatalException("Vortex file contains a column with a different name to the first");
 		}
-		if (col_type != column_types[idx]) {
+		if (bind_data.columns_types[idx] != column_types[idx]) {
 			throw FatalException("Vortex file contains a column with a different type to the first");
 		}
 	}
 }
 
-static unique_ptr<VortexFileReader> OpenFileAndVerify(FileSystem &fs, const std::string &filename,
-                                                      const VortexBindData &bind_data) {
+static unique_ptr<FileReader> OpenFileAndVerify(FileSystem &fs, VortexSession &session, const std::string &filename,
                                                 const BindData &bind_data) {
	auto new_column_names = vector<string>();
 	new_column_names.reserve(bind_data.column_names.size());
+
 	auto new_column_types = vector<LogicalType>();
-	new_column_names.reserve(bind_data.columns_types.size());
+	new_column_types.reserve(bind_data.columns_types.size());
 
-	auto file = OpenFile(EnsureFileProtocol(fs, filename), new_column_types, new_column_names);
+	auto file = OpenFile(EnsureFileProtocol(fs, filename), session, new_column_types, new_column_names);
 	VerifyNewFile(bind_data, new_column_types, new_column_names);
 
 	return file;
 }
 
+static bool PinFileToThread(ScanGlobalState &global_state) {
+	// This is an approximation to determine whether we should switch to
+	// distributing partitions of the same file across threads and does
+	// not need to be exact in terms of how many threads DuckDB actually uses.
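+	// Heuristic: while more unpartitioned files remain than hardware threads, keep each file's partitions local to the thread that opened it.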
+	const auto thread_count = std::thread::hardware_concurrency();
+	const auto file_count = global_state.expanded_files.size();
+	return (file_count - global_state.files_partitioned) > thread_count;
+}
+
+static void CreateScanPartitions(ClientContext &context, const BindData &bind, ScanGlobalState &global_state,
+                                 ScanLocalState &local_state, uint64_t file_idx, FileReader &file_reader) {
+
+	if (global_state.file_readers[file_idx]->CanPrune(global_state.filter_str.data(),
+	                                                  static_cast<unsigned int>(global_state.filter_str.length()))) {
+		global_state.files_partitioned += 1;
+		return;
+	}
+
+	const auto row_count = Try([&](auto err) { return vx_file_row_count(file_reader.file, err); });
+
+	const auto thread_count = std::thread::hardware_concurrency();
+	const auto file_count = global_state.expanded_files.size();
+
+	// This is a multiple of the 2048 DuckDB vector size.
+	//
+	// Factors to consider:
+	// 1. A smaller value means more work for the Vortex file reader.
+	// 2. A larger value reduces the parallelism available to the scanner.
+	const uint64_t partition_size = 2048 * (thread_count > file_count ? 32 : 64);
+
+	const auto partition_count = std::max(static_cast<uint64_t>(1), row_count / partition_size);
+	global_state.partitons_total += partition_count;
+	const bool pin_file_to_thread = PinFileToThread(global_state);
+
+	if (pin_file_to_thread) {
+		local_state.thread_local_file_idx = file_idx;
+	}
+
+	for (size_t partition_idx = 0; partition_idx < partition_count; ++partition_idx) {
+		const auto scan_partition = ScanPartition {
+		    .file_idx = file_idx,
+		    .start_row = partition_idx * partition_size,
+		    .end_row = (partition_idx + 1) == partition_count ? row_count : (partition_idx + 1) * partition_size,
+		};
+
+		if (pin_file_to_thread) {
+			local_state.scan_partitions.push(scan_partition);
+		} else {
+			global_state.scan_partitions.enqueue(scan_partition);
+		}
+	}
+
+	global_state.files_partitioned += 1;
+	D_ASSERT(global_state.files_partitioned <= global_state.expanded_files.size());
+}
+
+static unique_ptr<ArrayIterator> OpenArrayIter(ScanGlobalState &global_state, shared_ptr<FileReader> &file_reader,
+                                               ScanPartition row_range_partition) {
+	const auto options = vx_file_scan_options {
 	    .projection = global_state.projected_column_names.data(),
-	    .projection_len = static_cast(global_state.projected_column_names.size()),
+	    .projection_len = static_cast<uint32_t>(global_state.projected_column_names.size()),
 	    .filter_expression = global_state.filter_str.data(),
-	    .filter_expression_len = static_cast(global_state.filter_str.length()),
-	    .split_by_row_count = ROW_SPLIT_COUNT,
+	    .filter_expression_len = static_cast<uint32_t>(global_state.filter_str.length()),
+	    .split_by_row_count = 0,
+	    .row_range_start = row_range_partition.start_row,
+	    .row_range_end = row_range_partition.end_row,
 	};
 
-	vx_error *error = nullptr;
-	auto scan = vx_file_scan(file->file, &options, &error);
-	HandleError(error);
-
-	return make_uniq<VortexArrayStream>(scan);
+	return make_uniq<ArrayIterator>(file_reader->Scan(&options));
 }
 
-static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
-	auto &bind_data = data.bind_data->Cast<VortexBindData>();             // NOLINT
-	auto &global_state = data.global_state->Cast<VortexScanGlobalState>(); // NOLINT
-	auto &local_state = data.local_state->Cast<VortexScanLocalState>();   // NOLINT
+// Assigns the next array from the array stream.
+//
+// Returns true if a new array was assigned, false otherwise.
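+// Returning false while `global_state.finished` is unset tells the caller to partition more files and retry.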
+static bool GetNextArray(ClientContext &context, const BindData &bind_data, ScanGlobalState &global_state,
+                         ScanLocalState &local_state, DataChunk &output) {
 
-	if (local_state.array == nullptr) {
-		auto &slot = global_state.file_slots[local_state.thread_id];
-		std::lock_guard<std::mutex> _l(slot.slot_lock);
-
-		if (global_state.finished.load() && slot.array_stream == nullptr) {
-			output.SetCardinality(0);
-			return;
+	// Try to dequeue a partition off the thread-local queue.
+	auto try_dequeue = [&](ScanPartition &scan_partition) {
+		if (local_state.scan_partitions.empty()) {
+			return false;
 		}
 
-		// 1. check we can make progress on current owned file
-		// 2. check we can get another file
-		// todo: 3. check if we can work steal from another thread
-		// 4. we are done
+		scan_partition = local_state.scan_partitions.front();
+		local_state.scan_partitions.pop();
+		return true;
+	};
+
+	if (local_state.array_iterator == nullptr) {
+		ScanPartition partition;
+
+		if (bool success = (try_dequeue(partition) || global_state.scan_partitions.try_dequeue(partition)); !success) {
 
-		while (local_state.array == nullptr) {
-			if (slot.array_stream == nullptr) {
-				auto file_idx = global_state.next_file.fetch_add(1);
+			// Check whether all partitions have been processed.
+			if (global_state.files_partitioned == global_state.expanded_files.size()) {
 
-				if (file_idx >= global_state.expanded_files.size()) {
-					local_state.finished = true;
+				// A new partition might have been created after the first pop. Therefore,
+				// one more pop is necessary to ensure no more partitions are left to process.
+				if (success = global_state.scan_partitions.try_dequeue(partition); !success) {
 					global_state.finished = true;
-					output.Reset();
-					return;
+					return false;
 				}
+			}
+
+			if (!success) {
+				return false;
+			}
+		}
 
-				auto file_name = global_state.expanded_files[file_idx];
-				auto file = OpenFileAndVerify(FileSystem::GetFileSystem(context), file_name, bind_data);
+		// Layout readers are safe to share across threads for reading. Further, they
+		// are created before pushing partitions of the corresponding files into a queue.
+		auto file_reader = global_state.file_readers[partition.file_idx];
+		local_state.array_iterator = OpenArrayIter(global_state, file_reader, partition);
+	}
+
+	local_state.currently_scanned_array = local_state.array_iterator->NextArray();
+	local_state.array_row_offset = 0;
+
+	if (local_state.currently_scanned_array == nullptr) {
+		local_state.array_iterator = nullptr;
+		global_state.partitions_processed += 1;
+
+		return false;
+	}
 
-				slot.array_stream = OpenArrayStream(bind, global_state, file.get());
+	return true;
+}
+
+static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
+	auto &bind_data = data.bind_data->Cast<BindData>();
+	auto &global_state = data.global_state->Cast<ScanGlobalState>();
+	auto &local_state = data.local_state->Cast<ScanLocalState>();
+
+	if (local_state.currently_scanned_array == nullptr) {
+		while (!GetNextArray(context, bind_data, global_state, local_state, output)) {
+			if (global_state.finished) {
+				output.Reset();
+				output.SetCardinality(0);
+				return;
 			}
 
-			local_state.array = slot.array_stream->NextArray();
-			if (local_state.array == nullptr) {
-				slot.array_stream = nullptr;
+			// Free file readers when owned by the thread.
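+			// Once this thread has drained its local queue, the pinned file's reader can be released.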
+			if (local_state.scan_partitions.empty() && local_state.thread_local_file_idx.has_value()) {
+				global_state.file_readers[local_state.thread_local_file_idx.value()] = nullptr;
+				local_state.thread_local_file_idx.reset();
+			}
+
+			// Create new scan partitions in case the queue is empty.
+			if (auto file_idx = global_state.next_file_idx.fetch_add(1);
+			    file_idx < global_state.expanded_files.size()) {
+				if (file_idx == 0) {
+					global_state.file_readers[0] = bind_data.initial_file;
+				} else {
+					auto file_name = global_state.expanded_files[file_idx];
+					global_state.file_readers[file_idx] =
+					    OpenFileAndVerify(FileSystem::GetFileSystem(context), *bind_data.session, file_name, bind_data);
+				}
+
+				CreateScanPartitions(context, bind_data, global_state, local_state, file_idx,
+				                     *global_state.file_readers[file_idx]);
 			}
 		}
-		local_state.current_row = 0;
 	}
 
-	if (local_state.cache == nullptr) {
-		// Create a unique value so each cache can be differentiated.
-		local_state.cache = make_uniq<VortexConversionCache>(global_state.cache_id++);
+	if (local_state.conversion_cache == nullptr) {
+		local_state.conversion_cache = make_uniq<ConversionCache>(global_state.cache_id++);
 	}
 
-	local_state.current_row = local_state.array->ToDuckDBVector(
-	    local_state.current_row, reinterpret_cast<duckdb_data_chunk>(&output), local_state.cache.get());
+	local_state.array_row_offset = local_state.currently_scanned_array->ToDuckDBVector(
	    local_state.array_row_offset, reinterpret_cast<duckdb_data_chunk>(&output), local_state.conversion_cache.get());
 
-	if (local_state.current_row == 0) {
-		local_state.array = nullptr;
-		local_state.cache = nullptr;
+	if (local_state.array_row_offset == 0) {
+		local_state.currently_scanned_array = nullptr;
+		local_state.conversion_cache = nullptr;
 	}
 }
 
@@ -300,17 +425,25 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
 /// like projection pushdown and predicate pushdown.
 static unique_ptr<FunctionData> VortexBind(ClientContext &context, TableFunctionBindInput &input,
                                            vector<LogicalType> &column_types, vector<string> &column_names) {
-	auto result = make_uniq<VortexBindData>();
-
+	auto result = make_uniq<BindData>();
 	result->arena = make_uniq<google::protobuf::Arena>();
 
-	// Get the filename glob from the input.
-	auto vec = duckdb::vector<string> {input.inputs[0].GetValue<string>()};
-	result->file_list = make_shared_ptr<GlobMultiFileList>(context, vec, FileGlobOptions::DISALLOW_EMPTY);
+	const static string VortexExtensionKey = std::string("vortex_extension:vortex_session");
+	auto session = ObjectCache::GetObjectCache(context).Get<VortexSession>(VortexExtensionKey);
+	if (session == nullptr) {
+		ObjectCache::GetObjectCache(context).Put(VortexExtensionKey, make_shared_ptr<VortexSession>());
+		session = ObjectCache::GetObjectCache(context).Get<VortexSession>(VortexExtensionKey);
+	}
 
-	auto filename = EnsureFileProtocol(FileSystem::GetFileSystem(context), result->file_list->GetFirstFile());
+	result->session = session;
 
-	result->initial_file = OpenFile(filename, column_types, column_names);
+	auto file_glob_strings = duckdb::vector<string> {input.inputs[0].GetValue<string>()};
+	auto file_glob = duckdb::vector<OpenFileInfo>(file_glob_strings.begin(), file_glob_strings.end());
+	result->file_list = make_shared_ptr<GlobMultiFileList>(context, file_glob, FileGlobOptions::DISALLOW_EMPTY);
+
+	// Open the first file to extract the schema.
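+	// This file's schema becomes the reference that every other file in the glob is verified against.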
+	auto filename = EnsureFileProtocol(FileSystem::GetFileSystem(context), result->file_list->GetFirstFile().path);
+	result->initial_file = OpenFile(filename, *result->session, column_types, column_names);
 
 	result->column_names = column_names;
 	result->columns_types = column_types;
@@ -319,93 +452,75 @@ static unique_ptr VortexBind(ClientContext &context, TableFunction
 }
 
 unique_ptr<NodeStatistics> VortexCardinality(ClientContext &context, const FunctionData *bind_data) {
-	auto &data = bind_data->Cast<VortexBindData>();
+	auto &data = bind_data->Cast<BindData>();
 
-	return make_uniq<NodeStatistics>(data.num_columns, data.num_columns);
+	auto row_count = data.initial_file->FileRowCount();
+	if (data.file_list->GetTotalFileCount() == 1) {
+		return make_uniq<NodeStatistics>(row_count, row_count);
+	} else {
+		return make_uniq<NodeStatistics>(row_count * data.file_list->GetTotalFileCount());
+	}
 }
 
 // Removes all filter expressions (from `filters`) which can be pushed down.
 void PushdownComplexFilter(ClientContext &context, LogicalGet &get, FunctionData *bind_data,
                            vector<unique_ptr<Expression>> &filters) {
-	auto &bind = bind_data->Cast<VortexBindData>();
-
 	if (filters.empty()) {
 		return;
 	}
 
+	auto &bind = bind_data->Cast<BindData>();
 	bind.conjuncts.reserve(filters.size());
 	for (auto &filter : filters) {
-		auto expr = expression_into_vortex_expr(*bind.arena, *filter);
-		if (expr != nullptr) {
+		if (auto expr = expression_into_vortex_expr(*bind.arena, *filter); expr != nullptr) {
 			bind.conjuncts.push_back(expr);
 		}
 	}
 }
 
-void RegisterVortexScanFunction(DatabaseInstance &instance) {
+void RegisterScanFunction(DatabaseInstance &instance) {
 	TableFunction vortex_scan("read_vortex", {LogicalType::VARCHAR}, VortexScanFunction, VortexBind);
 
 	vortex_scan.init_global = [](ClientContext &context,
 	                             TableFunctionInitInput &input) -> unique_ptr<GlobalTableFunctionState> {
-		auto &bind = input.bind_data->Cast<VortexBindData>();
-		auto state = make_uniq<VortexScanGlobalState>();
+		auto &bind = input.bind_data->CastNoConst<BindData>();
+		auto global_state = make_uniq<ScanGlobalState>();
 
-		state->filter = input.filters;
-		state->projection_ids.reserve(input.projection_ids.size());
-		for (auto proj_id : input.projection_ids) {
-			state->projection_ids.push_back(input.column_ids[proj_id]);
+		// TODO(joe): do this expansion gradually in the scan to avoid a slower start.
+		auto file_infos = bind.file_list->GetAllFiles();
+		global_state->expanded_files.reserve(file_infos.size());
+		for (const auto &file_info : file_infos) {
+			global_state->expanded_files.push_back(file_info.path);
 		}
+		global_state->filter = input.filters;
+		global_state->column_ids = input.column_ids;
 
-		state->column_ids = input.column_ids;
-
-		// TODO(joe): do this expansion gradually in the scan to avoid a slower start.
-		state->expanded_files = bind.file_list->GetAllFiles();
+		PopulateProjection(*global_state, bind.column_names, input);
 
 		// Most expressions are extracted from `PushdownComplexFilter`; the final filters come from `input.filters`.
-		vector<vortex::expr::Expr *> conjuncts;
-		std::copy(bind.conjuncts.begin(), bind.conjuncts.end(), std::back_inserter(conjuncts));
-		CreateFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, conjuncts);
-
-		auto column_names = std::vector<const char *>();
-		for (auto col_id : state->projection_ids) {
-			assert(col_id < bind.column_names.size());
-			column_names.push_back(bind.column_names[col_id].c_str());
-		}
-		state->projected_column_names = column_names;
-
-		auto exprs = flatten_exprs(*bind.arena, bind.conjuncts);
-		if (exprs != nullptr) {
-			state->filter_str = exprs->SerializeAsString();
+		CreateFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, bind.conjuncts);
+		if (auto exprs = flatten_exprs(*bind.arena, bind.conjuncts); exprs != nullptr) {
+			global_state->filter_str = exprs->SerializeAsString();
 		}
 
-		// Can ignore mutex since no other threads are running now.
-		state->file_slots[0].array_stream = OpenArrayStream(bind, *state, bind.initial_file.get());
-		state->next_file = 1;
+		// Resizing the empty vector default-constructs a null std::shared_ptr at every index.
+		global_state->file_readers.resize(global_state->expanded_files.size());
 
-		// We are finished with the arena
 		bind.arena->Reset();
-
-		return std::move(state);
+		return std::move(global_state);
 	};
 
 	vortex_scan.init_local = [](ExecutionContext &context, TableFunctionInitInput &input,
 	                            GlobalTableFunctionState *global_state) -> unique_ptr<LocalTableFunctionState> {
-		auto &v_global_state = global_state->Cast<VortexScanGlobalState>();
-
-		auto thread_id = v_global_state.thread_id_counter.fetch_add(1);
-		assert(thread_id < MAX_THREAD_COUNT);
-
-		auto state = make_uniq<VortexScanLocalState>(thread_id);
-		return state;
+		return make_uniq<ScanLocalState>();
 	};
 
 	vortex_scan.table_scan_progress = [](ClientContext &context, const FunctionData *bind_data,
-	                                     const GlobalTableFunctionState *global_state) {
-		auto &gstate = global_state->Cast<VortexScanGlobalState>();
-
-		return 100 * (static_cast<double>(gstate.next_file.load()) / static_cast<double>(gstate.expanded_files.size()));
+	                                     const GlobalTableFunctionState *global_state) -> double {
		auto &gstate = global_state->Cast<ScanGlobalState>();
+		return 100.0 * (static_cast<double>(gstate.partitions_processed) / static_cast<double>(gstate.partitons_total));
 	};
 
 	vortex_scan.pushdown_complex_filter = PushdownComplexFilter;
@@ -417,4 +532,4 @@ void RegisterVortexScanFunction(DatabaseInstance &instance) {
 	ExtensionUtil::RegisterFunction(instance, vortex_scan);
 }
 
-} // namespace duckdb
+} // namespace vortex
diff --git a/src/vortex_write.cpp b/src/vortex_write.cpp
index e43358e..8eefd0d 100644
--- a/src/vortex_write.cpp
+++ b/src/vortex_write.cpp
@@ -1,18 +1,19 @@
-#define ENABLE_DUCKDB_FFI
-
-#include "vortex_write.hpp"
-#include "vortex_common.hpp"
 #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
-
 #include "duckdb/common/exception.hpp"
-#include "duckdb/common/multi_file_reader.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include "duckdb/function/copy_function.hpp"
 #include "duckdb/parser/constraints/not_null_constraint.hpp"
 
-namespace duckdb {
+#include "vortex_write.hpp"
+#include "vortex_common.hpp"
+
+// TODO(joe): enable multi-threaded writes, see `WriteSink`.
+
+using namespace duckdb;
+
+namespace vortex {
 
-struct VortexWriteBindData : public TableFunctionData {
+struct WriteBindData : public TableFunctionData {
 	//! True if the column is nullable
 	vector column_nullable;
 
 	//! Column names
 	vector<string> column_names;
 };
 
-struct VortexWriteGlobalData : public GlobalFunctionData {
-	std::string file_name;
-	std::unique_ptr<VortexFileReader> file;
-	unique_ptr<VortexArray> array;
+struct WriteGlobalData : public GlobalFunctionData {
+	unique_ptr<ArrayStreamSink> sink;
 };
 
-struct VortexWriteLocalData : public LocalFunctionData {};
+struct WriteLocalData : public LocalFunctionData {};
 
-void VortexWriteSink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
-                     LocalFunctionData &lstate, DataChunk &input) {
-	auto &global_state = gstate.Cast<VortexWriteGlobalData>();
-	auto bind = bind_data.Cast<VortexWriteBindData>();
-
-	auto chunk = DataChunk();
-	chunk.Initialize(Allocator::Get(context.client), bind.sql_types);
+void WriteSink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
+               LocalFunctionData &lstate, DataChunk &input) {
+	auto &global_state = gstate.Cast<WriteGlobalData>();
+	auto bind = bind_data.Cast<WriteBindData>();
 
 	for (auto i = 0u; i < input.ColumnCount(); i++) {
 		input.data[i].Flatten(input.size());
 	}
-
-	auto new_array = vx_array_append_duckdb_chunk(
-	    global_state.array->array, reinterpret_cast<duckdb_data_chunk>(&input), bind.column_nullable.data());
-	global_state.array = make_uniq<VortexArray>(new_array);
+	// TODO(joe): move to a model of combining local chunks into arrays of a specific size
+	// before pushing each of these larger chunks into the global_state.
+	global_state.sink->PushChunk(input);
 }
 
 std::vector<idx_t> TableNullability(ClientContext &context, const string &catalog_name, const string &schema,
@@ -53,17 +48,17 @@ std::vector TableNullability(ClientContext &context, const string &catalo
 
 	// Main is the default schema
 	auto schema_name = schema != "" ? schema : "main";
-	auto entry = catalog.GetEntry(context, CatalogType::TABLE_ENTRY, schema_name, table, OnEntryNotFound::RETURN_NULL,
-	                              error_context);
+	auto entry = catalog.GetEntry(context, schema_name, table, OnEntryNotFound::RETURN_NULL);
 
 	auto vec = std::vector<idx_t>();
+	// entry can be non-null and still not a table entry
+	if (!entry || entry->type != CatalogType::TABLE_ENTRY) {
 		// If there is no entry, it is okay to return all nullable columns.
return vec; } auto &table_entry = entry->Cast(); for (auto &constraint : table_entry.GetConstraints()) { - if (constraint->type == ConstraintType::NOT_NULL) { + if (constraint && constraint->type == ConstraintType::NOT_NULL) { auto &null_constraint = constraint->Cast(); vec.push_back(null_constraint.index.index); } @@ -71,11 +66,11 @@ std::vector TableNullability(ClientContext &context, const string &catalo return vec; } -void RegisterVortexWriteFunction(DatabaseInstance &instance) { +void RegisterWriteFunction(DatabaseInstance &instance) { CopyFunction function("vortex"); function.copy_to_bind = [](ClientContext &context, CopyFunctionBindInput &input, const vector &names, const vector &sql_types) -> unique_ptr { - auto result = make_uniq(); + auto result = make_uniq(); auto not_null = TableNullability(context, input.info.catalog, input.info.schema, input.info.table); @@ -90,9 +85,8 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) { }; function.copy_to_initialize_global = [](ClientContext &context, FunctionData &bind_data, const string &file_path) -> unique_ptr { - auto &bind = bind_data.Cast(); - auto gstate = make_uniq(); - gstate->file_name = file_path; + auto &bind = bind_data.Cast(); + auto gstate = make_uniq(); auto column_names = std::vector(); for (const auto &col_id : bind.column_names) { @@ -104,24 +98,18 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) { column_types.push_back(reinterpret_cast(&col_type)); } - vx_error *error = nullptr; - auto array = vx_array_create_empty_from_duckdb_table(column_types.data(), bind.column_nullable.data(), - column_names.data(), column_names.size(), &error); - HandleError(error); - - gstate->array = make_uniq(array); + auto dtype = DType::FromDuckDBTable(column_types, bind.column_nullable, column_names); + gstate->sink = ArrayStreamSink::Create(file_path, std::move(dtype)); return std::move(gstate); }; function.copy_to_initialize_local = [](ExecutionContext &context, FunctionData &bind_data) -> unique_ptr { - return std::move(make_uniq()); + return std::move(make_uniq()); }; - function.copy_to_sink = VortexWriteSink; + function.copy_to_sink = WriteSink; function.copy_to_finalize = [](ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) { - auto &global_state = gstate.Cast(); - vx_error *error; - vx_file_write_array(global_state.file_name.c_str(), global_state.array->array, &error); - HandleError(error); + auto &global_state = gstate.Cast(); + global_state.sink->Close(); }; function.execution_mode = [](bool preserve_insertion_order, bool supports_batch_index) -> CopyFunctionExecutionMode { @@ -132,4 +120,4 @@ void RegisterVortexWriteFunction(DatabaseInstance &instance) { ExtensionUtil::RegisterFunction(instance, function); } -} // namespace duckdb +} // namespace vortex diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a260338..e4175aa 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,6 +1,6 @@ include_directories(${CMAKE_SOURCE_DIR}/../src/include) -add_executable(vortex_tests src/expr.cpp) +add_executable(vortex_tests src/vortex_expr_test.cpp) target_include_directories(vortex_tests PRIVATE ${catch2_SOURCE_DIR}/single_include ) diff --git a/test/sql/nulls.test b/test/sql/nulls.test index 4bb572d..0bb8a12 100644 --- a/test/sql/nulls.test +++ b/test/sql/nulls.test @@ -1,25 +1,37 @@ # name: test/sql/nulls.test # description: test nullable values # group: [vortex] - -# Before we load the extension, this will fail +# This test is used to verify that NULL values writen to 
a vortex file round trip. +# This is that they are preserved after a file write + read. require vortex # copy vortex statement ok -COPY ((SELECT generate_series as s1, NULL as s2 from generate_series(0, 4)) union (SELECT generate_series as s1, generate_series + 1 as s2 from generate_series(5,8))) TO '__TEST_DIR__/test.vortex' (FORMAT VORTEX) +COPY (( + SELECT + generate_series as s, + NULL as s1, + NULL as s2 + FROM generate_series(0, 4) +) UNION ( + SELECT + generate_series as s, + CAST(generate_series as decimal(15,2)) as s1, + generate_series + 1 as s2 + FROM generate_series(5,8))) +TO '__TEST_DIR__/test.vortex' (FORMAT VORTEX) # read vortex -query II -SELECT s1, s2 FROM read_vortex('__TEST_DIR__/test.vortex') ORDER BY s1; +query III +SELECT s, s1, s2 FROM read_vortex('__TEST_DIR__/test.vortex') ORDER BY s; ---- -0 NULL -1 NULL -2 NULL -3 NULL -4 NULL -5 6 -6 7 -7 8 -8 9 +0 NULL NULL +1 NULL NULL +2 NULL NULL +3 NULL NULL +4 NULL NULL +5 5.00 6 +6 6.00 7 +7 7.00 8 +8 8.00 9 diff --git a/test/sql/table.test b/test/sql/table.test index e05c377..b6f8f02 100644 --- a/test/sql/table.test +++ b/test/sql/table.test @@ -16,7 +16,7 @@ CREATE TABLE generated_data_table ( -- hugeint_col HUGEINT, all commented values are not supported yet. float_col FLOAT, double_col DOUBLE, --- decimal_col DECIMAL(10, 2), + decimal_col DECIMAL(10, 2), varchar_col VARCHAR, date_col DATE, timmestamp_s_col TIMESTAMP_S, @@ -39,7 +39,7 @@ SELECT -- CAST(seq * 1000 AS HUGEINT) AS hugeint_col, CAST(seq AS FLOAT) / 100.0 AS float_col, CAST(seq AS DOUBLE) / 1000.0 AS double_col, --- CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col, + CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col, 'Value ' || seq AS varchar_col, DATE '1992-03-22' + to_days(cast(seq AS INTEGER)) AS date_col, TIMESTAMP_S '2025-01-01 00:00:00' + to_seconds(cast(seq AS INTEGER)) as timmestamp_s_col, diff --git a/test/sql/table.test_slow b/test/sql/table.test_slow index 7e8b142..c55bc83 100644 --- a/test/sql/table.test_slow +++ b/test/sql/table.test_slow @@ -16,7 +16,7 @@ CREATE TABLE generated_data_table ( -- hugeint_col HUGEINT, all commented values are not supported yet. 
float_col FLOAT, double_col DOUBLE, --- decimal_col DECIMAL(10, 2), + decimal_col DECIMAL(10, 2), varchar_col VARCHAR, date_col DATE, timmestamp_s_col TIMESTAMP_S, @@ -39,7 +39,7 @@ SELECT -- CAST(seq * 1000 AS HUGEINT) AS hugeint_col, CAST(seq AS FLOAT) / 100.0 AS float_col, CAST(seq AS DOUBLE) / 1000.0 AS double_col, --- CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col, + CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col, 'Value ' || seq AS varchar_col, DATE '1992-03-22' + to_days(cast(seq AS INTEGER)) AS date_col, TIMESTAMP_S '2025-01-01 00:00:00' + to_seconds(cast(seq AS INTEGER)) as timmestamp_s_col, diff --git a/test/src/expr.cpp b/test/src/expr.cpp deleted file mode 100644 index a099fe8..0000000 --- a/test/src/expr.cpp +++ /dev/null @@ -1,30 +0,0 @@ -#include "catch2/catch_test_macros.hpp" -#include "duckdb.hpp" -#include "expr/expr.hpp" - -#include -#include - -TEST_CASE("Test DuckDB list handling", "[list]") { - - google::protobuf::Arena arena; - - auto filter = duckdb::make_uniq(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(1)); - auto filter2 = duckdb::make_uniq(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(2)); - auto filter3 = duckdb::make_uniq(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(3)); - - auto filter_and = duckdb::ConjunctionAndFilter(); - - filter_and.child_filters.push_back(std::move(filter)); - filter_and.child_filters.push_back(std::move(filter2)); - filter_and.child_filters.push_back(std::move(filter3)); - - auto col = std::string("a"); - auto expr = table_expression_into_expr(arena, filter_and, col); - - REQUIRE(expr->children().size() == 2); - REQUIRE(expr->kind().binary_op() == vortex::expr::Kind_BinaryOp_And); - - REQUIRE(expr->children()[0].children().size() == 2); - REQUIRE(expr->children()[0].kind().binary_op() == vortex::expr::Kind_BinaryOp_Eq); -} diff --git a/vortex b/vortex index fc3196c..ac32e4e 160000 --- a/vortex +++ b/vortex @@ -1 +1 @@ -Subproject commit fc3196ccd45a8a3451a45b1ec44d923e40c497e7 +Subproject commit ac32e4ea9d47734f12ab93096cfb3a60e3c91047 From 2ac17e1391912c53efec553e25230f7b905dfd91 Mon Sep 17 00:00:00 2001 From: Joe Isaacs Date: Wed, 28 May 2025 09:48:46 +0100 Subject: [PATCH 2/5] update to ddb v1.3.0 --- .github/workflows/MainDistributionPipeline.yml | 6 +++--- extension-ci-tools | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index ebf049b..3b8fdc2 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -14,10 +14,10 @@ concurrency: jobs: duckdb-stable-build: name: Build extension binaries - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@6a7a4f24c5999355ab36c0a6835baf891fc9d522 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@71d20029c5314dfc34f3bbdab808b9bce03b8003 with: - duckdb_version: v1.2.2 - ci_tools_version: 6a7a4f24c5999355ab36c0a6835baf891fc9d522 + duckdb_version: v1.3.0 + ci_tools_version: 71d20029c5314dfc34f3bbdab808b9bce03b8003 extension_name: vortex exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;windows_amd64;linux_arm64" extra_toolchains: "rust" diff --git a/extension-ci-tools b/extension-ci-tools index 6a7a4f2..71d2002 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 6a7a4f24c5999355ab36c0a6835baf891fc9d522 +Subproject commit 71d20029c5314dfc34f3bbdab808b9bce03b8003 From 
fcb6ac5eb354fc9a500f926e699beb773b95a30a Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Wed, 28 May 2025 09:51:58 +0100
Subject: [PATCH 3/5] add

---
 src/include/vortex_expr.hpp    | 14 ++++++++++++++
 src/include/vortex_session.hpp | 28 ++++++++++++++++++++++++++++
 test/src/vortex_expr_test.cpp  | 30 ++++++++++++++++++++++++++++++
 3 files changed, 72 insertions(+)
 create mode 100644 src/include/vortex_expr.hpp
 create mode 100644 src/include/vortex_session.hpp
 create mode 100644 test/src/vortex_expr_test.cpp

diff --git a/src/include/vortex_expr.hpp b/src/include/vortex_expr.hpp
new file mode 100644
index 0000000..cdf831f
--- /dev/null
+++ b/src/include/vortex_expr.hpp
@@ -0,0 +1,14 @@
+#pragma once
+
+#include "duckdb/planner/expression.hpp"
+#include "duckdb/planner/table_filter.hpp"
+
+#include "expr.pb.h"
+
+namespace vortex {
+vortex::expr::Expr *table_expression_into_expr(google::protobuf::Arena &arena, duckdb::TableFilter &filter,
+                                               const std::string &column_name);
+vortex::expr::Expr *expression_into_vortex_expr(google::protobuf::Arena &arena, const duckdb::Expression &expr);
+vortex::expr::Expr *flatten_exprs(google::protobuf::Arena &arena,
+                                  const duckdb::vector &child_filters);
+} // namespace vortex
diff --git a/src/include/vortex_session.hpp b/src/include/vortex_session.hpp
new file mode 100644
index 0000000..cea5c7d
--- /dev/null
+++ b/src/include/vortex_session.hpp
@@ -0,0 +1,28 @@
+#pragma once
+
+#include "vortex.hpp"
+#include "duckdb/storage/object_cache.hpp"
+
+namespace vortex {
+
+class VortexSession : public duckdb::ObjectCacheEntry {
+public:
+	VortexSession() : session(vx_session_create()) {
+	}
+
+	~VortexSession() override {
+		vx_session_free(session);
+	}
+
+	vx_session *session;
+
+	static std::string ObjectType() {
+		return "vortex_session_cache_metadata";
+	}
+
+	std::string GetObjectType() override {
+		return ObjectType();
+	}
+};
+
+} // namespace vortex
\ No newline at end of file
diff --git a/test/src/vortex_expr_test.cpp b/test/src/vortex_expr_test.cpp
new file mode 100644
index 0000000..df67b34
--- /dev/null
+++ b/test/src/vortex_expr_test.cpp
@@ -0,0 +1,30 @@
+#include <google/protobuf/arena.h>
+
+#include "catch2/catch_test_macros.hpp"
+#include "duckdb.hpp"
+#include "duckdb/planner/filter/constant_filter.hpp"
+
+#include "vortex_expr.hpp"
+
+TEST_CASE("Test DuckDB list handling", "[list]") {
+	google::protobuf::Arena arena;
+
+	auto filter = duckdb::make_uniq<duckdb::ConstantFilter>(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(1));
+	auto filter2 = duckdb::make_uniq<duckdb::ConstantFilter>(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(2));
+	auto filter3 = duckdb::make_uniq<duckdb::ConstantFilter>(duckdb::ExpressionType::COMPARE_EQUAL, duckdb::Value(3));
+
+	auto filter_and = duckdb::ConjunctionAndFilter();
+
+	filter_and.child_filters.push_back(std::move(filter));
+	filter_and.child_filters.push_back(std::move(filter2));
+	filter_and.child_filters.push_back(std::move(filter3));
+
+	auto col = std::string("a");
+	auto expr = vortex::table_expression_into_expr(arena, filter_and, col);
+
+	REQUIRE(expr->children().size() == 2);
+	REQUIRE(expr->kind().binary_op() == vortex::expr::Kind_BinaryOp_And);
+
+	REQUIRE(expr->children()[0].children().size() == 2);
+	REQUIRE(expr->children()[0].kind().binary_op() == vortex::expr::Kind_BinaryOp_Eq);
+}
From 49c369d670aed61c210b099ab61f94b2802f7b5e Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Wed, 28 May 2025 09:57:37 +0100
Subject: [PATCH 4/5] update

---
 .github/workflows/MainDistributionPipeline.yml | 1 -
 1 file changed, 1 deletion(-)

diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml
index 3b8fdc2..9741d94 100644
--- a/.github/workflows/MainDistributionPipeline.yml
+++ b/.github/workflows/MainDistributionPipeline.yml
@@ -21,4 +21,3 @@ jobs:
       extension_name: vortex
       exclude_archs: "wasm_mvp;wasm_eh;wasm_threads;windows_amd64_rtools;windows_amd64_mingw;windows_amd64;linux_arm64"
       extra_toolchains: "rust"
-      override_ci_tools_repository: spiraldb/extension-ci-tools
From 9bcea9c6614e41eaeb63cfccf5c96bd7a74fa222 Mon Sep 17 00:00:00 2001
From: Joe Isaacs
Date: Wed, 28 May 2025 10:01:43 +0100
Subject: [PATCH 5/5] update

---
 .gitmodules | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.gitmodules b/.gitmodules
index c47fc9c..5e7eb45 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -8,7 +8,7 @@
 	branch = main
 [submodule "duckdb"]
 	path = duckdb
-	url = https://github.com/spiraldb/duckdb.git
+	url = https://github.com/duckdb/duckdb.git
 [submodule "vcpkg"]
 	path = vcpkg
 	url = git@github.com:microsoft/vcpkg.git
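
A minimal usage sketch of the two entry points this series wires up (the COPY
function registered by RegisterWriteFunction and the read_vortex table scan),
assuming the extension built from these patches is loaded; the file path is
illustrative. The SQL mirrors what test/sql/nulls.test exercises:

    -- Write a Vortex file via COPY ... (FORMAT VORTEX), including a DECIMAL
    -- column, then scan it back with read_vortex.
    COPY (
        SELECT
            generate_series AS s,
            CAST(generate_series AS DECIMAL(15, 2)) AS d
        FROM generate_series(0, 9)
    ) TO '/tmp/example.vortex' (FORMAT VORTEX);

    SELECT s, d FROM read_vortex('/tmp/example.vortex') ORDER BY s;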
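
The nullability recorded in the written dtype is driven by NOT NULL
constraints on the source table: copy_to_bind calls TableNullability, which
resolves the table in the catalog and collects the column index of each
NotNullConstraint before DType::FromDuckDBTable builds the dtype. A sketch,
with an assumed table name t and an illustrative path:

    CREATE TABLE t (a INTEGER NOT NULL, b INTEGER);
    INSERT INTO t VALUES (1, NULL), (2, 3);
    -- Column a is recorded as non-nullable in the Vortex dtype; b stays
    -- nullable, so its NULL round trips through the file.
    COPY t TO '/tmp/t.vortex' (FORMAT VORTEX);
    SELECT a, b FROM read_vortex('/tmp/t.vortex');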