diff --git a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp index 1f51bf83cfda..97af75998141 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp +++ b/velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp @@ -51,14 +51,14 @@ uint64_t ClpIrCursor::fetchNext(uint64_t numRows) { } size_t ClpIrCursor::getNumFilteredRows() const { - return irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()->size(); + return filteredLogEvents_->size(); } VectorPtr ClpIrCursor::createVector( memory::MemoryPool* pool, const TypePtr& vectorType, size_t vectorSize) { - VELOX_CHECK_EQ( + VELOX_CHECK_LE( projectedColumnIdxNodeIdsMap_.size(), outputColumns_.size(), "Resolved node-id map size ({}) must not exceed projected columns ({})", @@ -72,7 +72,9 @@ ErrorCode ClpIrCursor::loadSplit() { ? NetworkAuthOption{.method = AuthMethod::None} : NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4}; - auto irHandler = ClpIrUnitHandler{}; + filteredLogEvents_ = std::make_shared< + std::vector>>(); + auto irHandler = ClpIrUnitHandler{filteredLogEvents_}; auto projections = splitFieldsToNamesAndTypes(); auto queryHandlerResult{QueryHandlerType::create( @@ -86,17 +88,31 @@ ErrorCode ClpIrCursor::loadSplit() { } auto queryHandler = std::move(queryHandlerResult).value(); + if (splitPath_.starts_with("http")) { + // Handle the case that the URL is not a S3 URL and does not need auth + inputSource_ = InputSource::Network; + } auto irPath = Path{.source = inputSource_, .path = splitPath_}; irReader_ = try_create_reader(irPath, networkAuthOption); - if (nullptr == irReader_) { + irReaderZstdWrapper_ = + std::make_shared<::clp::streaming_compression::zstd::Decompressor>(); + constexpr size_t cReaderBufferSize{64L * 1024L}; + irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize); + if (nullptr == irReaderZstdWrapper_) { VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_ << "\" for reading."; return ErrorCode::InternalError; } auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer( - *irReader_, std::move(irHandler), std::move(queryHandler)); + *irReaderZstdWrapper_, irHandler, std::move(queryHandler)); if (!deserializerResult) { + if (deserializerResult.has_error()) { + auto error = deserializerResult.error(); + VLOG(2) << "Failed to create deserializer for deserialization, error: " + << error.message(); + return ErrorCode::InternalError; + } VLOG(2) << "Failed to create deserializer for deserialization."; return ErrorCode::InternalError; } @@ -147,11 +163,11 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const { ystdlib::error_handling::Result ClpIrCursor::deserialize( uint64_t numRows) { - irDeserializer_->get_ir_unit_handler().clearFilteredLogEvents(); + filteredLogEvents_->clear(); uint64_t cnt{0}; while (cnt < numRows) { auto deserializeResult = - irDeserializer_->deserialize_next_ir_unit(*irReader_); + irDeserializer_->deserialize_next_ir_unit(*irReaderZstdWrapper_); if (deserializeResult.has_error()) { auto error = deserializeResult.error(); if (std::errc::result_out_of_range == error || @@ -201,7 +217,7 @@ VectorPtr ClpIrCursor::createVectorHelper( vectorType, vectorSize, std::make_unique( - irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(), + filteredLogEvents_, isResolved, std::move(projectedColumnNodeIds), projectedColumn.name, diff --git a/velox/connectors/clp/search_lib/ir/ClpIrCursor.h b/velox/connectors/clp/search_lib/ir/ClpIrCursor.h index c583a5b228ea..0436e82090ec 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrCursor.h +++ b/velox/connectors/clp/search_lib/ir/ClpIrCursor.h @@ -16,6 +16,7 @@ #pragma once +#include "clp/streaming_compression/zstd/Decompressor.hpp" #include "ffi/ir_stream/Deserializer.hpp" #include "velox/connectors/clp/search_lib/BaseClpCursor.h" #include "velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h" @@ -72,11 +73,16 @@ class ClpIrCursor final : public BaseClpCursor { }; using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler< decltype(projectionResolutionCallback_)>; + std::shared_ptr< + std::vector>> + filteredLogEvents_{nullptr}; bool ignoreCase_; std::shared_ptr< ::clp::ffi::ir_stream::Deserializer> irDeserializer_; std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr}; + std::shared_ptr<::clp::streaming_compression::zstd::Decompressor> + irReaderZstdWrapper_{nullptr}; std::unordered_map> projectedColumnIdxNodeIdsMap_; size_t readerIndex_{0}; diff --git a/velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h b/velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h index e4c2ca77fe3d..5a3855d2c897 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h +++ b/velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h @@ -28,10 +28,11 @@ namespace facebook::velox::connector::clp::search_lib { class ClpIrUnitHandler { public: - ClpIrUnitHandler() { - filteredLogEvents_ = std::make_shared< - std::vector>>(); - } + ClpIrUnitHandler( + std::shared_ptr< + std::vector>> + filteredLogEvents) + : filteredLogEvents_(filteredLogEvents) {} // Destructor ~ClpIrUnitHandler() = default; @@ -67,16 +68,6 @@ class ClpIrUnitHandler { return ::clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success; } - std::shared_ptr< - const std::vector>> - getFilteredLogEvents() const { - return filteredLogEvents_; - } - - void clearFilteredLogEvents() { - filteredLogEvents_->clear(); - } - private: std::shared_ptr< std::vector>> diff --git a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp index 09528a2ccfb0..f4320021964f 100644 --- a/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp +++ b/velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp @@ -85,7 +85,7 @@ void ClpIrVectorLoader::loadInternal( break; } case ColumnType::Float: { - auto floatVector = vector->asFlatVector(); + auto floatVector = vector->asFlatVector(); floatVector->set( vectorIndex, value->get_immutable_view<::clp::ffi::value_float_t>());