diff --git a/CMakeLists.txt b/CMakeLists.txt
index a136c1e..790fe96 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -6,6 +6,9 @@ project(${TARGET_NAME}_project)
 set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
 set(CMAKE_CXX_STANDARD 17)
 
+# Allow C++20 designator syntax in C++17
+add_compile_options(-Wno-c++20-designator)
+
 include(FetchContent)
 FetchContent_Declare(
         Corrosion
@@ -17,19 +20,20 @@ FetchContent_MakeAvailable(Corrosion)
 find_package(Catch2 CONFIG REQUIRED)
 find_package(Protobuf CONFIG REQUIRED)
 
+
 if (APPLE)
     find_library(SECURITY_FRAMEWORK Security)
 endif ()
 
 corrosion_import_crate(MANIFEST_PATH vortex/Cargo.toml
         CRATES vortex-ffi
-        FEATURES duckdb
+        FEATURES duckdb mimalloc
         CRATE_TYPES staticlib
         FLAGS --crate-type=staticlib
 )
 
 set(EXTENSION_NAME ${TARGET_NAME}_extension)
-set(EXTENSION_SOURCES src/vortex_extension.cpp src/expr/expr.cpp src/vortex_write.cpp src/vortex_scan.cpp)
+set(EXTENSION_SOURCES src/vortex_extension.cpp src/vortex_expr.cpp src/vortex_write.cpp src/vortex_scan.cpp)
 set(LOADABLE_EXTENSION_NAME ${TARGET_NAME}_loadable_extension)
 
 # Generate C++ code from .proto files.
@@ -38,7 +42,7 @@ set(PROTO_GEN_DIR ${CMAKE_CURRENT_SOURCE_DIR}/gen)
 file(MAKE_DIRECTORY ${PROTO_GEN_DIR})
 protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS ${PROTO_FILES} PROTOC_OUT_DIR ${PROTO_GEN_DIR})
 
-include_directories(src/include ${PROTO_GEN_DIR} vortex/vortex-ffi/cinclude)
+include_directories(src/include ${PROTO_GEN_DIR} ../vortex-ffi/cinclude)
 
 build_static_extension(${TARGET_NAME} ${EXTENSION_SOURCES} ${PROTO_SRCS})
 build_loadable_extension(${TARGET_NAME} ${EXTENSION_SOURCES} ${PROTO_SRCS})
diff --git a/Makefile b/Makefile
index d8b80f9..1406557 100644
--- a/Makefile
+++ b/Makefile
@@ -5,8 +5,16 @@ EXT_CONFIG=${PROJ_DIR}extension_config.cmake
 EXT_FLAGS=-DCMAKE_OSX_DEPLOYMENT_TARGET=12.0 -DOVERRIDE_GIT_DESCRIBE=v1.2.2
 
 export MACOSX_DEPLOYMENT_TARGET=12.0
-export VCPKG_OSX_DEPLOYMENT_TARGET=12.0
+
+# The version of DuckDB and its Vortex extension is either set implicitly by the Git tag, e.g. v1.2.2, or by the
+# commit SHA if the current commit does not have a tag. The implicitly set version can be overridden by defining
+# the `OVERRIDE_GIT_DESCRIBE` environment variable. In the context of the DuckDB community extension build, we
+# have to rely on the Git tag, as DuckDB's CI performs a checkout by Git tag. Therefore, the version can't be
+# explicitly set via an environment variable for the community extension build.
+
+# export OVERRIDE_GIT_DESCRIBE=v1.2.2
 export VCPKG_FEATURE_FLAGS=-binarycaching
+export VCPKG_OSX_DEPLOYMENT_TARGET=12.0
 export VCPKG_TOOLCHAIN_PATH := ${PROJ_DIR}vcpkg/scripts/buildsystems/vcpkg.cmake
 
 include extension-ci-tools/makefiles/duckdb_extension.Makefile
diff --git a/src/include/vortex.hpp b/src/include/vortex.hpp
index fe95cac..3a7a3e9 100644
--- a/src/include/vortex.hpp
+++ b/src/include/vortex.hpp
@@ -1,9 +1,3 @@
+// This header prefixes the auto-generated vortex.h with #pragma once.
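+// (Presumably the generated vortex.h ships without an include guard of its own,
+// so it should always be included through this wrapper.)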
 #pragma once
-
-
-// Include Vortex FFI, with the DuckDB FFI feature
-#ifndef ENABLE_DUCKDB_FFI
-#define ENABLE_DUCKDB_FFI
-#endif
-
-#include "vortex.h"
\ No newline at end of file
+#include "vortex.h"
diff --git a/src/include/vortex_common.hpp b/src/include/vortex_common.hpp
index d817401..d94e9d2 100644
--- a/src/include/vortex_common.hpp
+++ b/src/include/vortex_common.hpp
@@ -1,77 +1,149 @@
 #pragma once
 
+#define ENABLE_DUCKDB_FFI
+
 #include "duckdb.hpp"
+#include "duckdb/common/unique_ptr.hpp"
+
 #include "vortex.hpp"
 #include "vortex_error.hpp"
 
-#include
+namespace vortex {
+
+struct DType {
+	explicit DType(vx_dtype *dtype) : dtype(dtype) {
+	}
+
+	static duckdb::unique_ptr<DType> FromDuckDBTable(const std::vector<duckdb_logical_type> &column_types,
+	                                                 const std::vector<uint8_t> &column_nullable,
+	                                                 const std::vector<const char *> &column_names) {
+		D_ASSERT(column_names.size() == column_nullable.size());
+		D_ASSERT(column_names.size() == column_types.size());
+
+		auto dtype = Try([&](auto err) {
+			return vx_duckdb_logical_type_to_dtype(column_types.data(), column_nullable.data(), column_names.data(),
+			                                       column_names.size(), err);
+		});
 
-struct VortexConversionCache {
-	explicit VortexConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) {
+		return duckdb::make_uniq<DType>(dtype);
 	}
 
-	~VortexConversionCache() {
+	~DType() {
+		if (dtype != nullptr) {
+			vx_dtype_free(dtype);
+		}
+	}
+
+	vx_dtype *dtype;
+};
+
+struct ConversionCache {
+	explicit ConversionCache(const unsigned long cache_id) : cache(vx_conversion_cache_create(cache_id)) {
+	}
+
+	~ConversionCache() {
 		vx_conversion_cache_free(cache);
 	}
 
 	vx_conversion_cache *cache;
 };
 
-struct VortexFileReader {
-	explicit VortexFileReader(vx_file_reader *file) : file(file) {
+struct FileReader {
+	explicit FileReader(vx_file_reader *file) : file(file) {
 	}
 
-	~VortexFileReader() {
+	~FileReader() {
 		vx_file_reader_free(file);
 	}
 
-	static duckdb::unique_ptr<VortexFileReader> Open(const vx_file_open_options *options) {
-		vx_error *error;
-		auto file = vx_file_open_reader(options, &error);
-		HandleError(error);
-		return duckdb::make_uniq<VortexFileReader>(file);
+	static duckdb::unique_ptr<FileReader> Open(const vx_file_open_options *options) {
+		auto file = Try([&](auto err) { return vx_file_open_reader(options, err); });
+		return duckdb::make_uniq<FileReader>(file);
+	}
+
+	uint64_t FileRowCount() {
+		return Try([&](auto err) { return vx_file_row_count(file, err); });
+	}
+
+	struct DType DType() {
+		return vortex::DType(vx_file_dtype(file));
 	}
 
 	vx_file_reader *file;
 };
 
-struct VortexArray {
-	explicit VortexArray(vx_array *array) : array(array) {
+struct Array {
+	explicit Array(vx_array *array) : array(array) {
 	}
 
-	~VortexArray() {
+	~Array() {
 		if (array != nullptr) {
 			vx_array_free(array);
 		}
 	}
 
-	idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const VortexConversionCache *cache) const {
-		vx_error *error;
-		auto idx = vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, &error);
-		HandleError(error);
-		return idx;
+	static duckdb::unique_ptr<Array> FromDuckDBChunk(DType &dtype, duckdb::DataChunk &chunk) {
+		auto array = Try([&](auto err) {
+			return vx_duckdb_chunk_to_array(reinterpret_cast<duckdb_data_chunk>(&chunk), dtype.dtype, err);
+		});
+
+		return duckdb::make_uniq<Array>(array);
+	}
+
+	idx_t ToDuckDBVector(idx_t current_row, duckdb_data_chunk output, const ConversionCache *cache) const {
+		return Try([&](auto err) { return vx_array_to_duckdb_chunk(array, current_row, output, cache->cache, err); });
 	}
 
 	vx_array *array;
 };
 
-struct VortexArrayStream {
-	explicit VortexArrayStream(vx_array_stream *array_stream) : array_stream(array_stream) {
+struct ArrayIterator {
+	explicit ArrayIterator(vx_array_iterator *array_iter) : array_iter(array_iter) {
 	}
 
-	~VortexArrayStream() {
-		vx_array_stream_free(array_stream);
+	~ArrayIterator() {
+		vx_array_iter_free(array_iter);
 	}
 
-	duckdb::unique_ptr<VortexArray> NextArray() const {
-		vx_error *error;
-		auto array = vx_array_stream_next(array_stream, &error);
-		HandleError(error);
+	duckdb::unique_ptr<Array> NextArray() const {
+		auto array = Try([&](auto err) { return vx_array_iter_next(array_iter, err); });
+		if (array == nullptr) {
 			return nullptr;
 		}
-		return duckdb::make_uniq<VortexArray>(array);
+
+		return duckdb::make_uniq<Array>(array);
 	}
 
-	vx_array_stream *array_stream;
+	vx_array_iterator *array_iter;
 };
+
+struct ArrayStreamSink {
+	explicit ArrayStreamSink(vx_array_sink *sink, duckdb::unique_ptr<DType> dtype)
+	    : sink(sink), dtype(std::move(dtype)) {
+	}
+
+	static duckdb::unique_ptr<ArrayStreamSink> Create(std::string file_path, duckdb::unique_ptr<DType> &&dtype) {
+		auto sink = Try([&](auto err) { return vx_array_sink_open_file(file_path.c_str(), dtype->dtype, err); });
+		return duckdb::make_uniq<ArrayStreamSink>(sink, std::move(dtype));
+	}
+
+	void PushChunk(duckdb::DataChunk &chunk) {
+		auto array = Array::FromDuckDBChunk(*dtype, chunk);
+		Try([&](auto err) { vx_array_sink_push(sink, array->array, err); });
+	}
+
+	void Close() {
+		Try([&](auto err) { vx_array_sink_close(sink, err); });
+		this->sink = nullptr;
+	}
+
+	~ArrayStreamSink() {
+		// A sink must be closed before it is destructed.
+		D_ASSERT(sink == nullptr);
+	}
+
+	vx_array_sink *sink;
+	duckdb::unique_ptr<DType> dtype;
+};
+
+} // namespace vortex
diff --git a/src/include/vortex_error.hpp b/src/include/vortex_error.hpp
index 9600285..b41565d 100644
--- a/src/include/vortex_error.hpp
+++ b/src/include/vortex_error.hpp
@@ -1,7 +1,13 @@
 #pragma once
 
+#include <string>
+#include <type_traits>
+
+#include "duckdb.hpp"
 #include "vortex.hpp"
 
+namespace vortex {
+
 inline void HandleError(vx_error *error) {
 	if (error != nullptr) {
 		auto msg = std::string(vx_error_get_message(error));
@@ -9,3 +15,19 @@ inline void HandleError(vx_error *error) {
 		throw duckdb::InvalidInputException(msg);
 	}
 }
+
+template <typename Func>
+auto Try(Func func) {
+	vx_error *error = nullptr;
+	// Handle both void and non-void return types.
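+	// Usage sketch: `Try` wraps any FFI call that reports failure through a trailing
+	// `vx_error **` out-parameter, e.g.
+	//     auto row_count = Try([&](auto err) { return vx_file_row_count(file, err); });
+	// On error it throws through HandleError instead of handing the out-parameter back
+	// to the caller.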
+	if constexpr (std::is_void_v<decltype(func(&error))>) {
+		func(&error);
+		HandleError(error);
+	} else {
+		auto result = func(&error);
+		HandleError(error);
+		return result;
+	}
+}
+
+} // namespace vortex
diff --git a/src/include/expr/expr.hpp b/src/include/vortex_expr.hpp
similarity index 81%
rename from src/include/expr/expr.hpp
rename to src/include/vortex_expr.hpp
index 47dab3b..cdf831f 100644
--- a/src/include/expr/expr.hpp
+++ b/src/include/vortex_expr.hpp
@@ -1,14 +1,14 @@
 #pragma once
 
-#include "expr.pb.h"
 #include "duckdb/planner/expression.hpp"
+#include "duckdb/planner/table_filter.hpp"
 
-#include
+#include "expr.pb.h"
 
+namespace vortex {
 vortex::expr::Expr *table_expression_into_expr(google::protobuf::Arena &arena, duckdb::TableFilter &filter,
                                                const std::string &column_name);
-
 vortex::expr::Expr *expression_into_vortex_expr(google::protobuf::Arena &arena, const duckdb::Expression &expr);
-
 vortex::expr::Expr *flatten_exprs(google::protobuf::Arena &arena,
-                                  const duckdb::vector<vortex::expr::Expr *> &child_filters);
\ No newline at end of file
+                                  const duckdb::vector<vortex::expr::Expr *> &child_filters);
+} // namespace vortex
diff --git a/src/include/vortex_extension.hpp b/src/include/vortex_extension.hpp
index eade158..fc34e42 100644
--- a/src/include/vortex_extension.hpp
+++ b/src/include/vortex_extension.hpp
@@ -2,13 +2,11 @@
 
 #include "duckdb.hpp"
 
-namespace duckdb {
+// The entry point class API can't be scoped to the vortex namespace.
 
-class VortexExtension : public Extension {
+class VortexExtension : public duckdb::Extension {
 public:
-	void Load(DuckDB &db) override;
+	void Load(duckdb::DuckDB &db) override;
 	std::string Name() override;
 	std::string Version() const override;
 };
-
-} // namespace duckdb
diff --git a/src/include/vortex_layout_reader.hpp b/src/include/vortex_layout_reader.hpp
new file mode 100644
index 0000000..99ad14a
--- /dev/null
+++ b/src/include/vortex_layout_reader.hpp
@@ -0,0 +1,29 @@
+#pragma once
+
+#include "vortex_common.hpp"
+#include "vortex_expr.hpp"
+
+namespace vortex {
+
+class LayoutReader {
+public:
+	explicit LayoutReader(vx_layout_reader *reader) : reader(reader) {
+	}
+
+	~LayoutReader() {
+		vx_layout_reader_free(reader);
+	}
+
+	static std::shared_ptr<LayoutReader> CreateFromFile(vortex::FileReader *file) {
+		auto reader = Try([&](auto err) { return vx_layout_reader_create(file->file, err); });
+		return std::make_shared<LayoutReader>(reader);
+	}
+
+	vx_array_iterator *Scan(const vx_file_scan_options *options) {
+		return Try([&](auto err) { return vx_layout_reader_scan(this->reader, options, err); });
+	}
+
+	vx_layout_reader *reader;
+};
+
+} // namespace vortex
diff --git a/src/include/vortex_scan.hpp b/src/include/vortex_scan.hpp
index 809b40f..66383ba 100644
--- a/src/include/vortex_scan.hpp
+++ b/src/include/vortex_scan.hpp
@@ -2,9 +2,6 @@
 
 #include "duckdb/main/extension_util.hpp"
 
-
-namespace duckdb {
-
-void RegisterVortexScanFunction(DatabaseInstance &instance);
-
-}
\ No newline at end of file
+namespace vortex {
+void RegisterScanFunction(duckdb::DatabaseInstance &instance);
+}
diff --git a/src/include/vortex_write.hpp b/src/include/vortex_write.hpp
index 5e0b162..c8a0503 100644
--- a/src/include/vortex_write.hpp
+++ b/src/include/vortex_write.hpp
@@ -2,6 +2,6 @@
 
 #include "duckdb/main/extension_util.hpp"
 
-namespace duckdb {
-	void RegisterVortexWriteFunction(duckdb::DatabaseInstance &instance);
-}
\ No newline at end of file
+namespace vortex {
+void RegisterWriteFunction(duckdb::DatabaseInstance &instance);
+}
diff --git a/src/expr/expr.cpp b/src/vortex_expr.cpp
similarity index 77%
rename from src/expr/expr.cpp
rename to src/vortex_expr.cpp
index eeb2917..fae6f17 100644
--- a/src/expr/expr.cpp
+++ b/src/vortex_expr.cpp
@@ -1,21 +1,22 @@
-#include "expr/expr.hpp"
+#include <unordered_map>
+
 #include "duckdb/planner/expression.hpp"
 #include "duckdb/planner/table_filter.hpp"
 #include "duckdb/planner/filter/constant_filter.hpp"
 #include "duckdb/common/exception.hpp"
-
-#include <duckdb/parser/expression/columnref_expression.hpp>
-#include <duckdb/parser/expression/comparison_expression.hpp>
-#include <duckdb/parser/expression/constant_expression.hpp>
-#include <duckdb/planner/expression/bound_between_expression.hpp>
-#include <duckdb/planner/expression/bound_columnref_expression.hpp>
-#include <duckdb/planner/expression/bound_comparison_expression.hpp>
-#include <duckdb/planner/expression/bound_constant_expression.hpp>
-#include <duckdb/planner/expression/bound_function_expression.hpp>
-#include <duckdb/planner/expression/bound_operator_expression.hpp>
-#include <duckdb/planner/filter/conjunction_filter.hpp>
-#include <duckdb/planner/filter/optional_filter.hpp>
-#include <unordered_map>
+#include "duckdb/parser/expression/columnref_expression.hpp"
+#include "duckdb/parser/expression/comparison_expression.hpp"
+#include "duckdb/parser/expression/constant_expression.hpp"
+#include "duckdb/planner/expression/bound_between_expression.hpp"
+#include "duckdb/planner/expression/bound_columnref_expression.hpp"
+#include "duckdb/planner/expression/bound_comparison_expression.hpp"
+#include "duckdb/planner/expression/bound_constant_expression.hpp"
+#include "duckdb/planner/expression/bound_function_expression.hpp"
+#include "duckdb/planner/expression/bound_operator_expression.hpp"
+#include "duckdb/planner/filter/conjunction_filter.hpp"
+#include "duckdb/planner/filter/optional_filter.hpp"
+
+#include "vortex_expr.hpp"
 
 using duckdb::ConjunctionAndFilter;
 using duckdb::ConstantFilter;
@@ -30,6 +31,8 @@
 using duckdb::Value;
 using google::protobuf::Arena;
 using std::string;
 
+namespace vortex {
+
 // vortex expr proto ids.
 const string BETWEEN_ID = "between";
 const string BINARY_ID = "binary";
@@ -59,16 +62,16 @@ enum TimeUnit : uint8_t {
 	D = 4,
 };
 
-vortex::expr::Kind_BinaryOp into_binary_operation(ExpressionType type) {
-	static const std::unordered_map<ExpressionType, vortex::expr::Kind_BinaryOp> op_map = {
-	    {ExpressionType::COMPARE_EQUAL, vortex::expr::Kind_BinaryOp_Eq},
-	    {ExpressionType::COMPARE_NOTEQUAL, vortex::expr::Kind_BinaryOp_NotEq},
-	    {ExpressionType::COMPARE_LESSTHAN, vortex::expr::Kind_BinaryOp_Lt},
-	    {ExpressionType::COMPARE_GREATERTHAN, vortex::expr::Kind_BinaryOp_Gt},
-	    {ExpressionType::COMPARE_LESSTHANOREQUALTO, vortex::expr::Kind_BinaryOp_Lte},
-	    {ExpressionType::COMPARE_GREATERTHANOREQUALTO, vortex::expr::Kind_BinaryOp_Gte},
-	    {ExpressionType::CONJUNCTION_AND, vortex::expr::Kind_BinaryOp_And},
-	    {ExpressionType::CONJUNCTION_OR, vortex::expr::Kind_BinaryOp_Or}};
+expr::Kind_BinaryOp into_binary_operation(ExpressionType type) {
+	static const std::unordered_map<ExpressionType, expr::Kind_BinaryOp> op_map = {
+	    {ExpressionType::COMPARE_EQUAL, expr::Kind_BinaryOp_Eq},
+	    {ExpressionType::COMPARE_NOTEQUAL, expr::Kind_BinaryOp_NotEq},
+	    {ExpressionType::COMPARE_LESSTHAN, expr::Kind_BinaryOp_Lt},
+	    {ExpressionType::COMPARE_GREATERTHAN, expr::Kind_BinaryOp_Gt},
+	    {ExpressionType::COMPARE_LESSTHANOREQUALTO, expr::Kind_BinaryOp_Lte},
+	    {ExpressionType::COMPARE_GREATERTHANOREQUALTO, expr::Kind_BinaryOp_Gte},
+	    {ExpressionType::CONJUNCTION_AND, expr::Kind_BinaryOp_And},
+	    {ExpressionType::CONJUNCTION_OR, expr::Kind_BinaryOp_Or}};
 
 	auto value = op_map.find(type);
 	if (value == op_map.end()) {
@@ -94,8 +97,8 @@ TimeUnit timestamp_to_time_unit(const LogicalType &type) {
 	}
 }
 
-vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_, bool nullable) {
-	auto *dtype = Arena::Create<vortex::dtype::DType>(&arena);
+dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_, bool nullable) {
+	auto *dtype = Arena::Create<dtype::DType>(&arena);
 	switch (type_.id()) {
 	case LogicalTypeId::INVALID:
 	case LogicalTypeId::SQLNULL:
@@ -106,43 +109,43 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		return dtype;
 	case LogicalTypeId::TINYINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::I8);
+		dtype->mutable_primitive()->set_type(dtype::I8);
 		return dtype;
 	case LogicalTypeId::SMALLINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::I16);
+		dtype->mutable_primitive()->set_type(dtype::I16);
 		return dtype;
 	case LogicalTypeId::INTEGER:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::I32);
+		dtype->mutable_primitive()->set_type(dtype::I32);
 		return dtype;
 	case LogicalTypeId::BIGINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::I64);
+		dtype->mutable_primitive()->set_type(dtype::I64);
 		return dtype;
 	case LogicalTypeId::UTINYINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U8);
+		dtype->mutable_primitive()->set_type(dtype::U8);
 		return dtype;
 	case LogicalTypeId::USMALLINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U16);
+		dtype->mutable_primitive()->set_type(dtype::U16);
 		return dtype;
 	case LogicalTypeId::UINTEGER:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U32);
+		dtype->mutable_primitive()->set_type(dtype::U32);
 		return dtype;
 	case LogicalTypeId::UBIGINT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::U64);
+		dtype->mutable_primitive()->set_type(dtype::U64);
 		return dtype;
 	case LogicalTypeId::FLOAT:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::F32);
+		dtype->mutable_primitive()->set_type(dtype::F32);
 		return dtype;
 	case LogicalTypeId::DOUBLE:
 		dtype->mutable_primitive()->set_nullable(nullable);
-		dtype->mutable_primitive()->set_type(vortex::dtype::F64);
+		dtype->mutable_primitive()->set_type(dtype::F64);
 		return dtype;
 	case LogicalTypeId::DECIMAL: {
 		dtype->mutable_decimal()->set_nullable(nullable);
@@ -162,7 +165,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_DATE_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I32);
+		storage->mutable_primitive()->set_type(dtype::I32);
 		dtype->mutable_extension()->set_metadata(std::string({static_cast<char>(TimeUnit::D)}));
 		return dtype;
 	}
@@ -170,7 +173,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_TIME_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I32);
+		storage->mutable_primitive()->set_type(dtype::I32);
 		dtype->mutable_extension()->set_metadata(std::string({static_cast<char>(TimeUnit::Us)}));
 		return dtype;
 	}
@@ -181,7 +184,7 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 		dtype->mutable_extension()->set_id(VORTEX_TIMESTAMP_ID);
 		auto storage = dtype->mutable_extension()->mutable_storage_dtype();
 		storage->mutable_primitive()->set_nullable(nullable);
-		storage->mutable_primitive()->set_type(vortex::dtype::I64);
+		storage->mutable_primitive()->set_type(dtype::I64);
 		auto time_unit = static_cast<char>(timestamp_to_time_unit(type_));
 		// This signifies a timestamp without a timezone
 		// TODO(joe): support timezones
@@ -193,15 +196,15 @@ vortex::dtype::DType *into_vortex_dtype(Arena &arena, const LogicalType &type_,
 	}
 }
 
-vortex::scalar::Scalar *into_null_scalar(Arena &arena, LogicalType &logical_type) {
-	auto scalar = Arena::Create<vortex::scalar::Scalar>(&arena);
+scalar::Scalar *into_null_scalar(Arena &arena, LogicalType &logical_type) {
+	auto scalar = Arena::Create<scalar::Scalar>(&arena);
 	scalar->set_allocated_dtype(into_vortex_dtype(arena, logical_type, true));
 	scalar->mutable_value()->set_null_value(google::protobuf::NULL_VALUE);
 	return scalar;
 }
 
-vortex::scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) {
-	auto scalar = Arena::Create<vortex::scalar::Scalar>(&arena);
+scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, bool nullable) {
+	auto scalar = Arena::Create<scalar::Scalar>(&arena);
 	auto dtype = into_vortex_dtype(arena, value.type(), nullable);
 	scalar->set_allocated_dtype(dtype);
@@ -281,7 +284,7 @@ vortex::scalar::Scalar *into_vortex_scalar(Arena &arena, const Value &value, boo
 	}
 }
 
-void set_column(const string &s, vortex::expr::Expr *column) {
+void set_column(const string &s, expr::Expr *column) {
 	column->set_id(GET_ITEM_ID);
 	auto kind = column->mutable_kind();
 	auto get_item = kind->mutable_get_item();
@@ -292,15 +295,15 @@ void set_column(const string &s, vortex::expr::Expr *column) {
 	id->set_id(IDENTITY_ID);
 }
 
-void set_literal(Arena &arena, const Value &value, bool nullable, vortex::expr::Expr *constant) {
+void set_literal(Arena &arena, const Value &value, bool nullable, expr::Expr *constant) {
 	auto literal = constant->mutable_kind()->mutable_literal();
 	auto dvalue = into_vortex_scalar(arena, value, nullable);
 	literal->set_allocated_value(dvalue);
 	constant->set_id(LITERAL_ID);
 }
 
-vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_ptr<TableFilter>> &child_filters,
-                                          const string &column_name) {
+expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_ptr<TableFilter>> &child_filters,
+                                  const string &column_name) {
 	D_ASSERT(!child_filters.empty());
 
 	if (child_filters.size() == 1) {
@@ -308,14 +311,14 @@ vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_pt
 	}
 
-	auto tail = static_cast<vortex::expr::Expr *>(nullptr);
-	auto hd = Arena::Create<vortex::expr::Expr>(&arena);
+	auto tail = static_cast<expr::Expr *>(nullptr);
+	auto hd = Arena::Create<expr::Expr>(&arena);
 
 	// Flatten the list of children into a linked list of AND values.
 	for (size_t i = 0; i < child_filters.size() - 1; i++) {
-		vortex::expr::Expr *new_and = !tail ? hd : tail->add_children();
+		expr::Expr *new_and = !tail ? hd : tail->add_children();
 		new_and->set_id(BINARY_ID);
-		new_and->mutable_kind()->set_binary_op(vortex::expr::Kind::And);
+		new_and->mutable_kind()->set_binary_op(expr::Kind::And);
 
 		new_and->add_children()->Swap(table_expression_into_expr(arena, *child_filters[i], column_name));
 		tail = new_and;
@@ -324,10 +327,10 @@ vortex::expr::Expr *flatten_table_filters(Arena &arena, duckdb::vector<unique_pt
 	return hd;
 }
 
-vortex::expr::Expr *flatten_exprs(Arena &arena, const duckdb::vector<vortex::expr::Expr *> &child_filters) {
+expr::Expr *flatten_exprs(Arena &arena, const duckdb::vector<expr::Expr *> &child_filters) {
 	if (child_filters.empty()) {
-		auto expr = arena.Create<vortex::expr::Expr>(&arena);
+		auto expr = arena.Create<expr::Expr>(&arena);
 		set_literal(arena, Value(true), true, expr);
 		return expr;
 	}
@@ -337,14 +340,14 @@ vortex::expr::Expr *flatten_exprs(Arena &arena,
 	}
 
-	auto tail = static_cast<vortex::expr::Expr *>(nullptr);
-	auto hd = Arena::Create<vortex::expr::Expr>(&arena);
+	auto tail = static_cast<expr::Expr *>(nullptr);
+	auto hd = Arena::Create<expr::Expr>(&arena);
 
 	// Flatten the list of children into a linked list of AND values.
 	for (size_t i = 0; i < child_filters.size() - 1; i++) {
-		vortex::expr::Expr *new_and = !tail ? hd : tail->add_children();
+		expr::Expr *new_and = !tail ? hd : tail->add_children();
 		new_and->set_id(BINARY_ID);
-		new_and->mutable_kind()->set_binary_op(vortex::expr::Kind::And);
+		new_and->mutable_kind()->set_binary_op(expr::Kind::And);
 
 		new_and->add_children()->Swap(child_filters[i]);
 		tail = new_and;
@@ -366,8 +369,8 @@ std::optional<string> expr_to_like_pattern(const duckdb::Expression &dexpr) {
 	}
 }
 
-vortex::expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expression &dexpr) {
-	auto expr = Arena::Create<vortex::expr::Expr>(&arena);
+expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expression &dexpr) {
+	auto expr = Arena::Create<expr::Expr>(&arena);
 	switch (dexpr.expression_class) {
 	case duckdb::ExpressionClass::BOUND_COLUMN_REF: {
 		auto &dcol_ref = dexpr.Cast<BoundColumnRefExpression>();
@@ -449,8 +452,8 @@ vortex::expr::Expr *expression_into_vortex_expr(Arena &arena, const duckdb::Expr
 	}
 }
 
-vortex::expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const string &column_name) {
-	auto expr = Arena::Create<vortex::expr::Expr>(&arena);
+expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter, const string &column_name) {
+	auto expr = Arena::Create<expr::Expr>(&arena);
 	switch (filter.filter_type) {
 	case TableFilterType::CONSTANT_COMPARISON: {
 		auto &constant_filter = filter.Cast<ConstantFilter>();
@@ -485,3 +488,5 @@ vortex::expr::Expr *table_expression_into_expr(Arena &arena, TableFilter &filter
 	throw Exception(ExceptionType::NOT_IMPLEMENTED, "table_expression_into_expr",
 	                {{"filter_type_id", std::to_string(static_cast<uint8_t>(filter.filter_type))}});
 }
+
+} // namespace vortex
diff --git a/src/vortex_extension.cpp b/src/vortex_extension.cpp
index 121647b..4f702b6 100644
--- a/src/vortex_extension.cpp
+++ b/src/vortex_extension.cpp
@@ -1,18 +1,14 @@
 #define DUCKDB_EXTENSION_MAIN
 
-#define ENABLE_DUCKDB_FFI
-
 #include "duckdb/main/extension_util.hpp"
-#include "vortex_extension.hpp"
 
+#include "vortex_extension.hpp"
 #include "vortex_write.hpp"
 #include "vortex_scan.hpp"
 
-#ifndef DUCKDB_EXTENSION_MAIN
-#error DUCKDB_EXTENSION_MAIN not defined
-#endif
+using namespace duckdb;
 
-namespace duckdb {
+// The entry point class API can't be scoped to the vortex namespace.
 
 /// Called when the extension is loaded by DuckDB.
 /// It is responsible for registering functions and initializing state.
@@ -22,8 +18,8 @@
 void VortexExtension::Load(DuckDB &db) {
 	DatabaseInstance &instance = *db.instance;
 
-	RegisterVortexWriteFunction(instance);
-	RegisterVortexScanFunction(instance);
+	vortex::RegisterWriteFunction(instance);
+	vortex::RegisterScanFunction(instance);
 }
 
 /// Returns the name of the Vortex extension.
@@ -43,12 +39,10 @@
 std::string VortexExtension::Version() const {
 	return "0.1.0";
 }
 
-} // namespace duckdb
-
 extern "C" {
 
 DUCKDB_EXTENSION_API void vortex_init(duckdb::DatabaseInstance &db) {
 	duckdb::DuckDB db_wrapper(db);
-	db_wrapper.LoadExtension<duckdb::VortexExtension>();
+	db_wrapper.LoadExtension<VortexExtension>();
 }
 
 DUCKDB_EXTENSION_API const char *vortex_version() {
diff --git a/src/vortex_scan.cpp b/src/vortex_scan.cpp
index be6fbac..8a79169 100644
--- a/src/vortex_scan.cpp
+++ b/src/vortex_scan.cpp
@@ -1,4 +1,10 @@
-#include "vortex_scan.hpp"
+#define ENABLE_DUCKDB_FFI
+
+#include <atomic>
+#include <optional>
+#include <regex>
+#include <thread>
+#include <vector>
 
 #include "duckdb/common/exception.hpp"
 #include "duckdb/common/helper.hpp"
@@ -6,143 +12,154 @@
 #include "duckdb/function/table_function.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include "duckdb/common/file_system.hpp"
-#include "vortex_extension.hpp"
-#include <regex>
 
+#include "concurrentqueue.h"
+#include "vortex.hpp"
+#include "vortex_extension.hpp"
+#include "vortex_layout_reader.hpp"
+#include "vortex_scan.hpp"
 #include "vortex_common.hpp"
-#include "expr/expr.hpp"
-
-namespace duckdb {
+#include "vortex_expr.hpp"
 
-// A value large enough that most systems can use all their threads.
-// This is used to allocate `file_slots`, we could remove this later by having the init_local method increase the
-// file slots size for each running.
-constexpr uint32_t MAX_THREAD_COUNT = 192;
+using namespace duckdb;
 
-// This is a multiple of the 2048 duckdb vector size, it needs tuning
-// This has a few factor effecting it:
-// 1. A smaller value means for work for the vortex file reader.
-// 2. A larger value reduces the parallelism available to the scanner
-constexpr uint32_t ROW_SPLIT_COUNT = 2048 * 32;
+namespace vortex {
 
 /// Bind data for the Vortex table function that holds information about the
 /// file and its schema. This data is populated during the bind phase, which
 /// happens during the query planning phase.
-struct VortexBindData : public TableFunctionData {
+struct BindData : public TableFunctionData {
+	shared_ptr<MultiFileList> file_list;
 	vector<LogicalType> columns_types;
 	vector<string> column_names;
-	uint64_t num_columns;
 
-	unique_ptr<VortexFileReader> initial_file;
-	shared_ptr<MultiFileList> file_list;
+	// Used to read the schema during the bind phase and cached here to
+	// avoid having to open the same file again during the scan phase.
+	unique_ptr<FileReader> initial_file;
 
 	// Used to create an arena for protobuf exprs, need a ptr since the bind arg is const.
 	unique_ptr<google::protobuf::Arena> arena;
-	vector<vortex::expr::Expr *> conjuncts;
+	vector<expr::Expr *> conjuncts;
 
 	bool Equals(const FunctionData &other_p) const override {
-		auto &other = other_p.Cast<VortexBindData>();
+		auto &other = other_p.Cast<BindData>();
 		return file_list == other.file_list && column_names == other.column_names &&
-		       columns_types == other.columns_types && num_columns == other.num_columns;
+		       columns_types == other.columns_types;
 	}
 
 	unique_ptr<FunctionData> Copy() const override {
-		auto result = make_uniq<VortexBindData>();
+		auto result = make_uniq<BindData>();
 		result->file_list = file_list;
 		result->columns_types = columns_types;
 		result->column_names = column_names;
-		result->num_columns = num_columns;
 		return std::move(result);
 	}
 };
 
+struct ScanPartition {
+	uint64_t file_idx;
+	uint64_t start_row;
+	uint64_t end_row;
+};
+
 /// Local state for the Vortex table function that tracks the progress of a scan
 /// operation. In DuckDB's execution model, a query reading from a file can be
 /// parallelized by dividing it into ranges, each handled by a different scan.
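+/// Here the ranges are row-range partitions (see `ScanPartition` above): a thread first
+/// drains its thread-local partition queue and then falls back to the shared lock-free
+/// queue in `ScanGlobalState`.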
-struct VortexScanLocalState : public LocalTableFunctionState {
-	idx_t current_row;
-	bool finished;
-	unique_ptr<VortexArray> array;
-	unique_ptr<VortexConversionCache> cache;
-	uint32_t thread_id;
-
-	explicit VortexScanLocalState(uint32_t thread_id)
-	    : current_row(0), finished(false), array(nullptr), cache(nullptr), thread_id(thread_id) {
-	}
-};
-
-struct FileSlot {
-	std::mutex slot_lock;
-	unique_ptr<VortexArrayStream> array_stream;
+struct ScanLocalState : public LocalTableFunctionState {
+	idx_t array_row_offset;
+	unique_ptr<Array> scanned_array;
+	unique_ptr<ArrayIterator> array_iterator;
+	unique_ptr<ConversionCache> conversion_cache;
+
+	// The vector is used as a contiguous memory queue implementation,
+	// with the partition index pointing to the next partition to process.
+	idx_t partition_idx;
+	std::vector<ScanPartition> scan_partitions;
+
+	// Thread-local file index.
+	std::optional<uint64_t> thread_local_file_idx;
 };
 
-struct VortexScanGlobalState : public GlobalTableFunctionState {
-	// Must be <= MAX_THREAD_COUNT.
-	std::atomic_uint32_t thread_id_counter;
+struct ScanGlobalState : public GlobalTableFunctionState {
 	std::atomic_bool finished;
+	std::atomic_uint64_t cache_id;
 
-	std::uint64_t cache_id;
-
-	// Each thread owns a file slot and is the thing only one allowed to modify the slot itself.
-	// Other threads can work-steal array batches from the slot, by taking out the mutex in the FileSlot.
-	// We allocate MAX_THREAD_COUNT threads, the max number threads allowed by this extension.
-	std::array<FileSlot, MAX_THREAD_COUNT> file_slots;
-
-	std::atomic_uint32_t next_file;
 	vector<string> expanded_files;
 	optional_ptr<TableFilterSet> filter;
-	// The precomputed filter string used in the query
 	std::string filter_str;
-	// The precomputed column names used in the query
-	std::vector<const char *> projected_column_names;
+
+	// Used to indicate progress in `table_scan_progress`.
+	std::atomic_uint32_t partitions_processed;
+	std::atomic_uint32_t partitions_total;
+
+	// Number of files which are fully partitioned.
+	std::atomic_uint32_t files_partitioned;
+
+	// Next file to partition.
+	std::atomic_uint32_t next_file_idx;
+
+	// Multi-producer, multi-consumer lock-free queue.
+	duckdb_moodycamel::ConcurrentQueue<ScanPartition> scan_partitions {8192};
+
+	std::vector<std::shared_ptr<LayoutReader>> layout_readers;
 
 	// The column idx that must be returned by the scan.
 	vector<column_t> column_ids;
 	vector<column_t> projection_ids;
+	// The precomputed column names used in the query.
+	std::vector<const char *> projected_column_names;
 
 	// This is the max number threads that the extension might use.
 	idx_t MaxThreads() const override {
+		constexpr uint32_t MAX_THREAD_COUNT = 192;
 		return MAX_THREAD_COUNT;
 	}
-
-	explicit VortexScanGlobalState()
-	    : thread_id_counter(0), finished(false), cache_id(0), file_slots(), next_file(0), filter(nullptr) {
-	}
 };
 
 // Use to create vortex expressions from `TableFilterSet` filter.
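+// For example, a constant-comparison filter on column `a` becomes a Vortex `binary`
+// expression comparing `a` with a literal scalar; all conjuncts are later ANDed
+// together by `flatten_exprs` and serialized into `filter_str`.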
void CreateFilterExpression(google::protobuf::Arena &arena, vector<string> column_names,
                            optional_ptr<TableFilterSet> filter, vector<column_t> column_ids,
-                           vector<vortex::expr::Expr *> &conjuncts) {
+                           vector<expr::Expr *> &conjuncts) {
 	if (filter == nullptr) {
 		return;
 	}
 
 	for (const auto &[col_id, value] : filter->filters) {
-		auto col_name = column_names[column_ids[col_id]];
-		auto conj = table_expression_into_expr(arena, *value, col_name);
+		auto column_name = column_names[column_ids[col_id]];
+		auto conj = table_expression_into_expr(arena, *value, column_name);
 		conjuncts.push_back(conj);
 	}
 }
 
+static void PopulateProjection(ScanGlobalState &global_state, const vector<string> &column_names,
+                               TableFunctionInitInput &input) {
+	global_state.projection_ids.reserve(input.projection_ids.size());
+	for (auto proj_id : input.projection_ids) {
+		global_state.projection_ids.push_back(input.column_ids[proj_id]);
+	}
+
+	global_state.projected_column_names.reserve(input.projection_ids.size());
+	for (auto column_id : global_state.projection_ids) {
+		assert(column_id < column_names.size());
+		global_state.projected_column_names.push_back(column_names[column_id].c_str());
+	}
+}
+
 /// Extracts schema information from a Vortex file's data type.
-static void ExtractVortexSchema(const vx_dtype *file_dtype, vector<LogicalType> &column_types,
-                                vector<string> &column_names) {
-	uint32_t field_count = vx_dtype_field_count(file_dtype);
+static void ExtractVortexSchema(DType &file_dtype, vector<LogicalType> &column_types, vector<string> &column_names) {
+	uint32_t field_count = vx_dtype_field_count(file_dtype.dtype);
 
 	for (uint32_t idx = 0; idx < field_count; idx++) {
 		char name_buffer[512];
 		int name_len = 0;
-		vx_dtype_field_name(file_dtype, idx, name_buffer, &name_len);
+		vx_dtype_field_name(file_dtype.dtype, idx, name_buffer, &name_len);
 		std::string field_name(name_buffer, name_len);
 
-		vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype, idx);
-		vx_error *error = nullptr;
-		auto duckdb_type = vx_dtype_to_duckdb_logical_type(field_dtype, &error);
-		HandleError(error);
+		vx_dtype *field_dtype = vx_dtype_field_dtype(file_dtype.dtype, idx);
+		auto duckdb_type = Try([&](auto err) { return vx_dtype_to_duckdb_logical_type(field_dtype, err); });
 
 		column_names.push_back(field_name);
 		column_types.push_back(LogicalType(*reinterpret_cast<LogicalType *>(duckdb_type)));
@@ -151,10 +168,9 @@
 	}
 }
 
-const std::regex schema_prefix = std::regex("^[^/]*:\\/\\/.*$");
-
 std::string EnsureFileProtocol(FileSystem &fs, const std::string &path) {
 	// If the path is a URL then don't change it, otherwise try to make the path an absolute path
+	static const std::regex schema_prefix = std::regex("^[^/]*:\\/\\/.*$");
 	if (std::regex_match(path, schema_prefix)) {
 		return path;
 	}
@@ -168,19 +184,19 @@
 	return prefix + absolute_path;
 }
 
-static unique_ptr<VortexFileReader> OpenFile(const std::string &filename, vector<LogicalType> &column_types,
-                                             vector<string> &column_names) {
+static unique_ptr<FileReader> OpenFile(const std::string &filename, vector<LogicalType> &column_types,
+                                       vector<string> &column_names) {
 	vx_file_open_options options {
 	    .uri = filename.c_str(), .property_keys = nullptr, .property_vals = nullptr, .property_len = 0};
 
-	auto file = VortexFileReader::Open(&options);
+	auto file = FileReader::Open(&options);
 	if (!file) {
 		throw IOException("Failed to open Vortex file: " + filename);
 	}
 
-	// This Ptr is owned by the file
-	const vx_dtype *file_dtype = vx_file_dtype(file->file);
-	if (vx_dtype_get(file_dtype) != DTYPE_STRUCT) {
+	// This pointer is owned by the file.
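+	// A Vortex file is only scannable by this extension if its top-level dtype is a
+	// struct; each struct field is mapped onto a DuckDB table column below.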
+	auto file_dtype = file->DType();
+	if (vx_dtype_get(file_dtype.dtype) != DTYPE_STRUCT) {
 		vx_file_reader_free(file->file);
 		throw FatalException("Vortex file does not contain a struct array as a top-level dtype");
 	}
@@ -190,107 +206,197 @@ static unique_ptr<VortexFileReader> OpenFile(const std::string &filename, vector
 	return file;
 }
 
-static void VerifyNewFile(const VortexBindData &bind_data, vector<LogicalType> &column_types,
-                          vector<string> &column_names) {
+// Verifies that a new Vortex file's schema matches the expected schema from the bind phase.
+//
+// This function ensures schema consistency across all the files in a multi-file query.
+// It compares the column types and names extracted from a new file against the schema
+// obtained from the first file (stored in bind_data).
+static void VerifyNewFile(const BindData &bind_data, vector<LogicalType> &column_types, vector<string> &column_names) {
 	if (column_types.size() != bind_data.columns_types.size() || column_names != bind_data.column_names) {
 		throw FatalException("Vortex file does not contain the same number of columns as the first");
 	}
-	for (auto idx = 0u; idx < bind_data.columns_types.size(); idx++) {
-		auto col_name = bind_data.column_names[idx];
-		auto col_type = bind_data.columns_types[idx];
-		if (col_name != column_names[idx]) {
+
+	for (size_t idx = 0; idx < bind_data.columns_types.size(); ++idx) {
+		if (bind_data.column_names[idx] != column_names[idx]) {
 			throw FatalException("Vortex file contains a column with a different name to the first");
 		}
-		if (col_type != column_types[idx]) {
+		if (bind_data.columns_types[idx] != column_types[idx]) {
 			throw FatalException("Vortex file contains a column with a different type to the first");
 		}
 	}
 }
 
-static unique_ptr<VortexFileReader> OpenFileAndVerify(FileSystem &fs, const std::string &filename,
-                                                      const VortexBindData &bind_data) {
+static unique_ptr<FileReader> OpenFileAndVerify(FileSystem &fs, const std::string &filename,
+                                                const BindData &bind_data) {
 	auto new_column_names = vector<string>();
 	new_column_names.reserve(bind_data.column_names.size());
+
 	auto new_column_types = vector<LogicalType>();
-	new_column_names.reserve(bind_data.columns_types.size());
+	new_column_types.reserve(bind_data.columns_types.size());
 
 	auto file = OpenFile(EnsureFileProtocol(fs, filename), new_column_types, new_column_names);
 	VerifyNewFile(bind_data, new_column_types, new_column_names);
 	return file;
 }
 
-static unique_ptr<VortexArrayStream> OpenArrayStream(const VortexBindData &bind_data,
-                                                     VortexScanGlobalState &global_state, VortexFileReader *file) {
-	auto options = vx_file_scan_options {
+static bool PinFileToThread(ScanGlobalState &global_state) {
+	const auto thread_count = std::thread::hardware_concurrency();
+	const auto file_count = global_state.expanded_files.size();
+	return (file_count - global_state.files_partitioned) > thread_count;
+}
+
+static void CreateScanPartitions(ClientContext &context, const BindData &bind, ScanGlobalState &global_state,
+                                 ScanLocalState &local_state, uint64_t file_idx, unique_ptr<FileReader> &file_reader) {
+	const auto file_name = global_state.expanded_files[file_idx];
+	const auto row_count = Try([&](auto err) { return vx_file_row_count(file_reader->file, err); });
+
+	const auto thread_count = std::thread::hardware_concurrency();
+	const auto file_count = global_state.expanded_files.size();
+
+	// This is a multiple of the 2048 DuckDB vector size:
+	//
+	// Factors to consider:
+	// 1. A smaller value means more work for the Vortex file reader.
+	// 2. A larger value reduces the parallelism available to the scanner.
+	const uint64_t partition_size = 2048 * (thread_count > file_count ? 32 : 64);
+
+	const auto partition_count = std::max(static_cast<uint64_t>(1), row_count / partition_size);
+	global_state.partitions_total += partition_count;
+
+	if (PinFileToThread(global_state)) {
+		local_state.partition_idx = 0;
+		local_state.scan_partitions.clear();
+		local_state.thread_local_file_idx = file_idx;
+
+		for (size_t partition_idx = 0; partition_idx < partition_count; ++partition_idx) {
+			local_state.scan_partitions.push_back(ScanPartition {
+			    .file_idx = file_idx,
+			    .start_row = partition_idx * partition_size,
+			    .end_row = (partition_idx + 1) == partition_count ? row_count : (partition_idx + 1) * partition_size,
+			});
+		}
+	} else {
+		for (size_t partition_idx = 0; partition_idx < partition_count; ++partition_idx) {
+			global_state.scan_partitions.enqueue(ScanPartition {
+			    .file_idx = file_idx,
+			    .start_row = partition_idx * partition_size,
+			    .end_row = (partition_idx + 1) == partition_count ? row_count : (partition_idx + 1) * partition_size,
+			});
+		}
+	}
+
+	global_state.files_partitioned += 1;
+}
+
+static unique_ptr<ArrayIterator> OpenArrayIter(ScanGlobalState &global_state,
+                                               std::shared_ptr<LayoutReader> &layout_reader,
+                                               ScanPartition row_range_partition) {
+	const auto options = vx_file_scan_options {
 	    .projection = global_state.projected_column_names.data(),
-	    .projection_len = static_cast<uint32_t>(global_state.projected_column_names.size()),
+	    .projection_len = static_cast<uintptr_t>(global_state.projected_column_names.size()),
 	    .filter_expression = global_state.filter_str.data(),
-	    .filter_expression_len = static_cast<uint32_t>(global_state.filter_str.length()),
-	    .split_by_row_count = ROW_SPLIT_COUNT,
+	    .filter_expression_len = static_cast<uintptr_t>(global_state.filter_str.length()),
+	    .split_by_row_count = 0,
+	    .row_range_start = row_range_partition.start_row,
+	    .row_range_end = row_range_partition.end_row,
 	};
 
-	vx_error *error = nullptr;
-	auto scan = vx_file_scan(file->file, &options, &error);
-	HandleError(error);
-
-	return make_uniq<VortexArrayStream>(scan);
+	return make_uniq<ArrayIterator>(layout_reader->Scan(&options));
 }
 
-static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
-	auto &bind_data = data.bind_data->Cast<VortexBindData>();               // NOLINT
-	auto &global_state = data.global_state->Cast<VortexScanGlobalState>();  // NOLINT
-	auto &local_state = data.local_state->Cast<VortexScanLocalState>();     // NOLINT
-
-	if (local_state.array == nullptr) {
-		auto &slot = global_state.file_slots[local_state.thread_id];
-		std::lock_guard<std::mutex> _l(slot.slot_lock);
+// Assigns the next array from the array iterator.
+//
+// Returns true if a new array was assigned, false otherwise.
+static bool GetNextArray(ClientContext &context, const BindData &bind_data, ScanGlobalState &global_state,
                         ScanLocalState &local_state, DataChunk &output) {
 
-		if (global_state.finished.load() && slot.array_stream == nullptr) {
-			output.SetCardinality(0);
-			return;
+	// Try to dequeue a partition off the thread-local queue.
+	auto try_dequeue = [&](ScanPartition &scan_partition) {
+		if (local_state.scan_partitions.size() == local_state.partition_idx) {
+			return false;
 		}
+		scan_partition = local_state.scan_partitions[local_state.partition_idx++];
+		return true;
+	};
 
-		// 1. check we can make progress on current owned file
-		// 2. check we can get another file
-		// todo: 3. check if we can work steal from another thread
-		// 4. we are done
+	if (local_state.array_iterator == nullptr) {
+		ScanPartition partition;
 
-		while (local_state.array == nullptr) {
-			if (slot.array_stream == nullptr) {
-				auto file_idx = global_state.next_file.fetch_add(1);
+		if (bool success = (try_dequeue(partition) || global_state.scan_partitions.try_dequeue(partition)); !success) {
 
-				if (file_idx >= global_state.expanded_files.size()) {
-					local_state.finished = true;
+			// Check if all files have been partitioned.
+			if (global_state.files_partitioned == global_state.expanded_files.size()) {
+
+				// A new partition might have been created after the first pop. Therefore,
+				// one more pop is necessary to ensure no more partitions are left to process.
+				if (success = global_state.scan_partitions.try_dequeue(partition); !success) {
 					global_state.finished = true;
-					output.Reset();
-					return;
+					return false;
 				}
+			}
 
-				auto file_name = global_state.expanded_files[file_idx];
-				auto file = OpenFileAndVerify(FileSystem::GetFileSystem(context), file_name, bind_data);
+			if (!success) {
+				return false;
+			}
+		}
+
+		auto layout_reader = global_state.layout_readers[partition.file_idx];
+		local_state.array_iterator = OpenArrayIter(global_state, layout_reader, partition);
+	}
 
-				slot.array_stream = OpenArrayStream(bind_data, global_state, file.get());
+	local_state.scanned_array = local_state.array_iterator->NextArray();
+	local_state.array_row_offset = 0;
+
+	if (local_state.scanned_array == nullptr) {
+		local_state.array_iterator = nullptr;
+		global_state.partitions_processed += 1;
+
+		return false;
+	}
+
+	return true;
+}
+
+static void VortexScanFunction(ClientContext &context, TableFunctionInput &data, DataChunk &output) {
+	auto &bind_data = data.bind_data->Cast<BindData>();
+	auto &global_state = data.global_state->Cast<ScanGlobalState>();
+	auto &local_state = data.local_state->Cast<ScanLocalState>();
+
+	if (local_state.scanned_array == nullptr) {
+		while (!GetNextArray(context, bind_data, global_state, local_state, output)) {
+			if (global_state.finished) {
+				output.Reset();
+				output.SetCardinality(0);
+				return;
 			}
 
-			local_state.array = slot.array_stream->NextArray();
-			if (local_state.array == nullptr) {
-				slot.array_stream = nullptr;
+			// Free layout readers as long as we pin files to threads.
+			if (local_state.thread_local_file_idx.has_value() && PinFileToThread(global_state)) {
+				global_state.layout_readers[local_state.thread_local_file_idx.value()] = nullptr;
+				local_state.thread_local_file_idx.reset();
+			}
+
+			// Create new scan partitions in case the queue is empty.
+			if (auto file_idx = global_state.next_file_idx.fetch_add(1);
+			    file_idx < global_state.expanded_files.size()) {
+				auto file_name = global_state.expanded_files[file_idx];
+				auto vortex_file = OpenFileAndVerify(FileSystem::GetFileSystem(context), file_name, bind_data);
+				global_state.layout_readers[file_idx] = LayoutReader::CreateFromFile(vortex_file.get());
+				CreateScanPartitions(context, bind_data, global_state, local_state, file_idx, vortex_file);
			}
 		}
-		local_state.current_row = 0;
 	}
 
-	if (local_state.cache == nullptr) {
-		// Create a unique value so each cache can be differentiated.
-		local_state.cache = make_uniq<VortexConversionCache>(global_state.cache_id++);
+	if (local_state.conversion_cache == nullptr) {
+		local_state.conversion_cache = make_uniq<ConversionCache>(global_state.cache_id++);
 	}
 
-	local_state.current_row = local_state.array->ToDuckDBVector(
-	    local_state.current_row, reinterpret_cast<duckdb_data_chunk>(&output), local_state.cache.get());
+	local_state.array_row_offset = local_state.scanned_array->ToDuckDBVector(
	    local_state.array_row_offset, reinterpret_cast<duckdb_data_chunk>(&output), local_state.conversion_cache.get());
 
-	if (local_state.current_row == 0) {
-		local_state.array = nullptr;
-		local_state.cache = nullptr;
+	if (local_state.array_row_offset == 0) {
+		local_state.scanned_array = nullptr;
+		local_state.conversion_cache = nullptr;
 	}
 }
 
@@ -300,16 +406,14 @@ static void VortexScanFunction(ClientContext &context, TableFunctionInput &data,
 /// like projection pushdown and predicate pushdown.
 static unique_ptr<FunctionData> VortexBind(ClientContext &context, TableFunctionBindInput &input,
                                            vector<LogicalType> &column_types, vector<string> &column_names) {
-	auto result = make_uniq<VortexBindData>();
-
+	auto result = make_uniq<BindData>();
 	result->arena = make_uniq<google::protobuf::Arena>();
 
-	// Get the filename glob from the input.
-	auto vec = duckdb::vector<std::string> {input.inputs[0].GetValue<std::string>()};
-	result->file_list = make_shared_ptr<GlobMultiFileList>(context, vec, FileGlobOptions::DISALLOW_EMPTY);
+	auto file_glob = duckdb::vector<std::string> {input.inputs[0].GetValue<std::string>()};
+	result->file_list = make_shared_ptr<GlobMultiFileList>(context, file_glob, FileGlobOptions::DISALLOW_EMPTY);
 
+	// Open the first file to extract the schema.
 	auto filename = EnsureFileProtocol(FileSystem::GetFileSystem(context), result->file_list->GetFirstFile());
-
 	result->initial_file = OpenFile(filename, column_types, column_names);
 
 	result->column_names = column_names;
@@ -319,93 +423,73 @@
 }
 
 unique_ptr<NodeStatistics> VortexCardinality(ClientContext &context, const FunctionData *bind_data) {
-	auto &data = bind_data->Cast<VortexBindData>();
+	auto &data = bind_data->Cast<BindData>();
 
-	return make_uniq<NodeStatistics>(data.num_columns, data.num_columns);
+	auto row_count = data.initial_file->FileRowCount();
+	if (data.file_list->GetTotalFileCount() == 1) {
+		return make_uniq<NodeStatistics>(row_count, row_count);
+	} else {
+		return make_uniq<NodeStatistics>(row_count * data.file_list->GetTotalFileCount());
+	}
 }
 
 // Removes all filter expressions (from `filters`) which can be pushed down.
void PushdownComplexFilter(ClientContext &context, LogicalGet &get, FunctionData *bind_data,
                           vector<unique_ptr<Expression>> &filters) {
-	auto &bind = bind_data->Cast<VortexBindData>();
-
 	if (filters.empty()) {
 		return;
 	}
 
+	auto &bind = bind_data->Cast<BindData>();
 	bind.conjuncts.reserve(filters.size());
 	for (auto &filter : filters) {
-		auto expr = expression_into_vortex_expr(*bind.arena, *filter);
-		if (expr != nullptr) {
+		if (auto expr = expression_into_vortex_expr(*bind.arena, *filter); expr != nullptr) {
 			bind.conjuncts.push_back(expr);
 		}
 	}
}
 
-void RegisterVortexScanFunction(DatabaseInstance &instance) {
+void RegisterScanFunction(DatabaseInstance &instance) {
 	TableFunction vortex_scan("read_vortex", {LogicalType::VARCHAR}, VortexScanFunction, VortexBind);
 
 	vortex_scan.init_global = [](ClientContext &context,
 	                             TableFunctionInitInput &input) -> unique_ptr<GlobalTableFunctionState> {
-		auto &bind = input.bind_data->Cast<VortexBindData>();
-		auto state = make_uniq<VortexScanGlobalState>();
-
-		state->filter = input.filters;
-		state->projection_ids.reserve(input.projection_ids.size());
-		for (auto proj_id : input.projection_ids) {
-			state->projection_ids.push_back(input.column_ids[proj_id]);
-		}
-
-		state->column_ids = input.column_ids;
+		auto &bind = input.bind_data->CastNoConst<BindData>();
+		auto global_state = make_uniq<ScanGlobalState>();
 
 		// TODO(joe): do this expansion gradually in the scan to avoid a slower start.
-		state->expanded_files = bind.file_list->GetAllFiles();
+		global_state->expanded_files = bind.file_list->GetAllFiles();
+		global_state->filter = input.filters;
+		global_state->column_ids = input.column_ids;
 
-		// Most expressions are extracted from `PushdownComplexFilter`, the final filters come from `input.filters`.
-		vector<vortex::expr::Expr *> conjuncts;
-		std::copy(bind.conjuncts.begin(), bind.conjuncts.end(), std::back_inserter(conjuncts));
-		CreateFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, conjuncts);
-
-		auto column_names = std::vector<const char *>();
-		for (auto col_id : state->projection_ids) {
-			assert(col_id < bind.column_names.size());
-			column_names.push_back(bind.column_names[col_id].c_str());
-		}
-		state->projected_column_names = column_names;
+		PopulateProjection(*global_state, bind.column_names, input);
 
-		auto exprs = flatten_exprs(*bind.arena, bind.conjuncts);
-		if (exprs != nullptr) {
-			state->filter_str = exprs->SerializeAsString();
+		// Most expressions are extracted from `PushdownComplexFilter`, the final filters come from `input.filters`.
+		CreateFilterExpression(*bind.arena, bind.column_names, input.filters, input.column_ids, bind.conjuncts);
+		if (auto exprs = flatten_exprs(*bind.arena, bind.conjuncts); exprs != nullptr) {
+			global_state->filter_str = exprs->SerializeAsString();
 		}
 
-		// Can ignore mutex since no other threads are running now.
-		state->file_slots[0].array_stream = OpenArrayStream(bind, *state, bind.initial_file.get());
-		state->next_file = 1;
+		// Resizing the empty vector default-constructs a nullptr std::shared_ptr at every index.
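+		// The matching reader is then created lazily, the first time a thread claims the
+		// corresponding file via the `next_file_idx` fetch_add in `VortexScanFunction`.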
+		global_state->layout_readers.resize(global_state->expanded_files.size());
 
-		// We are finished with the arena
 		bind.arena->Reset();
-
-		return std::move(state);
+		return std::move(global_state);
 	};
 
 	vortex_scan.init_local = [](ExecutionContext &context, TableFunctionInitInput &input,
 	                            GlobalTableFunctionState *global_state) -> unique_ptr<LocalTableFunctionState> {
-		auto &v_global_state = global_state->Cast<VortexScanGlobalState>();
-
-		auto thread_id = v_global_state.thread_id_counter.fetch_add(1);
-		assert(thread_id < MAX_THREAD_COUNT);
-
-		auto state = make_uniq<VortexScanLocalState>(thread_id);
-		return state;
+		auto local_state = make_uniq<ScanLocalState>();
+		local_state->scan_partitions.reserve(4096);
+		return local_state;
 	};
 
 	vortex_scan.table_scan_progress = [](ClientContext &context, const FunctionData *bind_data,
-	                                     const GlobalTableFunctionState *global_state) {
-		auto &gstate = global_state->Cast<VortexScanGlobalState>();
-
-		return 100 * (static_cast<double>(gstate.next_file.load()) / static_cast<double>(gstate.expanded_files.size()));
+	                                     const GlobalTableFunctionState *global_state) -> double {
+		auto &gstate = global_state->Cast<ScanGlobalState>();
+		return 100.0 * (static_cast<double>(gstate.partitions_processed) / static_cast<double>(gstate.partitions_total));
 	};
 
 	vortex_scan.pushdown_complex_filter = PushdownComplexFilter;
@@ -417,4 +501,4 @@
 	ExtensionUtil::RegisterFunction(instance, vortex_scan);
 }
 
-} // namespace duckdb
+} // namespace vortex
diff --git a/src/vortex_write.cpp b/src/vortex_write.cpp
index e43358e..4a609e3 100644
--- a/src/vortex_write.cpp
+++ b/src/vortex_write.cpp
@@ -1,18 +1,20 @@
-#define ENABLE_DUCKDB_FFI
-
-#include "vortex_write.hpp"
-#include "vortex_common.hpp"
 #include "duckdb/catalog/catalog_entry/table_catalog_entry.hpp"
-
 #include "duckdb/common/exception.hpp"
 #include "duckdb/common/multi_file_reader.hpp"
 #include "duckdb/main/extension_util.hpp"
 #include "duckdb/function/copy_function.hpp"
 #include "duckdb/parser/constraints/not_null_constraint.hpp"
 
-namespace duckdb {
+#include "vortex_write.hpp"
+#include "vortex_common.hpp"
+
+// TODO(joe): enable multi-threaded writes, see `WriteSink`.
+
+using namespace duckdb;
+
+namespace vortex {
 
-struct VortexWriteBindData : public TableFunctionData {
+struct WriteBindData : public TableFunctionData {
 	//! True is the column is nullable
 	vector<uint8_t> column_nullable;
@@ -20,29 +22,23 @@ struct WriteBindData : public TableFunctionData {
 	vector<string> column_names;
 };
 
-struct VortexWriteGlobalData : public GlobalFunctionData {
-	std::string file_name;
-	std::unique_ptr<VortexFileReader> file;
-	unique_ptr<VortexArray> array;
+struct WriteGlobalData : public GlobalFunctionData {
+	unique_ptr<ArrayStreamSink> sink;
 };
 
-struct VortexWriteLocalData : public LocalFunctionData {};
+struct WriteLocalData : public LocalFunctionData {};
 
-void VortexWriteSink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
-                     LocalFunctionData &lstate, DataChunk &input) {
-	auto &global_state = gstate.Cast<VortexWriteGlobalData>();
-	auto bind = bind_data.Cast<VortexWriteBindData>();
-
-	auto chunk = DataChunk();
-	chunk.Initialize(Allocator::Get(context.client), bind.sql_types);
+void WriteSink(ExecutionContext &context, FunctionData &bind_data, GlobalFunctionData &gstate,
+               LocalFunctionData &lstate, DataChunk &input) {
+	auto &global_state = gstate.Cast<WriteGlobalData>();
+	auto &bind = bind_data.Cast<WriteBindData>();
 
 	for (auto i = 0u; i < input.ColumnCount(); i++) {
 		input.data[i].Flatten(input.size());
 	}
-
-	auto new_array = vx_array_append_duckdb_chunk(
-	    global_state.array->array, reinterpret_cast<duckdb_data_chunk>(&input), bind.column_nullable.data());
-	global_state.array = make_uniq<VortexArray>(new_array);
+	// TODO(joe): go to a model of combining local chunks into arrays of a specific size
+	// before pushing each of these larger chunks into the global_state.
+	global_state.sink->PushChunk(input);
 }
 
 std::vector<uint8_t> TableNullability(ClientContext &context, const string &catalog_name, const string &schema,
@@ -71,11 +67,11 @@ std::vector<uint8_t> TableNullability(ClientContext &context, const string &cata
 	return vec;
 }
 
-void RegisterVortexWriteFunction(DatabaseInstance &instance) {
+void RegisterWriteFunction(DatabaseInstance &instance) {
 	CopyFunction function("vortex");
 	function.copy_to_bind = [](ClientContext &context, CopyFunctionBindInput &input, const vector<string> &names,
 	                           const vector<LogicalType> &sql_types) -> unique_ptr<FunctionData> {
-		auto result = make_uniq<VortexWriteBindData>();
+		auto result = make_uniq<WriteBindData>();
 
 		auto not_null = TableNullability(context, input.info.catalog, input.info.schema, input.info.table);
@@ -90,9 +86,8 @@
 	};
 
 	function.copy_to_initialize_global = [](ClientContext &context, FunctionData &bind_data,
 	                                        const string &file_path) -> unique_ptr<GlobalFunctionData> {
-		auto &bind = bind_data.Cast<VortexWriteBindData>();
-		auto gstate = make_uniq<VortexWriteGlobalData>();
-		gstate->file_name = file_path;
+		auto &bind = bind_data.Cast<WriteBindData>();
+		auto gstate = make_uniq<WriteGlobalData>();
 
 		auto column_names = std::vector<const char *>();
 		for (const auto &col_id : bind.column_names) {
@@ -104,24 +99,18 @@
 			column_types.push_back(reinterpret_cast<duckdb_logical_type>(&col_type));
 		}
 
-		vx_error *error = nullptr;
-		auto array = vx_array_create_empty_from_duckdb_table(column_types.data(), bind.column_nullable.data(),
-		                                                     column_names.data(), column_names.size(), &error);
-		HandleError(error);
-
-		gstate->array = make_uniq<VortexArray>(array);
+		auto dtype = DType::FromDuckDBTable(column_types, bind.column_nullable, column_names);
+		gstate->sink = ArrayStreamSink::Create(file_path, std::move(dtype));
 		return std::move(gstate);
 	};
 
 	function.copy_to_initialize_local = [](ExecutionContext &context,
 	                                       FunctionData &bind_data) -> unique_ptr<LocalFunctionData> {
-		return std::move(make_uniq<VortexWriteLocalData>());
+		return std::move(make_uniq<WriteLocalData>());
 	};
-	function.copy_to_sink = VortexWriteSink;
+	function.copy_to_sink = WriteSink;
 	function.copy_to_finalize = [](ClientContext &context, FunctionData &bind_data, GlobalFunctionData &gstate) {
-		auto &global_state = gstate.Cast<VortexWriteGlobalData>();
-		vx_error *error;
-		vx_file_write_array(global_state.file_name.c_str(), global_state.array->array, &error);
-		HandleError(error);
+		auto &global_state = gstate.Cast<WriteGlobalData>();
+		global_state.sink->Close();
 	};
 
 	function.execution_mode = [](bool preserve_insertion_order,
 	                             bool supports_batch_index) -> CopyFunctionExecutionMode {
@@ -132,4 +121,4 @@
 	ExtensionUtil::RegisterFunction(instance, function);
 }
 
-} // namespace duckdb
+} // namespace vortex
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index a260338..2aef37c 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -1,13 +1,13 @@
 include_directories(${CMAKE_SOURCE_DIR}/../src/include)
 
-add_executable(vortex_tests src/expr.cpp)
+add_executable(vortex_tests src/vortex_expr_test.cpp)
 
 target_include_directories(vortex_tests PRIVATE
-        ${catch2_SOURCE_DIR}/single_include
+		${catch2_SOURCE_DIR}/single_include
 )
 
 target_link_libraries(vortex_tests PRIVATE
-        ${EXTENSION_NAME}
-        Catch2::Catch2WithMain)
+		${EXTENSION_NAME}
+		Catch2::Catch2WithMain)
 
 enable_testing()
 add_test(NAME AllTests COMMAND vortex_tests)
diff --git a/test/sql/nulls.test b/test/sql/nulls.test
index 4bb572d..0035443 100644
--- a/test/sql/nulls.test
+++ b/test/sql/nulls.test
@@ -1,25 +1,37 @@
 # name: test/sql/nulls.test
 # description: test nullable values
 # group: [vortex]
-
-# Before we load the extension, this will fail
+# This test verifies that NULL values written to a Vortex file round trip.
+# That is, they are preserved after a file write + read.
 
 require vortex
 
 # copy vortex
 statement ok
-COPY ((SELECT generate_series as s1, NULL as s2 from generate_series(0, 4)) union (SELECT generate_series as s1, generate_series + 1 as s2 from generate_series(5,8))) TO '__TEST_DIR__/test.vortex' (FORMAT VORTEX)
+COPY ((
+    SELECT
+        generate_series as s,
+        NULL as s1,
+        NULL as s2
+    FROM generate_series(0, 4)
+) UNION (
+    SELECT
+        generate_series as s,
+        CAST(generate_series as decimal(15,2)) as s1,
+        generate_series + 1 as s2
+    FROM generate_series(5,8)))
+TO '__TEST_DIR__/test.vortex' (FORMAT VORTEX)
 
 # read vortex
-query II
-SELECT s1, s2 FROM read_vortex('__TEST_DIR__/test.vortex') ORDER BY s1;
+query III
+SELECT s, s1, s2 FROM read_vortex('__TEST_DIR__/test.vortex') ORDER BY s;
 ----
-0	NULL
-1	NULL
-2	NULL
-3	NULL
-4	NULL
-5	6
-6	7
-7	8
-8	9
+0	NULL	NULL
+1	NULL	NULL
+2	NULL	NULL
+3	NULL	NULL
+4	NULL	NULL
+5	5.00	6
+6	6.00	7
+7	7.00	8
+8	8.00	9
diff --git a/test/sql/table.test b/test/sql/table.test
index e05c377..b6f8f02 100644
--- a/test/sql/table.test
+++ b/test/sql/table.test
@@ -16,7 +16,7 @@ CREATE TABLE generated_data_table (
 -- hugeint_col HUGEINT, all commented values are not supported yet.
     float_col FLOAT,
     double_col DOUBLE,
---    decimal_col DECIMAL(10, 2),
+    decimal_col DECIMAL(10, 2),
     varchar_col VARCHAR,
     date_col DATE,
     timmestamp_s_col TIMESTAMP_S,
@@ -39,7 +39,7 @@ SELECT
 -- CAST(seq * 1000 AS HUGEINT) AS hugeint_col,
     CAST(seq AS FLOAT) / 100.0 AS float_col,
     CAST(seq AS DOUBLE) / 1000.0 AS double_col,
---  CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col,
+    CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col,
     'Value ' || seq AS varchar_col,
     DATE '1992-03-22' + to_days(cast(seq AS INTEGER)) AS date_col,
     TIMESTAMP_S '2025-01-01 00:00:00' + to_seconds(cast(seq AS INTEGER)) as timmestamp_s_col,
diff --git a/test/sql/table.test_slow b/test/sql/table.test_slow
index 7e8b142..c55bc83 100644
--- a/test/sql/table.test_slow
+++ b/test/sql/table.test_slow
@@ -16,7 +16,7 @@ CREATE TABLE generated_data_table (
 -- hugeint_col HUGEINT, all commented values are not supported yet.
     float_col FLOAT,
     double_col DOUBLE,
---    decimal_col DECIMAL(10, 2),
+    decimal_col DECIMAL(10, 2),
     varchar_col VARCHAR,
     date_col DATE,
     timmestamp_s_col TIMESTAMP_S,
@@ -39,7 +39,7 @@ SELECT
 -- CAST(seq * 1000 AS HUGEINT) AS hugeint_col,
     CAST(seq AS FLOAT) / 100.0 AS float_col,
     CAST(seq AS DOUBLE) / 1000.0 AS double_col,
---  CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col,
+    CAST(seq AS DECIMAL(10, 2)) / 10.0 AS decimal_col,
     'Value ' || seq AS varchar_col,
     DATE '1992-03-22' + to_days(cast(seq AS INTEGER)) AS date_col,
     TIMESTAMP_S '2025-01-01 00:00:00' + to_seconds(cast(seq AS INTEGER)) as timmestamp_s_col,