Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions velox/connectors/clp/search_lib/ir/ClpIrCursor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ uint64_t ClpIrCursor::fetchNext(uint64_t numRows) {
}

size_t ClpIrCursor::getNumFilteredRows() const {
return irDeserializer_->get_ir_unit_handler().getFilteredLogEvents()->size();
return filteredLogEvents_->size();
}

VectorPtr ClpIrCursor::createVector(
memory::MemoryPool* pool,
const TypePtr& vectorType,
size_t vectorSize) {
VELOX_CHECK_EQ(
VELOX_CHECK_LE(
projectedColumnIdxNodeIdsMap_.size(),
outputColumns_.size(),
"Resolved node-id map size ({}) must not exceed projected columns ({})",
Expand All @@ -72,7 +72,9 @@ ErrorCode ClpIrCursor::loadSplit() {
? NetworkAuthOption{.method = AuthMethod::None}
: NetworkAuthOption{.method = AuthMethod::S3PresignedUrlV4};

auto irHandler = ClpIrUnitHandler{};
filteredLogEvents_ = std::make_shared<
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>();
auto irHandler = ClpIrUnitHandler{filteredLogEvents_};

auto projections = splitFieldsToNamesAndTypes();
auto queryHandlerResult{QueryHandlerType::create(
Expand All @@ -86,17 +88,31 @@ ErrorCode ClpIrCursor::loadSplit() {
}
auto queryHandler = std::move(queryHandlerResult).value();

if (splitPath_.starts_with("http")) {
// Handle the case that the URL is not a S3 URL and does not need auth
inputSource_ = InputSource::Network;
}
auto irPath = Path{.source = inputSource_, .path = splitPath_};
irReader_ = try_create_reader(irPath, networkAuthOption);
if (nullptr == irReader_) {
irReaderZstdWrapper_ =
std::make_shared<::clp::streaming_compression::zstd::Decompressor>();
constexpr size_t cReaderBufferSize{64L * 1024L};
irReaderZstdWrapper_->open(*irReader_, cReaderBufferSize);
if (nullptr == irReaderZstdWrapper_) {
VLOG(2) << "Failed to open kv-ir stream \"" << splitPath_
<< "\" for reading.";
return ErrorCode::InternalError;
}

auto deserializerResult = ::clp::ffi::ir_stream::make_deserializer(
*irReader_, std::move(irHandler), std::move(queryHandler));
*irReaderZstdWrapper_, irHandler, std::move(queryHandler));
if (!deserializerResult) {
if (deserializerResult.has_error()) {
auto error = deserializerResult.error();
VLOG(2) << "Failed to create deserializer for deserialization, error: "
<< error.message();
return ErrorCode::InternalError;
}
VLOG(2) << "Failed to create deserializer for deserialization.";
return ErrorCode::InternalError;
}
Expand Down Expand Up @@ -147,11 +163,11 @@ ClpIrCursor::splitFieldsToNamesAndTypes() const {

ystdlib::error_handling::Result<void> ClpIrCursor::deserialize(
uint64_t numRows) {
irDeserializer_->get_ir_unit_handler().clearFilteredLogEvents();
filteredLogEvents_->clear();
uint64_t cnt{0};
while (cnt < numRows) {
auto deserializeResult =
irDeserializer_->deserialize_next_ir_unit(*irReader_);
irDeserializer_->deserialize_next_ir_unit(*irReaderZstdWrapper_);
if (deserializeResult.has_error()) {
auto error = deserializeResult.error();
if (std::errc::result_out_of_range == error ||
Expand Down Expand Up @@ -201,7 +217,7 @@ VectorPtr ClpIrCursor::createVectorHelper(
vectorType,
vectorSize,
std::make_unique<ClpIrVectorLoader>(
irDeserializer_->get_ir_unit_handler().getFilteredLogEvents(),
filteredLogEvents_,
isResolved,
std::move(projectedColumnNodeIds),
projectedColumn.name,
Expand Down
6 changes: 6 additions & 0 deletions velox/connectors/clp/search_lib/ir/ClpIrCursor.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "clp/streaming_compression/zstd/Decompressor.hpp"
#include "ffi/ir_stream/Deserializer.hpp"
#include "velox/connectors/clp/search_lib/BaseClpCursor.h"
#include "velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h"
Expand Down Expand Up @@ -72,11 +73,16 @@ class ClpIrCursor final : public BaseClpCursor {
};
using QueryHandlerType = ::clp::ffi::ir_stream::search::QueryHandler<
decltype(projectionResolutionCallback_)>;
std::shared_ptr<
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
filteredLogEvents_{nullptr};
bool ignoreCase_;
std::shared_ptr<
::clp::ffi::ir_stream::Deserializer<ClpIrUnitHandler, QueryHandlerType>>
irDeserializer_;
std::shared_ptr<::clp::ReaderInterface> irReader_{nullptr};
std::shared_ptr<::clp::streaming_compression::zstd::Decompressor>
irReaderZstdWrapper_{nullptr};
std::unordered_map<size_t, std::vector<::clp::ffi::SchemaTree::Node::id_t>>
projectedColumnIdxNodeIdsMap_;
size_t readerIndex_{0};
Expand Down
19 changes: 5 additions & 14 deletions velox/connectors/clp/search_lib/ir/ClpIrUnitHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,11 @@ namespace facebook::velox::connector::clp::search_lib {

class ClpIrUnitHandler {
public:
ClpIrUnitHandler() {
filteredLogEvents_ = std::make_shared<
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>();
}
ClpIrUnitHandler(
std::shared_ptr<
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
filteredLogEvents)
: filteredLogEvents_(filteredLogEvents) {}

// Destructor
~ClpIrUnitHandler() = default;
Expand Down Expand Up @@ -67,16 +68,6 @@ class ClpIrUnitHandler {
return ::clp::ffi::ir_stream::IRErrorCode::IRErrorCode_Success;
}

std::shared_ptr<
const std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
getFilteredLogEvents() const {
return filteredLogEvents_;
}

void clearFilteredLogEvents() {
filteredLogEvents_->clear();
}

private:
std::shared_ptr<
std::vector<std::unique_ptr<::clp::ffi::KeyValuePairLogEvent>>>
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/clp/search_lib/ir/ClpIrVectorLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void ClpIrVectorLoader::loadInternal(
break;
}
case ColumnType::Float: {
auto floatVector = vector->asFlatVector<float>();
auto floatVector = vector->asFlatVector<double>();
floatVector->set(
vectorIndex,
value->get_immutable_view<::clp::ffi::value_float_t>());
Expand Down
Loading