Skip to content
Closed
15 changes: 15 additions & 0 deletions c++/include/orc/Reader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,21 @@ namespace orc {
* Get the number of stripes to look ahead for small stripe prefetch.
*/
uint64_t getSmallStripeLookAheadLimit() const;

/**
* Set the maximum dictionary size threshold for evaluation.
*
* Dictionaries with more entries than this threshold will not be evaluated.
* 0 to disable dictionary filtering.
*
* Defaults to 0.
*/
RowReaderOptions& setDictionaryFilteringSizeThreshold(uint32_t threshold);

/**
* Get the dictionary filtering size threshold.
*/
uint32_t getDictionaryFilteringSizeThreshold() const;
};

class RowReader;
Expand Down
3 changes: 3 additions & 0 deletions c++/include/orc/sargs/Literal.hh
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include "orc/Int128.hh"
#include "orc/Vector.hh"

#include <string_view>

namespace orc {

/**
Expand Down Expand Up @@ -123,6 +125,7 @@ namespace orc {
Timestamp getTimestamp() const;
double getFloat() const;
std::string getString() const;
std::string_view getStringView() const;
bool getBool() const;
Decimal getDecimal() const;

Expand Down
1 change: 1 addition & 0 deletions c++/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ set(SOURCE_FILES
ConvertColumnReader.cc
CpuInfoUtil.cc
Dictionary.cc
DictionaryLoader.cc
Exceptions.cc
Geospatial.cc
Int128.cc
Expand Down
72 changes: 29 additions & 43 deletions c++/src/ColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@
* limitations under the License.
*/

#include "orc/Int128.hh"
#include "ColumnReader.hh"

#include <math.h>

#include <iostream>

#include "Adaptor.hh"
#include "ByteRLE.hh"
#include "ColumnReader.hh"
#include "ConvertColumnReader.hh"
#include "DictionaryLoader.hh"
#include "RLE.hh"
#include "SchemaEvolution.hh"
#include "orc/Exceptions.hh"
#include "orc/Int128.hh"

#include <math.h>
#include <iostream>
Expand All @@ -36,19 +41,6 @@ namespace orc {
// PASS
}

inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
switch (static_cast<int64_t>(kind)) {
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DICTIONARY:
return RleVersion_1;
case proto::ColumnEncoding_Kind_DIRECT_V2:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return RleVersion_2;
default:
throw ParseError("Unknown encoding in convertRleVersion");
}
}

ColumnReader::ColumnReader(const Type& type, StripeStreams& stripe)
: columnId(type.getColumnId()),
memoryPool(stripe.getMemoryPool()),
Expand Down Expand Up @@ -519,7 +511,10 @@ namespace orc {
std::unique_ptr<RleDecoder> rle_;

public:
StringDictionaryColumnReader(const Type& type, StripeStreams& stipe);
StringDictionaryColumnReader(const Type& type, StripeStreams& stripe);

StringDictionaryColumnReader(const Type& type, StripeStreams& stripe,
const std::shared_ptr<StringDictionary> dictionary);
~StringDictionaryColumnReader() override;

uint64_t skip(uint64_t numValues) override;
Expand All @@ -533,39 +528,23 @@ namespace orc {

StringDictionaryColumnReader::StringDictionaryColumnReader(const Type& type,
StripeStreams& stripe)
: ColumnReader(type, stripe), dictionary_(new StringDictionary(stripe.getMemoryPool())) {
: StringDictionaryColumnReader(type, stripe, nullptr) {}

StringDictionaryColumnReader::StringDictionaryColumnReader(
const Type& type, StripeStreams& stripe, const std::shared_ptr<StringDictionary> dictionary)
: ColumnReader(type, stripe), dictionary_(dictionary) {
RleVersion rleVersion = convertRleVersion(stripe.getEncoding(columnId).kind());
uint32_t dictSize = stripe.getEncoding(columnId).dictionary_size();
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_DATA, true);
if (stream == nullptr) {
throw ParseError("DATA stream not found in StringDictionaryColumn");
}
rle_ = createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
stream = stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
if (dictSize > 0 && stream == nullptr) {
throw ParseError("LENGTH stream not found in StringDictionaryColumn");
}
std::unique_ptr<RleDecoder> lengthDecoder =
createRleDecoder(std::move(stream), false, rleVersion, memoryPool, metrics);
dictionary_->dictionaryOffset.resize(dictSize + 1);
int64_t* lengthArray = dictionary_->dictionaryOffset.data();
lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
lengthArray[0] = 0;
for (uint32_t i = 1; i < dictSize + 1; ++i) {
if (lengthArray[i] < 0) {
throw ParseError("Negative dictionary entry length");
}
lengthArray[i] += lengthArray[i - 1];
}
int64_t blobSize = lengthArray[dictSize];
dictionary_->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
std::unique_ptr<SeekableInputStream> blobStream =
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
if (blobSize > 0 && blobStream == nullptr) {
throw ParseError("DICTIONARY_DATA stream not found in StringDictionaryColumn");

// If no dictionary was provided, load it
if (!dictionary_) {
dictionary_ = loadStringDictionary(columnId, stripe, memoryPool);
}
readFully(dictionary_->dictionaryBlob.data(), blobSize, blobStream.get());
}

StringDictionaryColumnReader::~StringDictionaryColumnReader() {
Expand Down Expand Up @@ -1717,8 +1696,15 @@ namespace orc {
case GEOGRAPHY:
switch (static_cast<int64_t>(stripe.getEncoding(type.getColumnId()).kind())) {
case proto::ColumnEncoding_Kind_DICTIONARY:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return std::make_unique<StringDictionaryColumnReader>(type, stripe);
case proto::ColumnEncoding_Kind_DICTIONARY_V2: {
// Check if we have a pre-loaded dictionary we can use
auto dictionary = stripe.getSharedDictionary(type.getColumnId());
if (dictionary) {
return std::make_unique<StringDictionaryColumnReader>(type, stripe, dictionary);
} else {
return std::unique_ptr<ColumnReader>(new StringDictionaryColumnReader(type, stripe));
}
}
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DIRECT_V2:
return std::make_unique<StringDirectColumnReader>(type, stripe);
Expand Down
7 changes: 7 additions & 0 deletions c++/src/ColumnReader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ namespace orc {
* @return the number of scale digits
*/
virtual int32_t getForcedScaleOnHive11Decimal() const = 0;

/**
* Get a shared dictionary for the given column if available.
* @param columnId the id of the column
* @return shared pointer to the StringDictionary or nullptr if not available
*/
virtual std::shared_ptr<StringDictionary> getSharedDictionary(uint64_t columnId) const = 0;

/**
* Whether decimals that have precision <=18 are encoded as fixed scale and values
Expand Down
100 changes: 100 additions & 0 deletions c++/src/DictionaryLoader.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "DictionaryLoader.hh"
#include "RLE.hh"

namespace orc {

namespace {

// Helper function to read data fully from a stream
void readFully(char* buffer, int64_t bufferSize, SeekableInputStream* stream) {
int64_t posn = 0;
while (posn < bufferSize) {
const void* chunk;
int length;
if (!stream->Next(&chunk, &length)) {
throw ParseError("bad read in readFully");
}
if (posn + length > bufferSize) {
throw ParseError("Corrupt dictionary blob");
}
memcpy(buffer + posn, chunk, static_cast<size_t>(length));
posn += length;
}
}

} // namespace

std::shared_ptr<StringDictionary> loadStringDictionary(uint64_t columnId, StripeStreams& stripe,
MemoryPool& pool) {
// Get encoding information
proto::ColumnEncoding encoding = stripe.getEncoding(columnId);
RleVersion rleVersion = convertRleVersion(encoding.kind());
uint32_t dictSize = encoding.dictionary_size();

// Create the dictionary object
auto dictionary = std::make_shared<StringDictionary>(pool);

// Read LENGTH stream to get dictionary entry lengths
std::unique_ptr<SeekableInputStream> stream =
stripe.getStream(columnId, proto::Stream_Kind_LENGTH, false);
if (dictSize > 0 && stream == nullptr) {
std::stringstream ss;
ss << "LENGTH stream not found in StringDictionaryColumn for column " << columnId;
throw ParseError(ss.str());
}
std::unique_ptr<RleDecoder> lengthDecoder =
createRleDecoder(std::move(stream), false, rleVersion, pool, stripe.getReaderMetrics());

// Decode dictionary entry lengths
dictionary->dictionaryOffset.resize(dictSize + 1);
int64_t* lengthArray = dictionary->dictionaryOffset.data();
lengthDecoder->next(lengthArray + 1, dictSize, nullptr);
lengthArray[0] = 0;

// Convert lengths to cumulative offsets
for (uint32_t i = 1; i < dictSize + 1; ++i) {
if (lengthArray[i] < 0) {
std::stringstream ss;
ss << "Negative dictionary entry length for column " << columnId;
throw ParseError(ss.str());
}
lengthArray[i] += lengthArray[i - 1];
}

int64_t blobSize = lengthArray[dictSize];

// Read DICTIONARY_DATA stream to get dictionary content
dictionary->dictionaryBlob.resize(static_cast<uint64_t>(blobSize));
std::unique_ptr<SeekableInputStream> blobStream =
stripe.getStream(columnId, proto::Stream_Kind_DICTIONARY_DATA, false);
if (blobSize > 0 && blobStream == nullptr) {
std::stringstream ss;
ss << "DICTIONARY_DATA stream not found in StringDictionaryColumn for column " << columnId;
throw ParseError(ss.str());
}

// Read the dictionary blob
readFully(dictionary->dictionaryBlob.data(), blobSize, blobStream.get());

return dictionary;
}

} // namespace orc
57 changes: 57 additions & 0 deletions c++/src/DictionaryLoader.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef ORC_DICTIONARY_LOADER_HH
#define ORC_DICTIONARY_LOADER_HH

#include "ColumnReader.hh"
#include "orc/Vector.hh"

namespace orc {

/**
* Load a string dictionary for a single column from a stripe.
* This function reads the LENGTH and DICTIONARY_DATA streams and populates
* the StringDictionary structure. It automatically uses ReadCache if available
* through the StripeStreams interface.
*
* @param columnId the column ID to load the dictionary for
* @param stripe the StripeStreams interface providing access to streams
* @param pool the memory pool to use for allocating the dictionary
* @return a shared pointer to the loaded StringDictionary, or nullptr if loading fails
*/
std::shared_ptr<StringDictionary> loadStringDictionary(uint64_t columnId, StripeStreams& stripe,
MemoryPool& pool);

// Helper function to convert encoding kind to RLE version
inline RleVersion convertRleVersion(proto::ColumnEncoding_Kind kind) {
switch (static_cast<int64_t>(kind)) {
case proto::ColumnEncoding_Kind_DIRECT:
case proto::ColumnEncoding_Kind_DICTIONARY:
return RleVersion_1;
case proto::ColumnEncoding_Kind_DIRECT_V2:
case proto::ColumnEncoding_Kind_DICTIONARY_V2:
return RleVersion_2;
default:
throw ParseError("Unknown encoding in convertRleVersion");
}
}

} // namespace orc

#endif
12 changes: 12 additions & 0 deletions c++/src/Options.hh
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "io/Cache.hh"

#include <cstdint>
#include <iostream>
#include <limits>

Expand Down Expand Up @@ -156,6 +157,7 @@ namespace orc {
bool throwOnSchemaEvolutionOverflow;
bool enableAsyncPrefetch;
uint64_t smallStripeLookAheadLimit;
uint32_t dictionaryFilteringSizeThreshold;

RowReaderOptionsPrivate() {
selection = ColumnSelection_NONE;
Expand All @@ -169,6 +171,7 @@ namespace orc {
throwOnSchemaEvolutionOverflow = false;
enableAsyncPrefetch = false;
smallStripeLookAheadLimit = 8;
dictionaryFilteringSizeThreshold = 0;
}
};

Expand Down Expand Up @@ -362,6 +365,15 @@ namespace orc {
return privateBits_->smallStripeLookAheadLimit;
}

RowReaderOptions& RowReaderOptions::setDictionaryFilteringSizeThreshold(uint32_t threshold) {
privateBits_->dictionaryFilteringSizeThreshold = threshold;
return *this;
}

uint32_t RowReaderOptions::getDictionaryFilteringSizeThreshold() const {
return privateBits_->dictionaryFilteringSizeThreshold;
}

} // namespace orc

#endif
Loading
Loading