Skip to content

Commit 18a98c5

Browse files
xiaoxmengmeta-codesync[bot]
authored andcommitted
feat: Add cluster index support in velox selective reader (#406)
Summary: Pull Request resolved: #406 This diff integrates the cluster index into the Nimble selective reader to enable filter-based row pruning at read time: **NimbleRowReaderOptions**: Added format-specific options to control index-based filtering via `setIndexEnabled()`. **ReaderBase refactoring**: - Changed constructor to static `create()` factory method for proper initialization ordering - Renamed `memoryPool_` to `pool_` for consistency - Added `StripeStreams::streamIndex()` to get stream index for decoders - Added `StripeStreams::enqueueKeyStream()` to load key stream data for IO coalescing with data streams **SelectiveNimbleRowReader index integration**: - Added `convertIndexColumnsToFileSchema()` to convert index column names from nimble schema to file schema (user-facing names) - `initIndexBounds()`: Converts index columns to file schema names, then converts ScanSpec filters to encoded key bounds - `updateStartStripeFromLowerIndexBound()` / `updateEndStripeFromUpperIndexBound()`: Uses TabletIndex lookup to skip entire stripes outside the filter range - `buildIndexReader()`: Creates IndexReader for first/last stripes only (they can be the same) to find exact row positions - `setStripeRowRange()`: Adjusts row range within stripes based on key bounds, then calls `columnReader_->seekTo()` to skip to the starting position **NimbleData updates**: - Changed `streams_` and `memoryPool_` from references to pointers for consistency - Updated decoder creation to pass stream index for index-aware decoding - Changed `VELOX_UNREACHABLE()` to `NIMBLE_UNSUPPORTED()` for unsupported operations **Random Skip Handling with Cluster Index: When random sampling (random skip) is enabled along with cluster index filtering, the random skip tracker must be updated to account for rows that are skipped due to index bounds (not just rows that are actually read). This ensures that random sampling produces consistent results whether or not index filtering is enabled. The implementation handles three scenarios: - Leading stripes filtered out: When updateStartStripeFromLowerIndexBound() skips entire stripes at the beginning due to the lower bound, maybeUpdateRandomSkip() is called immediately with the total row count of those filtered stripes. - Trailing stripes filtered out: When updateEndStripeFromUpperIndexBound() skips entire stripes at the end due to the upper bound, the row counts are accumulated into trailingSkippedRows_. Partial stripe rows filtered: When setStripeRowRange() adjusts the row range within stripes (rows skipped at the start of the first stripe due to lower bound, or rows skipped at the end of the last stripe due to upper bound), the skipped row counts are handled: leading rows trigger immediate maybeUpdateRandomSkip() call, while trailing rows are accumulated into trailingSkippedRows_. - Deferred update for trailing rows: All trailing skipped rows (from both entire stripes and partial last stripe) are updated together via maybeUpdateRandomSkip(trailingSkippedRows_) in nextRowNumber() when reaching kAtEnd. Added randomSkipWithIndex test case and integrated random skip testing into the randomSchemaAndFilters fuzzer test. **E2EIndexTest**: Comprehensive test suite covering: - Single-column keys: bigint, double, float, timestamp, boolean, varchar - Multi-column composite keys with various filter combinations (point + point, point + range, range + point, etc.) - Fuzzer testing with randomly generated table and index schema and randomly generated index column filter for conversion and non-index column filter combinations. - Verification that index-enabled and index-disabled reads produce identical results Reviewed By: Yuhta Differential Revision: D90074133 fbshipit-source-id: b64323dc51303e3c5ab968e11d73f166e0a2f592
1 parent b3b68ad commit 18a98c5

File tree

8 files changed

+3554
-70
lines changed

8 files changed

+3554
-70
lines changed

dwio/nimble/velox/selective/NimbleData.cpp

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ NimbleData::NimbleData(
3232
std::unique_ptr<Encoding>(velox::memory::MemoryPool&, std::string_view)>
3333
encodingFactory)
3434
: nimbleType_(nimbleType),
35-
streams_(streams),
36-
memoryPool_(memoryPool),
35+
streams_(&streams),
36+
pool_(&memoryPool),
3737
inMapDecoder_(inMapDecoder),
3838
encodingFactory_{encodingFactory} {
3939
switch (nimbleType->kind()) {
@@ -97,13 +97,13 @@ void NimbleData::readNulls(
9797
}
9898
auto numBytes = velox::bits::nbytes(numValues);
9999
if (!nulls || nulls->capacity() < numBytes) {
100-
nulls = AlignedBuffer::allocate<char>(numBytes, &memoryPool_);
100+
nulls = AlignedBuffer::allocate<char>(numBytes, pool_);
101101
}
102102
nulls->setSize(numBytes);
103103
auto* nullsPtr = nulls->asMutable<uint64_t>();
104104
if (inMapDecoder_) {
105105
if (nullsDecoder_) {
106-
dwio::common::ensureCapacity<char>(inMap_, numBytes, &memoryPool_);
106+
dwio::common::ensureCapacity<char>(inMap_, numBytes, pool_);
107107
inMapDecoder_->nextBools(
108108
inMap_->asMutable<uint64_t>(), numValues, incomingNulls);
109109
nullsDecoder_->nextBools(nullsPtr, numValues, inMap_->as<uint64_t>());
@@ -155,31 +155,34 @@ uint64_t NimbleData::skipNulls(uint64_t numValues, bool /*nullsOnly*/) {
155155
}
156156

157157
ChunkedDecoder NimbleData::makeScalarDecoder() {
158+
const auto streamId = nimbleType_->asScalar().scalarDescriptor().offset();
158159
return ChunkedDecoder(
159-
streams_.enqueue(nimbleType_->asScalar().scalarDescriptor().offset()),
160+
streams_->enqueue(streamId),
160161
/*decodeValuesWithNulls=*/false,
161-
/*streamIndex=*/nullptr,
162-
&memoryPool_);
162+
streams_->streamIndex(streamId),
163+
pool_);
163164
}
164165

165166
ChunkedDecoder NimbleData::makeMicrosDecoder() {
166167
VELOX_CHECK(nimbleType_->isTimestampMicroNano());
168+
const auto streamId =
169+
nimbleType_->asTimestampMicroNano().microsDescriptor().offset();
167170
return ChunkedDecoder(
168-
streams_.enqueue(
169-
nimbleType_->asTimestampMicroNano().microsDescriptor().offset()),
171+
streams_->enqueue(streamId),
170172
/*decodeValuesWithNulls=*/false,
171-
/*streamIndex=*/nullptr,
172-
&memoryPool_);
173+
streams_->streamIndex(streamId),
174+
pool_);
173175
}
174176

175177
ChunkedDecoder NimbleData::makeNanosDecoder() {
176178
VELOX_CHECK(nimbleType_->isTimestampMicroNano());
179+
const auto streamId =
180+
nimbleType_->asTimestampMicroNano().nanosDescriptor().offset();
177181
return ChunkedDecoder(
178-
streams_.enqueue(
179-
nimbleType_->asTimestampMicroNano().nanosDescriptor().offset()),
182+
streams_->enqueue(streamId),
180183
/*decodeValuesWithNulls=*/false,
181-
/*streamIndex=*/nullptr,
182-
&memoryPool_);
184+
streams_->streamIndex(streamId),
185+
pool_);
183186
}
184187

185188
std::unique_ptr<ChunkedDecoder> NimbleData::makeLengthDecoder() {
@@ -199,16 +202,15 @@ std::unique_ptr<ChunkedDecoder> NimbleData::makeLengthDecoder() {
199202
std::unique_ptr<ChunkedDecoder> NimbleData::makeDecoder(
200203
const StreamDescriptor& descriptor,
201204
bool decodeValuesWithNulls) {
202-
auto input = streams_.enqueue(descriptor.offset());
205+
auto input = streams_->enqueue(descriptor.offset());
203206
if (!input) {
204207
return nullptr;
205208
}
206209
return std::make_unique<ChunkedDecoder>(
207210
std::move(input),
208211
decodeValuesWithNulls,
209-
/*streamIndex=*/nullptr,
210-
&memoryPool_,
211-
encodingFactory_);
212+
streams_->streamIndex(descriptor.offset()),
213+
pool_);
212214
}
213215

214216
std::unique_ptr<velox::dwio::common::FormatData> NimbleParams::toFormatData(

dwio/nimble/velox/selective/NimbleData.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,15 @@ class NimbleData : public velox::dwio::common::FormatData {
5656

5757
velox::dwio::common::PositionProvider seekToRowGroup(
5858
int64_t /*index*/) final {
59-
VELOX_UNREACHABLE();
59+
NIMBLE_UNSUPPORTED();
6060
}
6161

6262
void filterRowGroups(
6363
const velox::common::ScanSpec& /*scanSpec*/,
6464
uint64_t /*rowsPerRowGroup*/,
6565
const velox::dwio::common::StatsContext& /*writerContext*/,
6666
FilterRowGroupsResult& /*result*/) final {
67-
VELOX_UNREACHABLE();
67+
NIMBLE_UNSUPPORTED();
6868
}
6969

7070
const Type& nimbleType() const {
@@ -103,8 +103,8 @@ class NimbleData : public velox::dwio::common::FormatData {
103103
bool decodeValuesWithNulls);
104104

105105
const std::shared_ptr<const Type> nimbleType_;
106-
StripeStreams& streams_;
107-
velox::memory::MemoryPool& memoryPool_;
106+
StripeStreams* const streams_;
107+
velox::memory::MemoryPool* const pool_;
108108
ChunkedDecoder* const inMapDecoder_;
109109
std::unique_ptr<ChunkedDecoder> nullsDecoder_;
110110
velox::BufferPtr inMap_;
@@ -169,7 +169,7 @@ class NimbleParams : public velox::dwio::common::FormatParams {
169169

170170
private:
171171
const std::shared_ptr<const Type> nimbleType_;
172-
StripeStreams* const streams_;
172+
StripeStreams* const streams_{nullptr};
173173
RowSizeTracker* const rowSizeTracker_{nullptr};
174174
const bool preserveFlatMapsInMemory_{false};
175175
ChunkedDecoder* inMapDecoder_{nullptr};
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright (c) Meta Platforms, Inc. and affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#pragma once
18+
19+
#include "velox/dwio/common/Options.h"
20+
21+
namespace facebook::nimble {
22+
23+
/// Format-specific options for Nimble row readers.
24+
class NimbleRowReaderOptions
25+
: public velox::dwio::common::FormatSpecificOptions {
26+
public:
27+
NimbleRowReaderOptions() = default;
28+
29+
/// Sets whether to use the cluster index for filter-based row pruning.
30+
/// When enabled (default), filters from ScanSpec are converted to index
31+
/// bounds for efficient row skipping based on the file's cluster index.
32+
/// When disabled, all rows are scanned without index-based pruning.
33+
void setIndexEnabled(bool enabled) {
34+
indexEnabled_ = enabled;
35+
}
36+
37+
/// Returns true if cluster index-based filtering is enabled.
38+
bool indexEnabled() const {
39+
return indexEnabled_;
40+
}
41+
42+
private:
43+
bool indexEnabled_{true};
44+
};
45+
46+
} // namespace facebook::nimble

dwio/nimble/velox/selective/ReaderBase.cpp

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include "dwio/nimble/velox/selective/ReaderBase.h"
1818

19+
#include "dwio/nimble/index/IndexConstants.h"
1920
#include "dwio/nimble/tablet/Constants.h"
2021
#include "dwio/nimble/velox/SchemaSerialization.h"
2122
#include "dwio/nimble/velox/SchemaUtils.h"
@@ -26,7 +27,6 @@ namespace facebook::nimble {
2627
using namespace facebook::velox;
2728

2829
namespace {
29-
3030
const std::string kSchemaSectionString(kSchemaSection);
3131
const std::vector<std::string> kPreloadOptionalSections = {
3232
kSchemaSectionString};
@@ -47,25 +47,50 @@ TypePtr getFileSchema(
4747
return dwio::common::Reader::updateColumnNames(
4848
fileSchema, options.fileSchema());
4949
}
50-
5150
} // namespace
5251

52+
std::shared_ptr<ReaderBase> ReaderBase::create(
53+
std::unique_ptr<velox::dwio::common::BufferedInput> input,
54+
const velox::dwio::common::ReaderOptions& options) {
55+
// Initialize all members
56+
auto tablet = TabletReader::create(
57+
// TODO: Make TabletReader taking BufferedInput.
58+
input->getReadFile().get(),
59+
options.memoryPool(),
60+
kPreloadOptionalSections);
61+
62+
auto* pool = &options.memoryPool();
63+
const auto& randomSkip = options.randomSkip();
64+
const auto& scanSpec = options.scanSpec();
65+
const auto nimbleSchema = loadSchema(*tablet);
66+
auto fileSchema =
67+
asRowType(getFileSchema(options, convertToVeloxType(*nimbleSchema)));
68+
69+
return std::shared_ptr<ReaderBase>(new ReaderBase(
70+
std::move(input),
71+
std::move(tablet),
72+
pool,
73+
randomSkip,
74+
scanSpec,
75+
nimbleSchema,
76+
std::move(fileSchema)));
77+
}
78+
5379
ReaderBase::ReaderBase(
54-
std::unique_ptr<dwio::common::BufferedInput> input,
55-
const dwio::common::ReaderOptions& options)
56-
: input_(std::move(input)),
57-
tablet_(
58-
TabletReader::create(
59-
// TODO: Make TabletReader taking BufferedInput.
60-
input_->getReadFile().get(),
61-
options.memoryPool(),
62-
kPreloadOptionalSections)),
63-
memoryPool_(&options.memoryPool()),
64-
randomSkip_(options.randomSkip()),
65-
scanSpec_(options.scanSpec()),
66-
nimbleSchema_(loadSchema(*tablet_)),
67-
fileSchema_(asRowType(
68-
getFileSchema(options, convertToVeloxType(*nimbleSchema_)))) {}
80+
std::unique_ptr<velox::dwio::common::BufferedInput> input,
81+
std::shared_ptr<TabletReader> tablet,
82+
velox::memory::MemoryPool* pool,
83+
const std::shared_ptr<velox::random::RandomSkipTracker>& randomSkip,
84+
const std::shared_ptr<velox::common::ScanSpec>& scanSpec,
85+
std::shared_ptr<const Type> nimbleSchema,
86+
velox::RowTypePtr fileSchema)
87+
: input_{std::move(input)},
88+
tablet_{std::move(tablet)},
89+
pool_{pool},
90+
randomSkip_{randomSkip},
91+
scanSpec_{scanSpec},
92+
nimbleSchema_{std::move(nimbleSchema)},
93+
fileSchema_{std::move(fileSchema)} {}
6994

7095
std::optional<common::Region> StripeStreams::streamRegion(int streamId) const {
7196
NIMBLE_CHECK(stripeIdentifier_.has_value());
@@ -94,4 +119,27 @@ std::unique_ptr<dwio::common::SeekableInputStream> StripeStreams::enqueue(
94119
return readerBase_->input().enqueue(*region, &sid);
95120
}
96121

122+
std::shared_ptr<index::StreamIndex> StripeStreams::streamIndex(
123+
int streamId) const {
124+
NIMBLE_CHECK(stripeIdentifier_.has_value());
125+
126+
const auto& indexGroup = stripeIdentifier_->indexGroup();
127+
if (indexGroup == nullptr) {
128+
return nullptr;
129+
}
130+
return indexGroup->createStreamIndex(stripe_, streamId);
131+
}
132+
133+
std::unique_ptr<velox::dwio::common::SeekableInputStream>
134+
StripeStreams::enqueueKeyStream() {
135+
NIMBLE_CHECK(stripeIdentifier_.has_value());
136+
137+
const auto& indexGroup = stripeIdentifier_->indexGroup();
138+
NIMBLE_CHECK_NOT_NULL(indexGroup);
139+
140+
const auto region = indexGroup->keyStreamRegion(stripe_);
141+
const dwio::common::StreamIdentifier sid(kKeyStreamId);
142+
return readerBase_->input().enqueue(region, &sid);
143+
}
144+
97145
} // namespace facebook::nimble

dwio/nimble/velox/selective/ReaderBase.h

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include "dwio/nimble/index/StripeIndexGroup.h"
1920
#include "dwio/nimble/tablet/TabletReader.h"
2021
#include "dwio/nimble/velox/SchemaReader.h"
2122
#include "velox/dwio/common/BufferedInput.h"
@@ -25,7 +26,7 @@ namespace facebook::nimble {
2526

2627
class ReaderBase {
2728
public:
28-
ReaderBase(
29+
static std::shared_ptr<ReaderBase> create(
2930
std::unique_ptr<velox::dwio::common::BufferedInput> input,
3031
const velox::dwio::common::ReaderOptions& options);
3132

@@ -37,8 +38,8 @@ class ReaderBase {
3738
return *tablet_;
3839
}
3940

40-
velox::memory::MemoryPool* memoryPool() const {
41-
return memoryPool_;
41+
velox::memory::MemoryPool* pool() const {
42+
return pool_;
4243
}
4344

4445
const std::shared_ptr<velox::random::RandomSkipTracker>& randomSkip() const {
@@ -64,9 +65,18 @@ class ReaderBase {
6465
}
6566

6667
private:
68+
ReaderBase(
69+
std::unique_ptr<velox::dwio::common::BufferedInput> input,
70+
std::shared_ptr<TabletReader> tablet,
71+
velox::memory::MemoryPool* pool,
72+
const std::shared_ptr<velox::random::RandomSkipTracker>& randomSkip,
73+
const std::shared_ptr<velox::common::ScanSpec>& scanSpec,
74+
std::shared_ptr<const Type> nimbleSchema,
75+
velox::RowTypePtr fileSchema);
76+
6777
const std::unique_ptr<velox::dwio::common::BufferedInput> input_;
6878
const std::shared_ptr<TabletReader> tablet_;
69-
velox::memory::MemoryPool* const memoryPool_;
79+
velox::memory::MemoryPool* const pool_;
7080
const std::shared_ptr<velox::random::RandomSkipTracker> randomSkip_;
7181
const std::shared_ptr<velox::common::ScanSpec> scanSpec_;
7282
const std::shared_ptr<const Type> nimbleSchema_;
@@ -80,9 +90,15 @@ class StripeStreams {
8090
explicit StripeStreams(const std::shared_ptr<ReaderBase>& readerBase)
8191
: readerBase_(readerBase) {}
8292

83-
void setStripe(int stripe) {
93+
void setStripe(int stripe, bool loadIndex = false) {
8494
stripe_ = stripe;
85-
stripeIdentifier_ = readerBase_->tablet().stripeIdentifier(stripe_);
95+
stripeIdentifier_ =
96+
readerBase_->tablet().stripeIdentifier(stripe_, loadIndex);
97+
if (loadIndex) {
98+
NIMBLE_CHECK_NOT_NULL(
99+
stripeIdentifier_->indexGroup(),
100+
"Index group should be set when loadIndex is true");
101+
}
86102
}
87103

88104
bool hasStream(int streamId) const {
@@ -92,10 +108,22 @@ class StripeStreams {
92108
std::unique_ptr<velox::dwio::common::SeekableInputStream> enqueue(
93109
int streamId);
94110

111+
std::unique_ptr<velox::dwio::common::SeekableInputStream> enqueueKeyStream();
112+
95113
void load() {
96114
readerBase_->input().load(velox::dwio::common::LogType::STREAM_BUNDLE);
97115
}
98116

117+
int32_t stripeIndex() const {
118+
return stripe_;
119+
}
120+
121+
const std::shared_ptr<StripeIndexGroup>& indexGroup() const {
122+
return stripeIdentifier_->indexGroup();
123+
}
124+
125+
std::shared_ptr<index::StreamIndex> streamIndex(int streamId) const;
126+
99127
private:
100128
std::optional<velox::common::Region> streamRegion(int streamId) const;
101129

0 commit comments

Comments
 (0)