diff --git a/CMake/ExternalAntlr4Cpp.cmake b/CMake/ExternalAntlr4Cpp.cmake new file mode 100644 index 000000000000..0b2b521e737e --- /dev/null +++ b/CMake/ExternalAntlr4Cpp.cmake @@ -0,0 +1,175 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: ExternalAntlr4Cpp.cmake taken from +# https://github.com/antlr/antlr4/blob/4.13.1/runtime/Cpp/cmake/ExternalAntlr4Cpp.cmake + +cmake_minimum_required(VERSION 3.7) + +if(POLICY CMP0114) + cmake_policy(SET CMP0114 NEW) +endif() + +include(ExternalProject) + +set(ANTLR4_ROOT ${CMAKE_CURRENT_BINARY_DIR}/antlr4_runtime/src/antlr4_runtime) +set(ANTLR4_INCLUDE_DIRS ${ANTLR4_ROOT}/runtime/Cpp/runtime/src) +set(ANTLR4_GIT_REPOSITORY https://github.com/antlr/antlr4.git) +if(NOT DEFINED ANTLR4_TAG) + # Set to branch name to keep library updated at the cost of needing to rebuild + # after 'clean' Set to commit hash to keep the build stable and does not need + # to rebuild after 'clean' + set(ANTLR4_TAG master) +endif() + +# Ensure that the include dir already exists at configure time (to avoid cmake +# erroring on non-existent include dirs) +file(MAKE_DIRECTORY "${ANTLR4_INCLUDE_DIRS}") + +if(${CMAKE_GENERATOR} MATCHES "Visual Studio.*") + set(ANTLR4_OUTPUT_DIR ${ANTLR4_ROOT}/runtime/Cpp/dist/$(Configuration)) +elseif(${CMAKE_GENERATOR} MATCHES "Xcode.*") + set(ANTLR4_OUTPUT_DIR ${ANTLR4_ROOT}/runtime/Cpp/dist/$(CONFIGURATION)) +else() + set(ANTLR4_OUTPUT_DIR ${ANTLR4_ROOT}/runtime/Cpp/dist) +endif() + +if(MSVC) + set(ANTLR4_STATIC_LIBRARIES ${ANTLR4_OUTPUT_DIR}/antlr4-runtime-static.lib) + set(ANTLR4_SHARED_LIBRARIES ${ANTLR4_OUTPUT_DIR}/antlr4-runtime.lib) + set(ANTLR4_RUNTIME_LIBRARIES ${ANTLR4_OUTPUT_DIR}/antlr4-runtime.dll) +else() + set(ANTLR4_STATIC_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.a) + if(MINGW) + set(ANTLR4_SHARED_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.dll.a) + set(ANTLR4_RUNTIME_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.dll) + elseif(CYGWIN) + set(ANTLR4_SHARED_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.dll.a) + set(ANTLR4_RUNTIME_LIBRARIES + ${ANTLR4_OUTPUT_DIR}/cygantlr4-runtime-${ANTLR4_TAG}.dll) + elseif(APPLE) + set(ANTLR4_RUNTIME_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.dylib) + else() + set(ANTLR4_RUNTIME_LIBRARIES ${ANTLR4_OUTPUT_DIR}/libantlr4-runtime.so) + endif() +endif() + +if(${CMAKE_GENERATOR} MATCHES ".* Makefiles") + # This avoids 'warning: jobserver unavailable: using -j1. Add '+' to parent + # make rule.' + set(ANTLR4_BUILD_COMMAND $(MAKE)) +elseif(${CMAKE_GENERATOR} MATCHES "Visual Studio.*") + set(ANTLR4_BUILD_COMMAND ${CMAKE_COMMAND} --build . --config $(Configuration) + --target) +elseif(${CMAKE_GENERATOR} MATCHES "Xcode.*") + set(ANTLR4_BUILD_COMMAND ${CMAKE_COMMAND} --build . --config $(CONFIGURATION) + --target) +else() + set(ANTLR4_BUILD_COMMAND ${CMAKE_COMMAND} --build . --target) +endif() + +if(NOT DEFINED ANTLR4_WITH_STATIC_CRT) + set(ANTLR4_WITH_STATIC_CRT ON) +endif() + +if(ANTLR4_ZIP_REPOSITORY) + ExternalProject_Add( + antlr4_runtime + PREFIX antlr4_runtime + URL ${ANTLR4_ZIP_REPOSITORY} + DOWNLOAD_DIR ${CMAKE_CURRENT_BINARY_DIR} + BUILD_COMMAND "" + BUILD_IN_SOURCE 1 + SOURCE_DIR ${ANTLR4_ROOT} + SOURCE_SUBDIR runtime/Cpp + CMAKE_CACHE_ARGS + -DCMAKE_BUILD_TYPE:STRING=${CMAKE_BUILD_TYPE} + -DWITH_STATIC_CRT:BOOL=${ANTLR4_WITH_STATIC_CRT} + -DDISABLE_WARNINGS:BOOL=ON + # -DCMAKE_CXX_STANDARD:STRING=17 # if desired, compile the runtime with a + # different C++ standard -DCMAKE_CXX_STANDARD:STRING=${CMAKE_CXX_STANDARD} + # alternatively, compile the runtime with the same C++ standard as the + # outer project + INSTALL_COMMAND "" + EXCLUDE_FROM_ALL 1) +else() + ExternalProject_Add( + antlr4_runtime + PREFIX antlr4_runtime + GIT_REPOSITORY ${ANTLR4_GIT_REPOSITORY} + GIT_TAG ${ANTLR4_TAG} + DOWNLOAD_DIR ${CMAKE_CURRENT_BINARY_DIR} + BUILD_COMMAND "" + BUILD_IN_SOURCE 1 + SOURCE_DIR ${ANTLR4_ROOT} + SOURCE_SUBDIR runtime/Cpp + CMAKE_CACHE_ARGS + -DCMAKE_BUILD_TYPE:STRING=${CMAKE_BUILD_TYPE} + -DWITH_STATIC_CRT:BOOL=${ANTLR4_WITH_STATIC_CRT} + -DDISABLE_WARNINGS:BOOL=ON + # -DCMAKE_CXX_STANDARD:STRING=17 # if desired, compile the runtime with a + # different C++ standard -DCMAKE_CXX_STANDARD:STRING=${CMAKE_CXX_STANDARD} + # alternatively, compile the runtime with the same C++ standard as the + # outer project + INSTALL_COMMAND "" + EXCLUDE_FROM_ALL 1) +endif() + +# Separate build step as rarely people want both +set(ANTLR4_BUILD_DIR ${ANTLR4_ROOT}) +if(${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.14.0") + # CMake 3.14 builds in above's SOURCE_SUBDIR when BUILD_IN_SOURCE is true + set(ANTLR4_BUILD_DIR ${ANTLR4_ROOT}/runtime/Cpp) +endif() + +ExternalProject_Add_Step( + antlr4_runtime build_static + COMMAND ${ANTLR4_BUILD_COMMAND} antlr4_static + # Depend on target instead of step (a custom command) to avoid running + # dependent steps concurrently + DEPENDS antlr4_runtime + BYPRODUCTS ${ANTLR4_STATIC_LIBRARIES} + EXCLUDE_FROM_MAIN 1 + WORKING_DIRECTORY ${ANTLR4_BUILD_DIR}) +ExternalProject_Add_StepTargets(antlr4_runtime build_static) + +add_library(antlr4_static STATIC IMPORTED) +add_dependencies(antlr4_static antlr4_runtime-build_static) +set_target_properties( + antlr4_static + PROPERTIES IMPORTED_LOCATION ${ANTLR4_STATIC_LIBRARIES}) +target_include_directories(antlr4_static INTERFACE ${ANTLR4_INCLUDE_DIRS}) + +ExternalProject_Add_Step( + antlr4_runtime build_shared + COMMAND ${ANTLR4_BUILD_COMMAND} antlr4_shared + # Depend on target instead of step (a custom command) to avoid running + # dependent steps concurrently + DEPENDS antlr4_runtime + BYPRODUCTS ${ANTLR4_SHARED_LIBRARIES} ${ANTLR4_RUNTIME_LIBRARIES} + EXCLUDE_FROM_MAIN 1 + WORKING_DIRECTORY ${ANTLR4_BUILD_DIR}) +ExternalProject_Add_StepTargets(antlr4_runtime build_shared) + +add_library(antlr4_shared SHARED IMPORTED) +add_dependencies(antlr4_shared antlr4_runtime-build_shared) +set_target_properties( + antlr4_shared + PROPERTIES IMPORTED_LOCATION ${ANTLR4_RUNTIME_LIBRARIES}) +target_include_directories(antlr4_shared INTERFACE ${ANTLR4_INCLUDE_DIRS}) + +if(ANTLR4_SHARED_LIBRARIES) + set_target_properties(antlr4_shared PROPERTIES IMPORTED_IMPLIB + ${ANTLR4_SHARED_LIBRARIES}) +endif() diff --git a/CMake/FindANTLR.cmake b/CMake/FindANTLR.cmake new file mode 100644 index 000000000000..e1d1cfc9d091 --- /dev/null +++ b/CMake/FindANTLR.cmake @@ -0,0 +1,174 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# NOTE: FindANTLR.cmake taken from +# https://github.com/antlr/antlr4/blob/4.13.1/runtime/Cpp/cmake/FindANTLR.cmake + +# Set ANTLR version +set(ANTLR4_TAG 4.13.1) +add_definitions(-DANTLR4CPP_STATIC) + +# Define the JAR name and path +set(ANTLR_JAR_NAME antlr-${ANTLR4_TAG}-complete.jar) + +# Set the download directory +set(ANTLR_DOWNLOAD_DIR ${CMAKE_BINARY_DIR}/antlr) +file(MAKE_DIRECTORY ${ANTLR_DOWNLOAD_DIR}) + +# Define the full path to the ANTLR JAR +set(ANTLR_EXECUTABLE ${ANTLR_DOWNLOAD_DIR}/${ANTLR_JAR_NAME}) + +# Download the ANTLR JAR if it does not exist +if(NOT EXISTS ${ANTLR_EXECUTABLE}) + message(STATUS "ANTLR JAR not found. Downloading ANTLR ${ANTLR4_TAG}...") + file( + DOWNLOAD https://www.antlr.org/download/${ANTLR_JAR_NAME} + ${ANTLR_EXECUTABLE} + SHOW_PROGRESS + EXPECTED_HASH + SHA256=bc13a9c57a8dd7d5196888211e5ede657cb64a3ce968608697e4f668251a8487 + TLS_VERIFY ON) +endif() + +# Include the ANTLR C++ runtime integration +include(ExternalAntlr4Cpp) + +find_package(Java 11 REQUIRED COMPONENTS Runtime) + +if(NOT ANTLR_EXECUTABLE) + find_program(ANTLR_EXECUTABLE NAMES antlr.jar antlr4.jar antlr-4.jar + antlr-${ANTLR4_TAG}-complete.jar) +endif() + +if(ANTLR_EXECUTABLE AND Java_JAVA_EXECUTABLE) + execute_process( + COMMAND ${Java_JAVA_EXECUTABLE} -jar ${ANTLR_EXECUTABLE} + OUTPUT_VARIABLE ANTLR_COMMAND_OUTPUT + ERROR_VARIABLE ANTLR_COMMAND_ERROR + RESULT_VARIABLE ANTLR_COMMAND_RESULT + OUTPUT_STRIP_TRAILING_WHITESPACE) + + if(ANTLR_COMMAND_RESULT EQUAL 0) + string(REGEX MATCH "Version [0-9]+(\\.[0-9]+)*" ANTLR_VERSION + ${ANTLR_COMMAND_OUTPUT}) + string(REPLACE "Version " "" ANTLR_VERSION ${ANTLR_VERSION}) + else() + message( + SEND_ERROR "Command '${Java_JAVA_EXECUTABLE} -jar ${ANTLR_EXECUTABLE}' " + "failed with the output '${ANTLR_COMMAND_ERROR}'") + endif() + + macro(ANTLR_TARGET Name InputFile) + set(ANTLR_OPTIONS LEXER PARSER LISTENER VISITOR) + set(ANTLR_ONE_VALUE_ARGS PACKAGE OUTPUT_DIRECTORY DEPENDS_ANTLR) + set(ANTLR_MULTI_VALUE_ARGS COMPILE_FLAGS DEPENDS) + cmake_parse_arguments( + ANTLR_TARGET + "${ANTLR_OPTIONS}" + "${ANTLR_ONE_VALUE_ARGS}" + "${ANTLR_MULTI_VALUE_ARGS}" + ${ARGN}) + set(ANTLR_${Name}_INPUT ${InputFile}) + + get_filename_component(ANTLR_INPUT ${InputFile} NAME_WE) + get_filename_component(ANTLR_INPUT_PARENT_DIR "${InputFile}" DIRECTORY) + + if(ANTLR_TARGET_OUTPUT_DIRECTORY) + set(ANTLR_${Name}_OUTPUT_DIR ${ANTLR_TARGET_OUTPUT_DIRECTORY}) + else() + set(ANTLR_${Name}_OUTPUT_DIR + ${CMAKE_CURRENT_BINARY_DIR}/antlr4cpp_generated_src/${ANTLR_INPUT}) + endif() + + unset(ANTLR_${Name}_CXX_OUTPUTS) + + if((ANTLR_TARGET_LEXER AND NOT ANTLR_TARGET_PARSER) + OR (ANTLR_TARGET_PARSER AND NOT ANTLR_TARGET_LEXER)) + list(APPEND ANTLR_${Name}_CXX_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}.cpp) + set(ANTLR_${Name}_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}.interp + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}.tokens) + else() + list( + APPEND + ANTLR_${Name}_CXX_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Lexer.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Lexer.cpp + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Parser.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Parser.cpp) + list(APPEND ANTLR_${Name}_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Lexer.interp + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Lexer.tokens) + endif() + + if(ANTLR_TARGET_LISTENER) + list( + APPEND + ANTLR_${Name}_CXX_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}BaseListener.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}BaseListener.cpp + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Listener.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Listener.cpp) + list(APPEND ANTLR_TARGET_COMPILE_FLAGS -listener) + endif() + + if(ANTLR_TARGET_VISITOR) + list( + APPEND + ANTLR_${Name}_CXX_OUTPUTS + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}BaseVisitor.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}BaseVisitor.cpp + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Visitor.h + ${ANTLR_${Name}_OUTPUT_DIR}/${ANTLR_INPUT}Visitor.cpp) + list(APPEND ANTLR_TARGET_COMPILE_FLAGS -visitor) + endif() + + if(ANTLR_TARGET_PACKAGE) + list(APPEND ANTLR_TARGET_COMPILE_FLAGS -package ${ANTLR_TARGET_PACKAGE}) + endif() + + list(APPEND ANTLR_${Name}_OUTPUTS ${ANTLR_${Name}_CXX_OUTPUTS}) + + if(ANTLR_TARGET_DEPENDS_ANTLR) + if(ANTLR_${ANTLR_TARGET_DEPENDS_ANTLR}_INPUT) + list(APPEND ANTLR_TARGET_DEPENDS + ${ANTLR_${ANTLR_TARGET_DEPENDS_ANTLR}_INPUT}) + list(APPEND ANTLR_TARGET_DEPENDS + ${ANTLR_${ANTLR_TARGET_DEPENDS_ANTLR}_OUTPUTS}) + else() + message( + SEND_ERROR "ANTLR target '${ANTLR_TARGET_DEPENDS_ANTLR}' not found") + endif() + endif() + + add_custom_command( + OUTPUT ${ANTLR_${Name}_OUTPUTS} + COMMAND + ${Java_JAVA_EXECUTABLE} -jar ${ANTLR_EXECUTABLE} ${InputFile} -o + ${ANTLR_${Name}_OUTPUT_DIR} -no-listener -Dlanguage=Cpp + ${ANTLR_TARGET_COMPILE_FLAGS} + DEPENDS ${InputFile} ${ANTLR_TARGET_DEPENDS} + WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR} + COMMENT "Building ${Name} with ANTLR ${ANTLR_VERSION}") + endmacro(ANTLR_TARGET) + +endif(ANTLR_EXECUTABLE AND Java_JAVA_EXECUTABLE) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args( + ANTLR + REQUIRED_VARS ANTLR_EXECUTABLE Java_JAVA_EXECUTABLE + VERSION_VAR ANTLR_VERSION) diff --git a/CMake/resolve_dependency_modules/clp.cmake b/CMake/resolve_dependency_modules/clp.cmake new file mode 100644 index 000000000000..bf50d26d7c69 --- /dev/null +++ b/CMake/resolve_dependency_modules/clp.cmake @@ -0,0 +1,38 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +FetchContent_Declare( + clp + GIT_REPOSITORY https://github.com/y-scope/clp.git + GIT_TAG 7b1b169a89abdfe44c159d6200b168391b697877 + GIT_SUBMODULES "" GIT_SUBMODULES_RECURSE TRUE) + +FetchContent_MakeAvailable(clp) + +if(clp_POPULATED) + message(STATUS "Updating submodules for clp...") + execute_process( + COMMAND ${CMAKE_COMMAND} -E chdir "${clp_SOURCE_DIR}" git submodule update + --init --recursive + RESULT_VARIABLE submodule_update_result + OUTPUT_VARIABLE submodule_update_output + ERROR_VARIABLE submodule_update_error) + if(NOT ${submodule_update_result} EQUAL 0) + message(ERROR + "Failed to update submodules for clp:\n${submodule_update_error}") + else() + message(STATUS "Submodules for clp updated successfully.") + endif() +endif() diff --git a/CMake/resolve_dependency_modules/msgpack-cxx.cmake b/CMake/resolve_dependency_modules/msgpack-cxx.cmake new file mode 100644 index 000000000000..ce39b7cf1676 --- /dev/null +++ b/CMake/resolve_dependency_modules/msgpack-cxx.cmake @@ -0,0 +1,34 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +set(VELOX_MSGPACK_BUILD_VERSION cpp-7.0.0) +set(VELOX_MSGPACK_BUILD_SHA256_CHECKSUM + 070881ebea9208cf7e731fd5a46a11404025b2f260ab9527e32dfcb7c689fbfc) +string( + CONCAT VELOX_MSGPACK_SOURCE_URL + "https://github.com/msgpack/msgpack-c/archive/refs/tags/" + "${VELOX_MSGPACK_BUILD_VERSION}.tar.gz") + +velox_resolve_dependency_url(MSGPACK) + +message(STATUS "Building msgpack-cxx from source") + +FetchContent_Declare( + msgpack-cxx + URL ${VELOX_MSGPACK_SOURCE_URL} + URL_HASH ${VELOX_MSGPACK_BUILD_SHA256_CHECKSUM} + OVERRIDE_FIND_PACKAGE EXCLUDE_FROM_ALL SYSTEM) + +FetchContent_MakeAvailable(msgpack-cxx) diff --git a/CMake/resolve_dependency_modules/spdlog.cmake b/CMake/resolve_dependency_modules/spdlog.cmake new file mode 100644 index 000000000000..022525fc1340 --- /dev/null +++ b/CMake/resolve_dependency_modules/spdlog.cmake @@ -0,0 +1,35 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +include_guard(GLOBAL) + +set(VELOX_SPDLOG_BUILD_VERSION 1.12.0) +set(VELOX_SPDLOG_BUILD_SHA256_CHECKSUM + 4dccf2d10f410c1e2feaff89966bfc49a1abb29ef6f08246335b110e001e09a9) +string( + CONCAT VELOX_SPDLOG_SOURCE_URL + "https://github.com/gabime/spdlog/archive/refs/tags/" + "v${VELOX_SPDLOG_BUILD_VERSION}.tar.gz") + +velox_resolve_dependency_url(SPDLOG) + +message(STATUS "Building spdlog from source") + +FetchContent_Declare( + spdlog + URL ${VELOX_SPDLOG_SOURCE_URL} + URL_HASH ${VELOX_SPDLOG_BUILD_SHA256_CHECKSUM} + OVERRIDE_FIND_PACKAGE EXCLUDE_FROM_ALL SYSTEM) + +set(SPDLOG_FMT_EXTERNAL ON) +FetchContent_MakeAvailable(spdlog) diff --git a/CMakeLists.txt b/CMakeLists.txt index 9f0b90106449..154d98146acf 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -123,6 +123,7 @@ set(VELOX_GFLAGS_TYPE ) option(VELOX_ENABLE_EXEC "Build exec." ON) option(VELOX_ENABLE_AGGREGATES "Build aggregates." ON) +option(VELOX_ENABLE_CLP_CONNECTOR "Build CLP connector." ON) option(VELOX_ENABLE_HIVE_CONNECTOR "Build Hive connector." ON) option(VELOX_ENABLE_TPCH_CONNECTOR "Build TPC-H connector." ON) option(VELOX_ENABLE_PRESTO_FUNCTIONS "Build Presto SQL functions." ON) @@ -169,6 +170,7 @@ if(${VELOX_BUILD_MINIMAL} OR ${VELOX_BUILD_MINIMAL_WITH_DWIO}) set(VELOX_ENABLE_EXPRESSION ON) set(VELOX_ENABLE_EXEC OFF) set(VELOX_ENABLE_AGGREGATES OFF) + set(VELOX_ENABLE_CLP_CONNECTOR OFF) set(VELOX_ENABLE_HIVE_CONNECTOR OFF) set(VELOX_ENABLE_TPCH_CONNECTOR OFF) set(VELOX_ENABLE_SPARK_FUNCTIONS OFF) @@ -429,9 +431,11 @@ set(BOOST_INCLUDE_LIBRARIES context date_time filesystem + iostreams program_options regex system + url thread) velox_set_source(Boost) @@ -485,6 +489,24 @@ endif() velox_set_source(re2) velox_resolve_dependency(re2) +if(${VELOX_ENABLE_CLP_CONNECTOR}) + set(clp_SOURCE BUNDLED) + velox_resolve_dependency(clp) + + set(spdlog_SOURCE BUNDLED) + velox_resolve_dependency(spdlog) + + velox_set_source(msgpack-cxx) + velox_resolve_dependency(msgpack-cxx) + + velox_set_source(absl) + velox_resolve_dependency(absl) + + set(curl_SOURCE BUNDLED) + velox_resolve_dependency(curl) + find_package(ANTLR REQUIRED) +endif() + if(${VELOX_BUILD_PYTHON_PACKAGE}) find_package( Python 3.9 diff --git a/velox/connectors/CMakeLists.txt b/velox/connectors/CMakeLists.txt index 3cc600201f6b..743ecbcaac00 100644 --- a/velox/connectors/CMakeLists.txt +++ b/velox/connectors/CMakeLists.txt @@ -17,6 +17,10 @@ velox_link_libraries(velox_connector velox_common_config velox_vector) add_subdirectory(fuzzer) +if(${VELOX_ENABLE_CLP_CONNECTOR}) + add_subdirectory(clp) +endif() + if(${VELOX_ENABLE_HIVE_CONNECTOR}) add_subdirectory(hive) endif() diff --git a/velox/connectors/clp/CMakeLists.txt b/velox/connectors/clp/CMakeLists.txt new file mode 100644 index 000000000000..8fc435d8bc4f --- /dev/null +++ b/velox/connectors/clp/CMakeLists.txt @@ -0,0 +1,31 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +set(CLP_SRC_DIR ${clp_SOURCE_DIR}/components/core/src) +add_subdirectory(search_lib) + +velox_add_library( + velox_clp_connector + OBJECT + ClpConnector.cpp + ClpDataSource.cpp + ClpTableHandle.cpp) + +velox_link_libraries(velox_clp_connector + PRIVATE clp-s-search simdjson::simdjson velox_connector) + +target_compile_features(velox_clp_connector PRIVATE cxx_std_20) + +if(${VELOX_BUILD_TESTING}) + add_subdirectory(tests) +endif() diff --git a/velox/connectors/clp/ClpColumnHandle.h b/velox/connectors/clp/ClpColumnHandle.h new file mode 100644 index 000000000000..d7857d827248 --- /dev/null +++ b/velox/connectors/clp/ClpColumnHandle.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" + +namespace facebook::velox::connector::clp { +class ClpColumnHandle : public ColumnHandle { + public: + ClpColumnHandle( + const std::string& columnName, + const std::string& originalColumnName, + const TypePtr& columnType, + bool nullable) + : columnName_(columnName), + originalColumnName_(originalColumnName), + columnType_(columnType), + nullable_(nullable) {} + + const std::string& columnName() const { + return columnName_; + } + + const std::string& originalColumnName() const { + return originalColumnName_; + } + + const TypePtr& columnType() const { + return columnType_; + } + + bool nullable() const { + return nullable_; + } + + private: + const std::string columnName_; + const std::string originalColumnName_; + const TypePtr columnType_; + const bool nullable_; +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpConfig.h b/velox/connectors/clp/ClpConfig.h new file mode 100644 index 000000000000..5d6a254a81ee --- /dev/null +++ b/velox/connectors/clp/ClpConfig.h @@ -0,0 +1,43 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/common/config/Config.h" + +namespace facebook::velox::config { +class ConfigBase; +} + +namespace facebook::velox::connector::clp { +class ClpConfig { + public: + explicit ClpConfig(std::shared_ptr config) { + VELOX_CHECK_NOT_NULL(config, "Config is null for CLP initialization"); + config_ = std::move(config); + } + + [[nodiscard]] const std::shared_ptr& config() + const { + return config_; + } + + private: + std::shared_ptr config_; +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpConnector.cpp b/velox/connectors/clp/ClpConnector.cpp new file mode 100644 index 000000000000..54ffe63a5ac7 --- /dev/null +++ b/velox/connectors/clp/ClpConnector.cpp @@ -0,0 +1,61 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "clp_s/TimestampPattern.hpp" + +#include "velox/connectors/clp/ClpConnector.h" +#include "velox/connectors/clp/ClpDataSource.h" + +namespace facebook::velox::connector::clp { +ClpConnector::ClpConnector( + const std::string& id, + std::shared_ptr config) + : Connector(id), config_(std::make_shared(config)) {} + +std::unique_ptr ClpConnector::createDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) { + return std::make_unique( + outputType, + tableHandle, + columnHandles, + connectorQueryCtx->memoryPool(), + config_); +} + +std::unique_ptr ClpConnector::createDataSink( + RowTypePtr inputType, + std::shared_ptr connectorInsertTableHandle, + ConnectorQueryCtx* connectorQueryCtx, + CommitStrategy commitStrategy) { + VELOX_NYI("createDataSink for ClpConnector is not implemented!"); +} + +ClpConnectorFactory::ClpConnectorFactory() + : ConnectorFactory(kClpConnectorName) { + clp_s::TimestampPattern::init(); +} + +ClpConnectorFactory::ClpConnectorFactory(const char* connectorName) + : ConnectorFactory(connectorName) { + clp_s::TimestampPattern::init(); +} + +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpConnector.h b/velox/connectors/clp/ClpConnector.h new file mode 100644 index 000000000000..ae9c28419ac8 --- /dev/null +++ b/velox/connectors/clp/ClpConnector.h @@ -0,0 +1,75 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" +#include "velox/connectors/clp/ClpConfig.h" + +namespace facebook::velox::connector::clp { +class ClpConnector : public Connector { + public: + ClpConnector( + const std::string& id, + std::shared_ptr config); + + [[nodiscard]] const std::shared_ptr& + connectorConfig() const override { + return config_->config(); + } + + [[nodiscard]] bool canAddDynamicFilter() const override { + return false; + } + + std::unique_ptr createDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + ConnectorQueryCtx* connectorQueryCtx) override; + + bool supportsSplitPreload() override { + return false; + } + + std::unique_ptr createDataSink( + RowTypePtr inputType, + std::shared_ptr connectorInsertTableHandle, + ConnectorQueryCtx* connectorQueryCtx, + CommitStrategy commitStrategy) override; + + private: + std::shared_ptr config_; +}; + +class ClpConnectorFactory : public ConnectorFactory { + public: + static constexpr const char* kClpConnectorName = "clp"; + + ClpConnectorFactory(); + explicit ClpConnectorFactory(const char* connectorName); + + std::shared_ptr newConnector( + const std::string& id, + std::shared_ptr config, + folly::Executor* /*ioExecutor*/, + folly::Executor* /*cpuExecutor*/) override { + return std::make_shared(id, config); + } +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpConnectorSplit.h b/velox/connectors/clp/ClpConnectorSplit.h new file mode 100644 index 000000000000..3b00a4b6b5e8 --- /dev/null +++ b/velox/connectors/clp/ClpConnectorSplit.h @@ -0,0 +1,32 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" + +namespace facebook::velox::connector::clp { +struct ClpConnectorSplit : public connector::ConnectorSplit { + ClpConnectorSplit(const std::string& connectorId, const std::string& path) + : connector::ConnectorSplit(connectorId), path_(path) {} + + [[nodiscard]] std::string toString() const override { + return fmt::format("CLP Split: {}", path_); + } + + const std::string path_; +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpDataSource.cpp b/velox/connectors/clp/ClpDataSource.cpp new file mode 100644 index 000000000000..7f3783cb1c8f --- /dev/null +++ b/velox/connectors/clp/ClpDataSource.cpp @@ -0,0 +1,176 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/connectors/clp/ClpColumnHandle.h" +#include "velox/connectors/clp/ClpConnectorSplit.h" +#include "velox/connectors/clp/ClpDataSource.h" +#include "velox/connectors/clp/ClpTableHandle.h" +#include "velox/connectors/clp/search_lib/ClpCursor.h" +#include "velox/connectors/clp/search_lib/ClpVectorLoader.h" +#include "velox/vector/FlatVector.h" + +namespace facebook::velox::connector::clp { + +ClpDataSource::ClpDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* pool, + std::shared_ptr& clpConfig) + : pool_(pool), outputType_(outputType) { + auto clpTableHandle = std::dynamic_pointer_cast(tableHandle); + storageType_ = clpTableHandle->storageType(); + if (auto query = clpTableHandle->kqlQuery(); query && !query->empty()) { + kqlQuery_ = *query; + } else { + kqlQuery_ = "*"; + } + + for (const auto& outputName : outputType->names()) { + auto columnHandle = columnHandles.find(outputName); + VELOX_CHECK( + columnHandle != columnHandles.end(), + "ColumnHandle not found for output name: {}", + outputName); + auto clpColumnHandle = + std::dynamic_pointer_cast(columnHandle->second); + VELOX_CHECK_NOT_NULL( + clpColumnHandle, + "ColumnHandle must be an instance of ClpColumnHandle for output name: {}", + outputName); + auto columnName = clpColumnHandle->originalColumnName(); + auto columnType = clpColumnHandle->columnType(); + addFieldsRecursively(columnType, columnName); + } +} + +void ClpDataSource::addFieldsRecursively( + const TypePtr& columnType, + const std::string& parentName) { + if (columnType->kind() == TypeKind::ROW) { + const auto& rowType = columnType->asRow(); + for (uint32_t i = 0; i < rowType.size(); ++i) { + const auto& childType = rowType.childAt(i); + const auto childName = parentName + "." + rowType.nameOf(i); + addFieldsRecursively(childType, childName); + } + } else { + search_lib::ColumnType clpColumnType = search_lib::ColumnType::Unknown; + switch (columnType->kind()) { + case TypeKind::BOOLEAN: + clpColumnType = search_lib::ColumnType::Boolean; + break; + case TypeKind::INTEGER: + case TypeKind::BIGINT: + case TypeKind::SMALLINT: + case TypeKind::TINYINT: + clpColumnType = search_lib::ColumnType::Integer; + break; + case TypeKind::DOUBLE: + case TypeKind::REAL: + clpColumnType = search_lib::ColumnType::Float; + break; + case TypeKind::VARCHAR: + clpColumnType = search_lib::ColumnType::String; + break; + case TypeKind::ARRAY: + clpColumnType = search_lib::ColumnType::Array; + break; + default: + VELOX_USER_FAIL("Type not supported: {}", columnType->name()); + } + fields_.emplace_back(search_lib::Field{clpColumnType, parentName}); + } +} + +void ClpDataSource::addSplit(std::shared_ptr split) { + auto clpSplit = std::dynamic_pointer_cast(split); + + if (storageType_ == ClpTableHandle::StorageType::kFS) { + cursor_ = std::make_unique( + clp_s::InputSource::Filesystem, clpSplit->path_); + } else if (storageType_ == ClpTableHandle::StorageType::kS3) { + cursor_ = std::make_unique( + clp_s::InputSource::Network, clpSplit->path_); + } + + cursor_->executeQuery(kqlQuery_, fields_); +} + +VectorPtr ClpDataSource::createVector( + const TypePtr& type, + size_t size, + const std::vector& projectedColumns, + const std::shared_ptr>& filteredRows, + size_t& readerIndex) { + if (type->kind() == TypeKind::ROW) { + std::vector children; + auto& rowType = type->as(); + for (uint32_t i = 0; i < rowType.size(); ++i) { + children.push_back(createVector( + rowType.childAt(i), + size, + projectedColumns, + filteredRows, + readerIndex)); + } + return std::make_shared( + pool_, type, nullptr, size, std::move(children)); + } + auto vector = BaseVector::create(type, size, pool_); + vector->setNulls(allocateNulls(size, pool_, bits::kNull)); + + VELOX_CHECK_LT( + readerIndex, projectedColumns.size(), "Reader index out of bounds"); + auto projectedColumn = projectedColumns[readerIndex]; + auto projectedType = fields_[readerIndex].type; + readerIndex++; + return std::make_shared( + pool_, + type, + size, + std::make_unique( + projectedColumn, projectedType, filteredRows), + std::move(vector)); +} + +std::optional ClpDataSource::next( + uint64_t size, + ContinueFuture& future) { + auto filteredRows = std::make_shared>(); + auto rowsScanned = cursor_->fetchNext(size, filteredRows); + auto rowsFiltered = filteredRows->size(); + if (rowsFiltered == 0) { + return nullptr; + } + completedRows_ += rowsScanned; + size_t readerIndex = 0; + const auto& projectedColumns = cursor_->getProjectedColumns(); + if (projectedColumns.size() != fields_.size()) { + VELOX_USER_FAIL( + "Projected columns size {} does not match fields size {}", + projectedColumns.size(), + fields_.size()); + } + return std::dynamic_pointer_cast(createVector( + outputType_, rowsFiltered, projectedColumns, filteredRows, readerIndex)); +} + +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpDataSource.h b/velox/connectors/clp/ClpDataSource.h new file mode 100644 index 000000000000..fd2be3848842 --- /dev/null +++ b/velox/connectors/clp/ClpDataSource.h @@ -0,0 +1,106 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "velox/connectors/Connector.h" +#include "velox/connectors/clp/ClpConfig.h" +#include "velox/connectors/clp/ClpTableHandle.h" +#include "velox/connectors/clp/search_lib/ClpCursor.h" + +namespace facebook::velox::connector::clp { +class ClpDataSource : public DataSource { + public: + ClpDataSource( + const RowTypePtr& outputType, + const std::shared_ptr& tableHandle, + const std::unordered_map< + std::string, + std::shared_ptr>& columnHandles, + velox::memory::MemoryPool* pool, + std::shared_ptr& clpConfig); + + void addSplit(std::shared_ptr split) override; + + std::optional next(uint64_t size, velox::ContinueFuture& future) + override; + + void addDynamicFilter( + column_index_t outputChannel, + const std::shared_ptr& filter) override { + VELOX_NYI("Dynamic filters not supported by ClpConnector."); + } + + uint64_t getCompletedBytes() override { + return completedBytes_; + } + + uint64_t getCompletedRows() override { + return completedRows_; + } + + std::unordered_map runtimeStats() override { + return {}; + } + + private: + /** + * Recursively adds fields from the column type to the list of fields to be + * retrieved from the data source. + * + * @param columnType The type of the column. + * @param parentName The name of the parent field (used for nested fields). + */ + void addFieldsRecursively( + const TypePtr& columnType, + const std::string& parentName); + + /** + * Creates a Vector of the specified type and size. + * + * This method recursively creates vectors for complex types like ROW. For + * primitive types, it creates a LazyVector that will load the data from the + * underlying data source when it is accessed. + * + * @param type The type of the Vector to create. + * @param size The number of elements in the Vector. + * @param projectedColumns The readers the projected columns. + * @param filteredRows The rows to be read. + * @param readerIndex The index of the column reader. + * @return A Vector of the specified type and size. + */ + VectorPtr createVector( + const TypePtr& type, + size_t size, + const std::vector& projectedColumns, + const std::shared_ptr>& filteredRows, + size_t& readerIndex); + + ClpTableHandle::StorageType storageType_; + std::string kqlQuery_; + velox::memory::MemoryPool* pool_; + RowTypePtr outputType_; + std::set columnUntypedNames_; + uint64_t completedRows_{0}; + uint64_t completedBytes_{0}; + + std::vector fields_; + + std::unique_ptr cursor_; +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpTableHandle.cpp b/velox/connectors/clp/ClpTableHandle.cpp new file mode 100644 index 000000000000..5ef7bfecf48b --- /dev/null +++ b/velox/connectors/clp/ClpTableHandle.cpp @@ -0,0 +1,27 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/clp/ClpTableHandle.h" + +namespace facebook::velox::connector::clp { +std::string ClpTableHandle::toString() const { + return ConnectorTableHandle::toString(); +} + +folly::dynamic ClpTableHandle::serialize() const { + return ConnectorTableHandle::serialize(); +} +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/ClpTableHandle.h b/velox/connectors/clp/ClpTableHandle.h new file mode 100644 index 000000000000..801a9ff163dc --- /dev/null +++ b/velox/connectors/clp/ClpTableHandle.h @@ -0,0 +1,60 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "velox/connectors/Connector.h" + +namespace facebook::velox::connector::clp { +class ClpTableHandle : public ConnectorTableHandle { + public: + enum class StorageType { + kFS, + kS3, + }; + + ClpTableHandle( + const std::string& connectorId, + const std::string& tableName, + StorageType storageType, + std::shared_ptr kqlQuery) + : ConnectorTableHandle(connectorId), + tableName_(tableName), + storageType_(storageType), + kqlQuery_(std::move(kqlQuery)) {} + + [[nodiscard]] const std::string& tableName() const { + return tableName_; + } + + [[nodiscard]] const StorageType storageType() const { + return storageType_; + } + + [[nodiscard]] const std::shared_ptr& kqlQuery() const { + return kqlQuery_; + } + + std::string toString() const override; + + folly::dynamic serialize() const override; + + private: + const std::string tableName_; + const StorageType storageType_; + std::shared_ptr kqlQuery_; +}; +} // namespace facebook::velox::connector::clp diff --git a/velox/connectors/clp/search_lib/CMakeLists.txt b/velox/connectors/clp/search_lib/CMakeLists.txt new file mode 100644 index 000000000000..3e41e905a708 --- /dev/null +++ b/velox/connectors/clp/search_lib/CMakeLists.txt @@ -0,0 +1,129 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set(CLP_EXTERNAL_BINARY_DIR ${CMAKE_BINARY_DIR}/external/clp) +add_subdirectory(${clp_SOURCE_DIR}/components/core/src/clp/string_utils + ${CLP_EXTERNAL_BINARY_DIR}/string_utils) +set(YSTDLIB_CPP_BUILD_TESTING OFF) +add_subdirectory(${clp_SOURCE_DIR}/components/core/submodules/ystdlib-cpp + ${CLP_EXTERNAL_BINARY_DIR}/ystdlib-cpp EXCLUDE_FROM_ALL) + +string(LENGTH "${CMAKE_SOURCE_DIR}/" SOURCE_PATH_SIZE) + +antlr_target( + KqlParser + ${CLP_SRC_DIR}/clp_s/search/kql/Kql.g4 + LEXER + PARSER + VISITOR + PACKAGE + kql) + +set(CLP_SRC_FILES + ${ANTLR_KqlParser_CXX_OUTPUTS} + ${CLP_SRC_DIR}/clp_s/ArchiveReader.cpp + ${CLP_SRC_DIR}/clp_s/ArchiveReaderAdaptor.cpp + ${CLP_SRC_DIR}/clp_s/ColumnReader.cpp + ${CLP_SRC_DIR}/clp_s/DictionaryEntry.cpp + ${CLP_SRC_DIR}/clp_s/DictionaryWriter.cpp + ${CLP_SRC_DIR}/clp_s/FileReader.cpp + ${CLP_SRC_DIR}/clp_s/FileWriter.cpp + ${CLP_SRC_DIR}/clp_s/InputConfig.cpp + ${CLP_SRC_DIR}/clp_s/PackedStreamReader.cpp + ${CLP_SRC_DIR}/clp_s/ReaderUtils.cpp + ${CLP_SRC_DIR}/clp_s/Schema.cpp + ${CLP_SRC_DIR}/clp_s/SchemaMap.cpp + ${CLP_SRC_DIR}/clp_s/SchemaReader.cpp + ${CLP_SRC_DIR}/clp_s/SchemaTree.cpp + ${CLP_SRC_DIR}/clp_s/TimestampDictionaryReader.cpp + ${CLP_SRC_DIR}/clp_s/TimestampDictionaryWriter.cpp + ${CLP_SRC_DIR}/clp_s/TimestampEntry.cpp + ${CLP_SRC_DIR}/clp_s/TimestampPattern.cpp + ${CLP_SRC_DIR}/clp_s/Utils.cpp + ${CLP_SRC_DIR}/clp_s/VariableEncoder.cpp + ${CLP_SRC_DIR}/clp_s/VariableDecoder.cpp + ${CLP_SRC_DIR}/clp_s/ZstdCompressor.cpp + ${CLP_SRC_DIR}/clp_s/ZstdDecompressor.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/AndExpr.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/BooleanLiteral.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/ColumnDescriptor.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/ConstantProp.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/ConvertToExists.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/DateLiteral.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/EmptyExpr.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/Expression.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/FilterExpr.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/Integral.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/NarrowTypes.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/NullLiteral.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/OrExpr.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/OrOfAndForm.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/SearchUtils.cpp + ${CLP_SRC_DIR}/clp_s/search/ast/StringLiteral.cpp + ${CLP_SRC_DIR}/clp_s/search/clp_search/EncodedVariableInterpreter.cpp + ${CLP_SRC_DIR}/clp_s/search/clp_search/Grep.cpp + ${CLP_SRC_DIR}/clp_s/search/clp_search/Query.cpp + ${CLP_SRC_DIR}/clp_s/search/EvaluateTimestampIndex.cpp + ${CLP_SRC_DIR}/clp_s/search/kql/kql.cpp + ${CLP_SRC_DIR}/clp_s/search/Projection.cpp + ${CLP_SRC_DIR}/clp_s/search/QueryRunner.cpp + ${CLP_SRC_DIR}/clp_s/search/SchemaMatch.cpp + ${CLP_SRC_DIR}/clp/aws/AwsAuthenticationSigner.cpp + ${CLP_SRC_DIR}/clp/BoundedReader.cpp + ${CLP_SRC_DIR}/clp/CurlDownloadHandler.cpp + ${CLP_SRC_DIR}/clp/CurlGlobalInstance.cpp + ${CLP_SRC_DIR}/clp/Defs.h + ${CLP_SRC_DIR}/clp/FileReader.cpp + ${CLP_SRC_DIR}/clp/hash_utils.cpp + ${CLP_SRC_DIR}/clp/NetworkReader.cpp + ${CLP_SRC_DIR}/clp/ReaderInterface.cpp + ${CLP_SRC_DIR}/clp/Thread.cpp) + +velox_add_library( + clp-s-search + STATIC + ${CLP_SRC_FILES} + ClpVectorLoader.cpp + ClpVectorLoader.h + ClpCursor.cpp + ClpCursor.h + ClpQueryRunner.cpp + ClpQueryRunner.h) +target_compile_features(clp-s-search PRIVATE cxx_std_20) +target_compile_definitions(clp-s-search + PUBLIC SOURCE_PATH_SIZE=${SOURCE_PATH_SIZE}) +velox_include_directories( + clp-s-search + PUBLIC ${ANTLR_KqlParser_OUTPUT_DIR} + ${clp_SOURCE_DIR}/components/core/submodules ${CLP_SRC_DIR}) + +velox_link_libraries( + clp-s-search + PUBLIC msgpack-cxx spdlog::spdlog + PRIVATE + absl::flat_hash_map + antlr4_static + Boost::filesystem + Boost::iostreams + Boost::program_options + Boost::url + clp::string_utils + ${CURL_LIBRARIES} + glog::glog + OpenSSL::Crypto + simdjson::simdjson + velox_vector + ystdlib::containers + ystdlib::error_handling + zstd::zstd) diff --git a/velox/connectors/clp/search_lib/ClpCursor.cpp b/velox/connectors/clp/search_lib/ClpCursor.cpp new file mode 100644 index 000000000000..e2ae5276c953 --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpCursor.cpp @@ -0,0 +1,254 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "clp_s/search/EvaluateTimestampIndex.hpp" +#include "clp_s/search/ast/ConvertToExists.hpp" +#include "clp_s/search/ast/EmptyExpr.hpp" +#include "clp_s/search/ast/NarrowTypes.hpp" +#include "clp_s/search/ast/OrOfAndForm.hpp" +#include "clp_s/search/ast/SearchUtils.hpp" +#include "clp_s/search/kql/kql.hpp" +#include "velox/connectors/clp/search_lib/ClpCursor.h" + +using namespace clp_s; +using namespace clp_s::search; +using namespace clp_s::search::ast; + +namespace facebook::velox::connector::clp::search_lib { +ClpCursor::ClpCursor(InputSource inputSource, std::string archivePath) + : errorCode_(ErrorCode::QueryNotInitialized), + inputSource_(inputSource), + archivePath_(std::move(archivePath)), + archiveReader_(std::make_shared()) {} + +ClpCursor::~ClpCursor() { + if (currentArchiveLoaded_) { + archiveReader_->close(); + } +} + +void ClpCursor::executeQuery( + const std::string& query, + const std::vector& outputColumns) { + query_ = query; + outputColumns_ = outputColumns; + errorCode_ = preprocessQuery(); +} + +uint64_t ClpCursor::fetchNext( + uint64_t numRows, + const std::shared_ptr>& filteredRowIndices) { + if (ErrorCode::Success != errorCode_) { + return 0; + } + + if (false == currentArchiveLoaded_) { + errorCode_ = loadArchive(); + if (ErrorCode::Success != errorCode_) { + return 0; + } + + archiveReader_->open_packed_streams(); + currentArchiveLoaded_ = true; + queryRunner_ = std::make_shared( + schemaMatch_, expr_, archiveReader_, false, projection_); + queryRunner_->global_init(); + } + + while (currentSchemaIndex_ < matchedSchemas_.size()) { + if (false == currentSchemaTableLoaded_) { + currentSchemaId_ = matchedSchemas_[currentSchemaIndex_]; + if (EvaluatedValue::False == + queryRunner_->schema_init(currentSchemaId_)) { + currentSchemaIndex_ += 1; + currentSchemaTableLoaded_ = false; + errorCode_ = ErrorCode::DictionaryNotFound; + continue; + } + + auto& reader = + archiveReader_->read_schema_table(currentSchemaId_, false, false); + reader.initialize_filter_with_column_map(queryRunner_.get()); + + errorCode_ = ErrorCode::Success; + currentSchemaTableLoaded_ = true; + } + + auto rowsScanned = queryRunner_->fetchNext(numRows, filteredRowIndices); + if (false == filteredRowIndices->empty()) { + return rowsScanned; + } + + currentSchemaIndex_ += 1; + currentSchemaTableLoaded_ = false; + } + + return 0; +} + +const std::vector& ClpCursor::getProjectedColumns() + const { + if (queryRunner_) { + return queryRunner_->getProjectedColumns(); + } + static std::vector kEmpty; + return kEmpty; +} + +ErrorCode ClpCursor::preprocessQuery() { + auto queryStream = std::istringstream(query_); + expr_ = kql::parse_kql_expression(queryStream); + if (nullptr == expr_) { + VLOG(2) << "Failed to parse query '" << query_ << "'"; + return ErrorCode::InvalidQuerySyntax; + } + + if (std::dynamic_pointer_cast(expr_)) { + VLOG(2) << "Query '" << query_ << "' is logically false"; + return ErrorCode::LogicalError; + } + + OrOfAndForm standardizePass; + if (expr_ = standardizePass.run(expr_); + std::dynamic_pointer_cast(expr_)) { + VLOG(2) << "Query '" << query_ << "' is logically false"; + return ErrorCode::LogicalError; + } + + NarrowTypes narrowPass; + if (expr_ = narrowPass.run(expr_); + std::dynamic_pointer_cast(expr_)) { + VLOG(2) << "Query '" << query_ << "' is logically false"; + return ErrorCode::LogicalError; + } + + ConvertToExists convertPass; + if (expr_ = convertPass.run(expr_); + std::dynamic_pointer_cast(expr_)) { + VLOG(2) << "Query '" << query_ << "' is logically false"; + return ErrorCode::LogicalError; + } + + return ErrorCode::Success; +} + +ErrorCode ClpCursor::loadArchive() { + auto networkAuthOption = inputSource_ == InputSource::Filesystem + ? NetworkAuthOption{.method = AuthMethod::None} + : NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4}; + + try { + archiveReader_->open( + get_path_object_for_raw_path(archivePath_), networkAuthOption); + } catch (std::exception& e) { + VLOG(2) << "Failed to open archive file: " << e.what(); + return ErrorCode::InternalError; + } + + auto timestampDict = archiveReader_->get_timestamp_dictionary(); + auto schemaTree = archiveReader_->get_schema_tree(); + auto schemaMap = archiveReader_->get_schema_map(); + + EvaluateTimestampIndex timestampIndex(timestampDict); + if (clp_s::EvaluatedValue::False == timestampIndex.run(expr_)) { + VLOG(2) << "No matching timestamp ranges for query '" << query_ << "'"; + return ErrorCode::InvalidTimestampRange; + } + + schemaMatch_ = std::make_shared(schemaTree, schemaMap); + if (expr_ = schemaMatch_->run(expr_); + std::dynamic_pointer_cast(expr_)) { + VLOG(2) << "No matching schemas for query '" << query_ << "'"; + return ErrorCode::SchemaNotFound; + } + + projection_ = std::make_shared( + outputColumns_.empty() ? ReturnAllColumns : ReturnSelectedColumns); + try { + for (auto const& column : outputColumns_) { + std::vector descriptorTokens; + std::string descriptorNamespace; + if (false == + tokenize_column_descriptor( + column.name, descriptorTokens, descriptorNamespace)) { + VLOG(2) << "Can not tokenize invalid column: '" << column.name << "'"; + return ErrorCode::InternalError; + } + + auto columnDescriptor = ColumnDescriptor::create_from_escaped_tokens( + descriptorTokens, descriptorNamespace); + switch (column.type) { + case ColumnType::String: + columnDescriptor->set_matching_types( + LiteralType::ClpStringT | LiteralType::VarStringT | + LiteralType::EpochDateT); + break; + case ColumnType::Integer: + columnDescriptor->set_matching_types(LiteralType::IntegerT); + break; + case ColumnType::Float: + columnDescriptor->set_matching_types(LiteralType::FloatT); + break; + case ColumnType::Boolean: + columnDescriptor->set_matching_types(LiteralType::BooleanT); + break; + case ColumnType::Array: + columnDescriptor->set_matching_types(LiteralType::ArrayT); + break; + default: + break; + } + + projection_->add_column(columnDescriptor); + } + } catch (TraceableException& e) { + VLOG(2) << e.what(); + return ErrorCode::InternalError; + } + projection_->resolve_columns(schemaTree); + archiveReader_->set_projection(projection_); + + archiveReader_->read_metadata(); + + matchedSchemas_.clear(); + for (auto schemaId : archiveReader_->get_schema_ids()) { + if (schemaMatch_->schema_matched(schemaId)) { + matchedSchemas_.push_back(schemaId); + } + } + + if (matchedSchemas_.empty()) { + return ErrorCode::SchemaNotFound; + } + + EvaluateTimestampIndex timestamp_index(timestampDict); + if (EvaluatedValue::False == timestamp_index.run(expr_)) { + VLOG(2) << "No matching timestamp ranges for query '" << query_ << "'"; + return ErrorCode::InvalidTimestampRange; + } + + archiveReader_->read_variable_dictionary(); + archiveReader_->read_log_type_dictionary(); + archiveReader_->read_array_dictionary(); + + currentSchemaIndex_ = 0; + currentSchemaTableLoaded_ = false; + return ErrorCode::Success; +} + +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/ClpCursor.h b/velox/connectors/clp/search_lib/ClpCursor.h new file mode 100644 index 000000000000..bafd5f00c98c --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpCursor.h @@ -0,0 +1,123 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include "clp_s/ArchiveReader.hpp" +#include "clp_s/search/SchemaMatch.hpp" +#include "clp_s/search/ast/Expression.hpp" +#include "velox/connectors/clp/search_lib/ClpQueryRunner.h" + +namespace facebook::velox::connector::clp::search_lib { + +enum class ErrorCode { + Success, + QueryNotInitialized, + InvalidQuerySyntax, + SchemaNotFound, + LogicalError, + DictionaryNotFound, + InvalidTimestampRange, + InternalError +}; + +enum class ColumnType { String, Integer, Float, Array, Boolean, Unknown = -1 }; + +struct Field { + ColumnType type; + std::string name; +}; + +/** + * This class is a query execution interface that manages the lifecycle of a + * query on a CLP-S archive, including parsing and validating the query, loading + * the relevant schemas and archives, applying filters, and iterating over the + * results. It abstracts away the low-level details of archive access and schema + * matching while supporting projection and batch-oriented retrieval of filtered + * rows. + */ +class ClpCursor { + public: + // Constructor + explicit ClpCursor(clp_s::InputSource inputSource, std::string archivePath); + + // Destructor + ~ClpCursor(); + + /** + * Executes a query. This function parses, validates, and prepares the given + * query for execution. + * @param query The KQL query to execute. + * @param outputColumns A vector specifying the columns to be included in the + * query result. + */ + void executeQuery( + const std::string& query, + const std::vector& outputColumns); + + /** + * Fetches the next set of rows from the cursor.If the archive and schema are + * not yet loaded, this function will perform the necessary loading. + * @param numRows The maximum number of rows to fetch. + * @param filteredRowIndices A vector of row indices that match the filter. + * @return The number of rows scanned. + */ + uint64_t fetchNext( + uint64_t numRows, + const std::shared_ptr>& filteredRowIndices); + + /** + * Retrieves the projected columns + * @return A vector of BaseColumnReader pointers representing the projected + * columns. + */ + const std::vector& getProjectedColumns() const; + + private: + /** + * Preprocesses the query, performing parsing, validation, and optimization. + * @return The error code. + */ + ErrorCode preprocessQuery(); + + /** + * Loads the archive at the current index. + * @return The error code. + */ + ErrorCode loadArchive(); + + ErrorCode errorCode_; + + clp_s::InputSource inputSource_{clp_s::InputSource::Filesystem}; + std::string archivePath_; + std::string query_; + std::vector outputColumns_; + std::vector matchedSchemas_; + size_t currentSchemaIndex_{0}; + int32_t currentSchemaId_{-1}; + bool currentSchemaTableLoaded_{false}; + bool currentArchiveLoaded_{false}; + + std::shared_ptr expr_; + std::shared_ptr schemaMatch_; + std::shared_ptr queryRunner_; + std::shared_ptr projection_; + std::shared_ptr archiveReader_; +}; +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/ClpQueryRunner.cpp b/velox/connectors/clp/search_lib/ClpQueryRunner.cpp new file mode 100644 index 000000000000..6c1395d6345f --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpQueryRunner.cpp @@ -0,0 +1,82 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "velox/connectors/clp/search_lib/ClpQueryRunner.h" +#include "clp_s/search/clp_search/Grep.hpp" +#include "velox/vector/ComplexVector.h" + +using namespace clp_s; +using namespace clp_s::search; +using namespace clp_s::search::clp_search; + +namespace facebook::velox::connector::clp::search_lib { +void ClpQueryRunner::init( + clp_s::SchemaReader* schemaReader, + std::unordered_map const& columnMap) { + numMessages_ = schemaReader->get_num_messages(); + curMessage_ = 0; + clear_readers(); + + projectedColumns_.clear(); + auto matchingNodesList = projection_->get_ordered_matching_nodes(); + for (const auto& nodeIds : matchingNodesList) { + if (nodeIds.empty()) { + projectedColumns_.push_back(nullptr); + continue; + } + + // Try to find a matching column in columnMap + bool foundReader = false; + for (const auto nodeId : nodeIds) { + auto columnIt = columnMap.find(nodeId); + if (columnIt != columnMap.end()) { + projectedColumns_.push_back(columnIt->second); + foundReader = true; + break; + } + } + + if (!foundReader) { + projectedColumns_.push_back(nullptr); + } + } + + for (auto& [columnId, columnReader] : columnMap) { + initialize_reader(columnId, columnReader); + } +} + +uint64_t ClpQueryRunner::fetchNext( + uint64_t numRows, + const std::shared_ptr>& filteredRowIndices) { + size_t rowsfiltered = 0; + size_t rowsScanned = 0; + while (curMessage_ < numMessages_) { + if (filter(curMessage_)) { + filteredRowIndices->emplace_back(curMessage_); + rowsfiltered += 1; + } + + curMessage_ += 1; + rowsScanned += 1; + if (rowsfiltered >= numRows) { + break; + } + } + return rowsScanned; +} + +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/ClpQueryRunner.h b/velox/connectors/clp/search_lib/ClpQueryRunner.h new file mode 100644 index 000000000000..643988f97f6c --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpQueryRunner.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +#include "clp_s/SchemaReader.hpp" +#include "clp_s/search/Projection.hpp" +#include "clp_s/search/QueryRunner.hpp" +#include "clp_s/search/SchemaMatch.hpp" +#include "clp_s/search/ast/Expression.hpp" +#include "velox/vector/FlatVector.h" + +namespace facebook::velox::connector::clp::search_lib { +/** + * This class extends the generic QueryRunner to support column projection and + * row filtering over CLP-S archives. It is used by the Velox CLP connector to + * efficiently identify matching rows and project relevant columns, which are + * then consumed by the ClpVectorLoader. + */ +class ClpQueryRunner : public clp_s::search::QueryRunner { + public: + // Constructor + ClpQueryRunner( + const std::shared_ptr& match, + const std::shared_ptr& expr, + const std::shared_ptr& archiveReader, + bool ignoreCase, + const std::shared_ptr& projection) + : clp_s::search::QueryRunner(match, expr, archiveReader, ignoreCase), + projection_(projection) {} + + /** + * Initializes the filter with schema information and column readers. + * @param schemaReader A pointer to the SchemaReader + * @param columnMap An unordered map associating column IDs with + * BaseColumnReader pointers. + */ + void init( + clp_s::SchemaReader* schemaReader, + std::unordered_map const& columnMap) + override; + + /** + * Fetches the next set of rows from the cursor. + * @param numRows The maximum number of rows to fetch. + * @param filteredRowIndices A vector to store the row indices that match the + * filter. + */ + uint64_t fetchNext( + uint64_t numRows, + const std::shared_ptr>& filteredRowIndices); + + /** + * @return A reference to the vector of BaseColumnReader pointers that + * represent the columns involved in the scanning operation. + */ + std::vector& getProjectedColumns() { + return projectedColumns_; + } + + private: + std::shared_ptr expr_; + std::shared_ptr schemaTree_; + std::shared_ptr projection_; + std::vector projectedColumns_; + + uint64_t curMessage_{}; + uint64_t numMessages_{}; +}; +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/ClpVectorLoader.cpp b/velox/connectors/clp/search_lib/ClpVectorLoader.cpp new file mode 100644 index 000000000000..c05908bf1312 --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpVectorLoader.cpp @@ -0,0 +1,155 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "velox/connectors/clp/search_lib/ClpVectorLoader.h" +#include "velox/vector/ComplexVector.h" + +namespace facebook::velox::connector::clp::search_lib { +ClpVectorLoader::ClpVectorLoader( + clp_s::BaseColumnReader* columnReader, + ColumnType nodeType, + std::shared_ptr> filteredRowIndices) + : columnReader_(columnReader), + nodeType_(nodeType), + filteredRowIndices_(std::move(filteredRowIndices)) {} + +template +void ClpVectorLoader::populateData(RowSet rows, VectorPtr vector) { + if (columnReader_ == nullptr) { + for (int vectorIndex : rows) { + vector->setNull(vectorIndex, true); + } + return; + } + + for (int vectorIndex : rows) { + auto messageIndex = (*filteredRowIndices_)[vectorIndex]; + + if constexpr (std::is_same_v) { + auto string_value = + std::get(columnReader_->extract_value(messageIndex)); + vector->set(vectorIndex, StringView(string_value)); + } else { + vector->set( + vectorIndex, std::get(columnReader_->extract_value(messageIndex))); + } + + vector->setNull(vectorIndex, false); + } +} + +void ClpVectorLoader::loadInternal( + RowSet rows, + ValueHook* hook, + vector_size_t resultSize, + VectorPtr* result) { + if (!result) { + VELOX_USER_FAIL("vector is null"); + } + + auto vector = *result; + switch (nodeType_) { + case ColumnType::Integer: { + auto intVector = vector->asFlatVector(); + populateData(rows, intVector); + break; + } + case ColumnType::Float: { + auto floatVector = vector->asFlatVector(); + populateData(rows, floatVector); + break; + } + case ColumnType::Boolean: { + auto boolVector = vector->asFlatVector(); + populateData(rows, boolVector); + break; + } + case ColumnType::String: { + auto stringVector = vector->asFlatVector(); + populateData(rows, stringVector); + break; + } + case ColumnType::Array: { + auto arrayVector = std::dynamic_pointer_cast(vector); + auto elements = arrayVector->elements()->asFlatVector(); + vector_size_t elementIndex = 0; + + for (int vectorIndex : rows) { + auto messageIndex = (*filteredRowIndices_)[vectorIndex]; + + auto jsonString = + std::get(columnReader_->extract_value(messageIndex)); + + simdjson::padded_string padded(jsonString); + simdjson::ondemand::document doc; + try { + doc = arrayParser_->iterate(padded); + } catch (const simdjson::simdjson_error& e) { + VELOX_FAIL("JSON parse error at row {}: {}", vectorIndex, e.what()); + } + + simdjson::ondemand::array array; + try { + array = doc.get_array(); + } catch (const simdjson::simdjson_error& e) { + VELOX_FAIL( + "Expected JSON array at row {}: {}", vectorIndex, e.what()); + } + + std::vector arrayElements; + for (auto arrayElement : array) { + arrayElements.emplace_back( + simdjson::to_json_string(arrayElement).value()); + } + + if (elementIndex + arrayElements.size() > elements->size()) { + size_t newSize = std::max( + elementIndex + arrayElements.size(), + static_cast(elements->size()) * 2); + elements->resize(newSize); + } + + arrayVector->setOffsetAndSize( + vectorIndex, elementIndex, arrayElements.size()); + for (auto& arrayElement : arrayElements) { + elements->set(elementIndex++, StringView(arrayElement)); + } + + arrayVector->setNull(vectorIndex, false); + } + break; + } + default: + VELOX_FAIL("Unsupported column type"); + } +} + +// Explicit template instantiations for linker +template void ClpVectorLoader::populateData( + RowSet rows, + FlatVector* vector); +template void ClpVectorLoader::populateData( + RowSet rows, + FlatVector* vector); +template void ClpVectorLoader::populateData( + RowSet rows, + FlatVector* vector); +template void ClpVectorLoader::populateData( + RowSet rows, + FlatVector* vector); +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/search_lib/ClpVectorLoader.h b/velox/connectors/clp/search_lib/ClpVectorLoader.h new file mode 100644 index 000000000000..98779e404aff --- /dev/null +++ b/velox/connectors/clp/search_lib/ClpVectorLoader.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "clp_s/ColumnReader.hpp" +#include "velox/connectors/clp/search_lib/ClpCursor.h" +#include "velox/vector/LazyVector.h" + +namespace facebook::velox::connector::clp::search_lib { +/** + * ClpVectorLoader is a custom Velox VectorLoader that populates Velox vectors + * from a CLP-based column reader. It supports various column types including + * integers, floats, booleans, strings, and arrays of strings. + */ +class ClpVectorLoader : public VectorLoader { + public: + ClpVectorLoader( + clp_s::BaseColumnReader* columnReader, + ColumnType nodeType, + std::shared_ptr> filteredRowIndices); + + private: + void loadInternal( + RowSet rows, + ValueHook* hook, + vector_size_t resultSize, + VectorPtr* result) override; + + template + void populateData(RowSet rows, VectorPtr vector); + + clp_s::BaseColumnReader* columnReader_; + ColumnType nodeType_; + std::shared_ptr> filteredRowIndices_; + + inline static thread_local std::unique_ptr + arrayParser_ = std::make_unique(); +}; +} // namespace facebook::velox::connector::clp::search_lib diff --git a/velox/connectors/clp/tests/CMakeLists.txt b/velox/connectors/clp/tests/CMakeLists.txt new file mode 100644 index 000000000000..6c0c21473665 --- /dev/null +++ b/velox/connectors/clp/tests/CMakeLists.txt @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +add_executable(velox_clp_connector_test ClpConnectorTest.cpp) + +add_test(velox_clp_connector_test velox_clp_connector_test) + +target_link_libraries( + velox_clp_connector_test + velox_clp_connector + velox_vector_test_lib + velox_exec_test_lib + GTest::gtest + GTest::gtest_main) + +file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/examples + DESTINATION ${CMAKE_CURRENT_BINARY_DIR}) diff --git a/velox/connectors/clp/tests/ClpConnectorTest.cpp b/velox/connectors/clp/tests/ClpConnectorTest.cpp new file mode 100644 index 000000000000..c7473bf99803 --- /dev/null +++ b/velox/connectors/clp/tests/ClpConnectorTest.cpp @@ -0,0 +1,317 @@ +/* + * Copyright (c) Facebook, Inc. and its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include "velox/common/base/Fs.h" +#include "velox/common/base/tests/GTestUtils.h" +#include "velox/connectors/clp/ClpColumnHandle.h" +#include "velox/connectors/clp/ClpConnector.h" +#include "velox/connectors/clp/ClpConnectorSplit.h" +#include "velox/connectors/clp/ClpTableHandle.h" +#include "velox/exec/tests/utils/AssertQueryBuilder.h" +#include "velox/exec/tests/utils/OperatorTestBase.h" +#include "velox/exec/tests/utils/PlanBuilder.h" + +namespace { + +using namespace facebook::velox; +using namespace facebook::velox::connector::clp; + +using facebook::velox::exec::test::PlanBuilder; + +class ClpConnectorTest : public exec::test::OperatorTestBase { + public: + const std::string kClpConnectorId = "test-clp"; + + void SetUp() override { + OperatorTestBase::SetUp(); + connector::registerConnectorFactory( + std::make_shared()); + auto clpConnector = + connector::getConnectorFactory( + connector::clp::ClpConnectorFactory::kClpConnectorName) + ->newConnector( + kClpConnectorId, + std::make_shared( + std::unordered_map{ + {"clp.split-source", "local"}})); + connector::registerConnector(clpConnector); + } + + void TearDown() override { + connector::unregisterConnector(kClpConnectorId); + connector::unregisterConnectorFactory( + connector::clp::ClpConnectorFactory::kClpConnectorName); + OperatorTestBase::TearDown(); + } + + exec::Split makeClpSplit(const std::string& splitPath) { + return exec::Split( + std::make_shared(kClpConnectorId, splitPath)); + } + + RowVectorPtr getResults( + const core::PlanNodePtr& planNode, + std::vector&& splits) { + return exec::test::AssertQueryBuilder(planNode) + .splits(std::move(splits)) + .copyResults(pool()); + } + + static std::string getExampleFilePath(const std::string& filePath) { + std::string current_path = fs::current_path().string(); + return current_path + "/examples/" + filePath; + } +}; + +TEST_F(ClpConnectorTest, test1NoPushdown) { + auto plan = PlanBuilder() + .startTableScan() + .outputType( + ROW({"requestId", "userId", "method"}, + {VARCHAR(), VARCHAR(), VARCHAR()})) + .tableHandle(std::make_shared( + kClpConnectorId, + "test_1", + ClpTableHandle::StorageType::kFS, + nullptr)) + .assignments({ + {"requestId", + std::make_shared( + "requestId", "requestId", VARCHAR(), true)}, + {"userId", + std::make_shared( + "userId", "userId", VARCHAR(), true)}, + {"method", + std::make_shared( + "method", "method", VARCHAR(), true)}, + }) + .endTableScan() + .filter("method = 'GET'") + .planNode(); + + auto output = + getResults(plan, {makeClpSplit(getExampleFilePath("test_1.clps"))}); + auto expected = makeRowVector( + {// requestId + makeFlatVector( + {"req-100", "req-105", "req-107", "req-109", "req-102"}), + // userId + makeNullableFlatVector( + {"user201", "user204", "user202", "user203", std::nullopt}), + // method + makeFlatVector({ + "GET", + "GET", + "GET", + "GET", + "GET", + })}); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test1Pushdown) { + auto plan = PlanBuilder() + .startTableScan() + .outputType( + ROW({"requestId", "userId", "path"}, + {VARCHAR(), VARCHAR(), VARCHAR()})) + .tableHandle(std::make_shared( + kClpConnectorId, + "test_1", + ClpTableHandle::StorageType::kFS, + std::make_shared( + "method: \"POST\" AND status: 200"))) + .assignments({ + {"requestId", + std::make_shared( + "requestId", "requestId", VARCHAR(), true)}, + {"userId", + std::make_shared( + "userId", "userId", VARCHAR(), true)}, + {"path", + std::make_shared( + "path", "path", VARCHAR(), true)}, + }) + .endTableScan() + .planNode(); + + auto output = + getResults(plan, {makeClpSplit(getExampleFilePath("test_1.clps"))}); + auto expected = + makeRowVector({// requestId + makeFlatVector({"req-106"}), + // userId + makeNullableFlatVector({std::nullopt}), + // path + makeFlatVector({"/auth/login"})}); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test2NoPushdown) { + auto plan = + PlanBuilder(pool_.get()) + .startTableScan() + .outputType( + ROW({"timestamp", "event"}, + {VARCHAR(), + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()})})) + .tableHandle(std::make_shared( + kClpConnectorId, + "test_2", + ClpTableHandle::StorageType::kFS, + nullptr)) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", VARCHAR(), true)}, + {"event", + std::make_shared( + "event", + "event", + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()}), + true)}}) + .endTableScan() + .filter( + "event.severity IN ('WARNING', 'ERROR') AND " + "((event.type = 'network' AND event.subtype = 'connection') OR " + "(event.type = 'storage' AND event.subtype LIKE 'disk_usage%'))") + .planNode(); + + auto output = + getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))}); + auto expected = + makeRowVector({// timestamp + makeFlatVector({"2025-04-30T08:50:05Z"}), + // event + makeRowVector({ + // event.type + makeFlatVector({"storage"}), + // event.subtype + makeFlatVector({"disk_usage"}), + // event.severity + makeFlatVector({"WARNING"}), + })}); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test2Pushdown) { + auto plan = + PlanBuilder() + .startTableScan() + .outputType( + ROW({"timestamp", "event"}, + {VARCHAR(), + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()})})) + .tableHandle(std::make_shared( + kClpConnectorId, + "test_2", + ClpTableHandle::StorageType::kFS, + std::make_shared( + "(event.severity: \"WARNING\" OR event.severity: \"ERROR\") AND " + "((event.type: \"network\" AND event.subtype: \"connection\") OR " + "(event.type: \"storage\" AND event.subtype: \"disk*\"))"))) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", VARCHAR(), true)}, + {"event", + std::make_shared( + "event", + "event", + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()}), + true)}}) + .endTableScan() + .planNode(); + + auto output = + getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))}); + auto expected = + makeRowVector({// timestamp + makeFlatVector({"2025-04-30T08:50:05Z"}), + // event + makeRowVector({ + // event.type + makeFlatVector({"storage"}), + // event.subtype + makeFlatVector({"disk_usage"}), + // event.severity + makeFlatVector({"WARNING"}), + })}); + test::assertEqualVectors(expected, output); +} + +TEST_F(ClpConnectorTest, test2Hybrid) { + auto plan = + PlanBuilder(pool_.get()) + .startTableScan() + .outputType( + ROW({"timestamp", "event"}, + {VARCHAR(), + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()})})) + .tableHandle(std::make_shared( + kClpConnectorId, + "test_2", + ClpTableHandle::StorageType::kFS, + std::make_shared( + "((event.type: \"network\" AND event.subtype: \"connection\") OR " + "(event.type: \"storage\" AND event.subtype: \"disk*\"))"))) + .assignments( + {{"timestamp", + std::make_shared( + "timestamp", "timestamp", VARCHAR(), true)}, + {"event", + std::make_shared( + "event", + "event", + ROW({"type", "subtype", "severity"}, + {VARCHAR(), VARCHAR(), VARCHAR()}), + true)}}) + .endTableScan() + .filter("upper(event.severity) IN ('WARNING', 'ERROR')") + .planNode(); + + auto output = + getResults(plan, {makeClpSplit(getExampleFilePath("test_2.clps"))}); + auto expected = + makeRowVector({// timestamp + makeFlatVector({"2025-04-30T08:50:05Z"}), + // event + makeRowVector({ + // event.type + makeFlatVector({"storage"}), + // event.subtype + makeFlatVector({"disk_usage"}), + // event.severity + makeFlatVector({"WARNING"}), + })}); + test::assertEqualVectors(expected, output); +} + +} // namespace + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + folly::Init init{&argc, &argv, false}; + return RUN_ALL_TESTS(); +} diff --git a/velox/connectors/clp/tests/examples/test_1.clps b/velox/connectors/clp/tests/examples/test_1.clps new file mode 100644 index 000000000000..9538d88c997f Binary files /dev/null and b/velox/connectors/clp/tests/examples/test_1.clps differ diff --git a/velox/connectors/clp/tests/examples/test_1.ndjson b/velox/connectors/clp/tests/examples/test_1.ndjson new file mode 100644 index 000000000000..aca627d5fadd --- /dev/null +++ b/velox/connectors/clp/tests/examples/test_1.ndjson @@ -0,0 +1,10 @@ +{"timestamp": "2025-04-30T08:45:00Z", "requestId": "req-100", "userId": "user201", "method": "GET", "path": "/api/users/1", "responseTimeMs": 25, "status": 200} +{"timestamp": "2025-04-30T08:45:05Z", "requestId": "req-101", "userId": "user202", "method": "POST", "path": "/api/orders", "responseTimeMs": 110, "status": 201} +{"timestamp": "2025-04-30T08:45:10Z", "requestId": "req-102", "userId": null, "method": "GET", "path": "/public/products", "responseTimeMs": 18, "status": 200} +{"timestamp": "2025-04-30T08:45:15Z", "requestId": "req-103", "userId": "user203", "method": "PUT", "path": "/api/items/5", "responseTimeMs": 95, "status": 200} +{"timestamp": "2025-04-30T08:45:20Z", "requestId": "req-104", "userId": "user201", "method": "DELETE", "path": "/api/notifications/3", "responseTimeMs": 32, "status": 204} +{"timestamp": "2025-04-30T08:45:25Z", "requestId": "req-105", "userId": "user204", "method": "GET", "path": "/api/dashboard", "responseTimeMs": 155, "status": 200} +{"timestamp": "2025-04-30T08:45:30Z", "requestId": "req-106", "userId": null, "method": "POST", "path": "/auth/login", "responseTimeMs": 68, "status": 200} +{"timestamp": "2025-04-30T08:45:35Z", "requestId": "req-107", "userId": "user202", "method": "GET", "path": "/api/users/2/details", "responseTimeMs": 41, "status": 200} +{"timestamp": "2025-04-30T08:45:40Z", "requestId": "req-108", "userId": "user205", "method": "PATCH", "path": "/api/settings", "responseTimeMs": 128, "status": 200} +{"timestamp": "2025-04-30T08:45:45Z", "requestId": "req-109", "userId": "user203", "method": "GET", "path": "/api/products?category=books", "responseTimeMs": 88, "status": 200} diff --git a/velox/connectors/clp/tests/examples/test_2.clps b/velox/connectors/clp/tests/examples/test_2.clps new file mode 100644 index 000000000000..a4c84a4a9101 Binary files /dev/null and b/velox/connectors/clp/tests/examples/test_2.clps differ diff --git a/velox/connectors/clp/tests/examples/test_2.ndjson b/velox/connectors/clp/tests/examples/test_2.ndjson new file mode 100644 index 000000000000..b67558643448 --- /dev/null +++ b/velox/connectors/clp/tests/examples/test_2.ndjson @@ -0,0 +1,10 @@ +{"timestamp": "2025-04-30T08:50:00Z", "event": {"type": "network", "subtype": "connection", "severity": "INFO", "details": {"source": {"ip": "192.168.3.10"}, "destination": {"ip": "10.0.1.5", "port": 80}}}} +{"timestamp": "2025-04-30T08:50:05Z", "event": {"type": "storage", "subtype": "disk_usage", "severity": "WARNING", "details": {"mount": "/var/log", "usage": {"percent": 92}}}} +{"timestamp": "2025-04-30T08:50:10Z", "event": {"type": "process", "subtype": "start", "severity": "INFO", "details": {"name": "cron", "id": 1234}}} +{"timestamp": "2025-04-30T08:50:15Z", "event": {"type": "authentication", "subtype": "failure", "severity": "ERROR", "user": {"name": "guest", "method": "ssh"}, "details": {"reason": "invalid password", "ip": "203.0.113.20"}}} +{"timestamp": "2025-04-30T08:50:20Z", "event": {"type": "service", "subtype": "status", "severity": "INFO", "details": {"name": "httpd", "state": "running", "uptime": {"seconds": 3600}}}} +{"timestamp": "2025-04-30T08:50:25Z", "event": {"type": "security", "subtype": "alert", "severity": "HIGH", "details": {"rule": {"id": "SQLI-001"}, "target": "/api/data", "payload": "SELECT * FROM users;"}}} +{"timestamp": "2025-04-30T08:50:30Z", "event": {"type": "memory", "subtype": "usage", "severity": "NORMAL", "details": {"total": {"gb": 16}, "used": {"gb": 8.5}, "free": {"gb": 7.5}}}} +{"timestamp": "2025-04-30T08:50:35Z", "event": {"type": "configuration", "subtype": "change", "severity": "INFO", "user": {"name": "admin"}, "details": {"component": "database", "setting": {"name": "timeout", "oldValue": 30, "newValue": 60}}}} +{"timestamp": "2025-04-30T08:50:40Z", "event": {"type": "file", "subtype": "access", "severity": "WARNING", "user": {"name": "user101"}, "details": {"operation": "read", "path": "/etc/shadow", "permissions": "rw-------"}}} +{"timestamp": "2025-04-30T08:50:45Z", "event": {"type": "backup", "subtype": "status", "severity": "INFO", "details": {"job": {"name": "daily_backup", "status": "completed"}, "duration": {"seconds": 1200}, "size": {"gb": 25.3}}}} diff --git a/velox/docs/develop/connectors.rst b/velox/docs/develop/connectors.rst index 2d7ac8a011f3..b5b04a70c3ac 100644 --- a/velox/docs/develop/connectors.rst +++ b/velox/docs/develop/connectors.rst @@ -29,8 +29,8 @@ Connector Interface * - Connector Factory - Enables creating instances of a particular connector. -Velox provides Hive and TPC-H Connectors out of the box. -Let's see how the above connector interfaces are implemented in the Hive Connector in detail below. +Velox provides Hive, CLP and TPC-H Connectors out of the box. +Let's examine the implementation details of both the Hive and CLP Connectors as examples Hive Connector -------------- @@ -121,3 +121,30 @@ This is the behavior when the proxy settings are enabled: 4. The no_proxy/NO_PROXY list is comma separated. 5. Use . or \*. to indicate domain suffix matching, e.g. `.foobar.com` will match `test.foobar.com` or `foo.foobar.com`. + +CLP Connector +------------- +The CLP Connector is used to read CLP archives stored in a local file system or S3. It implements similar +interfaces as the Hive Connector except for the `DataSink` interface. Here we only describe the `DataSource` +interface and the `ConnectorSplit` interface implementation since `Connector` and `ConnectorFactory` are +similar to the Hive Connector. + +ClpConnectorSplit +~~~~~~~~~~~~~~~~~ +The ClpConnectorSplit describes a data chunk using `split_path`. For now, only archive format is supported. +The `split_path` is the path to the archive file. + +ClpDataSource +~~~~~~~~~~~~~ +The ClpDataSource implements the `addSplit` API that consumes a ClpConnectorSplit and `next` API that +processes the split and returns a batch of rows. + +During initialization, it records the KQL query and archive source (S3 or local). It then iterates through each +output column, accessing its handle to get its type and original name, and specifically for row types, it +recursively traverses the nested structure to process each field; for non-row types, it directly maps the +Velox column type to a CLP column type. + +When a split is added, a `ClpCursor` is created with the archive path and input source. The query is parsed +and simplified into an AST. On `next`, the cursor finds matching row indices and, if any exist, `ClpDataSource` +recursively creates a row vector composed of lazy vectors, which use CLP column readers to decode and load data +as needed during execution. \ No newline at end of file