Skip to content

Commit 5ed0be0

Browse files
xiaoxmengfacebook-github-bot
authored andcommitted
feat: Support stripe level batched index read (facebookincubator#16360)
Summary: X-link: facebookincubator/nimble#480 This diff implements stripe-level batched index read support for Nimble files in Velox. The key motivation is to improve index lookup performance by processing multiple lookup requests in batches at the stripe level, rather than processing each request individually. **New `SelectiveNimbleIndexReader` class**: A new format-specific index reader that handles: - Encoding index bounds into Nimble-specific encoded keys - Looking up stripes and row ranges using the tablet index - Managing stripe iteration and data reading with batched processing - Returning results in request order via an iterator pattern (`startLookup`/`hasNext`/`next`) **Batched stripe processing**: Instead of loading stripes per-request, the reader: - Maps all lookup requests to their matching stripes upfront - Merges overlapping row ranges within stripes for efficient reading - Tracks output references with ref-counting to share read data across requests **Optimized row range handling**: - Without filters: Merges overlapping row ranges and each request extracts its portion - With filters: Splits overlapping ranges into non-overlapping segments to preserve filter semantics **HiveIndexReader refactoring**: Simplified to focus on index bounds creation and result assembly, delegating control logic to format-specific readers. **KeyEncoder enhancements**: Added support for encoding index bounds with constant values for more efficient range queries. **New runtime stats**: Added metrics for tracking index lookup performance: - `kNumIndexLookupRequests`: Total lookup requests - `kNumIndexLookupStripes`: Number of stripes accessed - `kNumIndexLookupReadSegments`: Number of read segments processed Differential Revision: D92848948
1 parent 44f99c3 commit 5ed0be0

File tree

8 files changed

+1062
-509
lines changed

8 files changed

+1062
-509
lines changed

velox/connectors/hive/HiveIndexReader.cpp

Lines changed: 185 additions & 345 deletions
Large diffs are not rendered by default.

velox/connectors/hive/HiveIndexReader.h

Lines changed: 30 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,8 @@
2020
#include "velox/connectors/Connector.h"
2121
#include "velox/connectors/hive/FileHandle.h"
2222
#include "velox/core/PlanNode.h"
23-
#include "velox/dwio/common/Options.h"
2423
#include "velox/dwio/common/Reader.h"
25-
#include "velox/vector/DecodedVector.h"
24+
#include "velox/serializers/KeyEncoder.h"
2625

2726
namespace facebook::velox::connector {
2827
class ConnectorQueryCtx;
@@ -35,14 +34,15 @@ class HiveTableHandle;
3534
class HiveColumnHandle;
3635
class HiveConfig;
3736

38-
/// HiveIndexReader is similar to SplitReader but supports index lookup API.
39-
/// It takes a request row vector, converts each row into filters inserted into
40-
/// the scan spec, and reads matching data from the underlying reader. The
41-
/// result includes matched rows and a buffer containing the count of matches
42-
/// for each input request row.
37+
/// HiveIndexReader handles index lookups for Hive tables with cluster indexes.
38+
/// It focuses on:
39+
/// - Creating index bounds from join conditions
40+
/// - Delegating actual index lookups to the format-specific IndexReader
4341
///
44-
/// The HiveIndexReader is designed to be reusable across multiple lookup
45-
/// requests on the same split.
42+
/// The format-specific IndexReader (e.g., SelectiveNimbleIndexReader) handles:
43+
/// - Encoding keys into format-specific representations
44+
/// - Stripe iteration and row range computation
45+
/// - Data reading and output assembly
4646
class HiveIndexReader {
4747
public:
4848
HiveIndexReader(
@@ -54,8 +54,8 @@ class HiveIndexReader {
5454
const std::vector<core::IndexLookupConditionPtr>& joinConditions,
5555
const RowTypePtr& requestType,
5656
const RowTypePtr& outputType,
57-
const std::shared_ptr<io::IoStatistics>& ioStatistics,
58-
const std::shared_ptr<IoStats>& ioStats,
57+
const std::shared_ptr<io::IoStatistics>& ioStats,
58+
const std::shared_ptr<IoStats>& fsStats,
5959
FileHandleFactory* fileHandleFactory,
6060
folly::Executor* ioExecutor);
6161

@@ -65,17 +65,16 @@ class HiveIndexReader {
6565
using Result = IndexSource::Result;
6666

6767
/// Sets the input for index lookup. Each row in 'input' will be converted
68-
/// to filters on key columns and used to query matching rows from the
69-
/// underlying data.
68+
/// to index bounds and passed to the format-specific IndexReader.
7069
///
7170
/// @param request The lookup request containing input row vector with lookup
7271
/// keys.
73-
void setRequest(const Request& request);
72+
void startLookup(const Request& request);
7473

75-
/// Returns true if there are more input rows to process.
74+
/// Returns true if there are more results to fetch from the current lookup.
7675
bool hasNext() const;
7776

78-
/// Reads the next batch of matching rows for the current input rows.
77+
/// Returns the next batch of matching rows for the current input rows.
7978
/// The result from a single request row is never split across multiple
8079
/// calls to next().
8180
///
@@ -87,37 +86,17 @@ class HiveIndexReader {
8786
std::string toString() const;
8887

8988
private:
90-
// Resets filter caches for reuse.
91-
void resetFilterCaches();
92-
9389
// Creates the file reader for reading file metadata and schema.
94-
// NOTE: Called from constructor initializer list, so only accesses members
95-
// declared before fileReader_.
9690
std::unique_ptr<dwio::common::Reader> createFileReader();
9791

98-
// Creates the row reader.
99-
// NOTE: Called from constructor initializer list, so only accesses members
100-
// declared before rowReader_.
101-
std::unique_ptr<dwio::common::RowReader> createRowReader();
102-
103-
// Initializes joinIndexColumnSpecs_ and requestColumnIndices_ from join
104-
// conditions.
105-
void initJoinConditions();
106-
107-
// Converts the input row at the given index to filters on key columns
108-
// and applies them to the scan spec.
109-
void applyFiltersFromRequest(vector_size_t row);
110-
111-
// Clears filters applied to key columns in the scan spec.
112-
void clearKeyFilters();
92+
// Creates the format-specific index reader.
93+
std::unique_ptr<dwio::common::IndexReader> createIndexReader();
11394

114-
// Reads the next batch of rows based on the current filters.
115-
// Returns the number of rows read. The output vector is passed by reference
116-
// and will be populated with the matching rows.
117-
uint64_t readNext(VectorPtr& output);
95+
// Parses join conditions to extract column indices and constant values.
96+
void parseJoinConditions();
11897

119-
// Resets request_ and requestRow_.
120-
void reset();
98+
// Builds IndexBounds from the request row vector.
99+
serializer::IndexBounds buildRequestIndexBounds(const RowVectorPtr& request);
121100

122101
std::shared_ptr<const HiveConnectorSplit> hiveSplit_;
123102
const std::shared_ptr<const HiveTableHandle> tableHandle_;
@@ -134,34 +113,29 @@ class HiveIndexReader {
134113

135114
const std::shared_ptr<common::ScanSpec> scanSpec_;
136115
const std::unique_ptr<dwio::common::Reader> fileReader_;
137-
const std::unique_ptr<dwio::common::RowReader> rowReader_;
116+
const std::unique_ptr<dwio::common::IndexReader> indexReader_;
138117
// Join conditions (including equal conditions converted from join keys).
139118
const std::vector<core::IndexLookupConditionPtr> joinConditions_;
140119

141-
// Cached ScanSpec children for index columns used in join conditions.
142-
std::vector<common::ScanSpec*> joinIndexColumnSpecs_;
143120
// Request column indices for each join condition (for probe side columns).
144121
// For EqualIndexLookupCondition, stores {valueIndex}.
145122
// For BetweenIndexLookupCondition, stores {lowerIndex, upperIndex}.
146123
std::vector<std::vector<column_index_t>> requestColumnIndices_;
147124

148-
// Current request for lookup.
149-
RowVectorPtr request_;
150-
// Current row index in the request being processed.
151-
vector_size_t requestRow_{0};
152-
153-
// Decoded vectors for input columns used in join conditions.
154-
// Indexed by join condition index and then by column index within that
155-
// condition (0 for equal condition value, 0/1 for between condition
156-
// lower/upper).
157-
std::vector<std::vector<DecodedVector>> decodedRequestVectors_;
158-
159125
// For BetweenIndexLookupCondition with constant bounds, stores the constant
160126
// values directly. The outer vector is indexed by join condition index. The
161127
// inner vector has size 2 for between conditions (lower, upper). If a bound
162128
// is a constant, the corresponding optional contains the value; otherwise
163129
// it's std::nullopt and the value should be decoded from request.
164130
std::vector<std::vector<std::optional<variant>>> constantBoundValues_;
131+
132+
// Cached row type for index bounds (column names and types from join
133+
// conditions).
134+
RowTypePtr indexBoundType_;
135+
136+
// Reusable column vectors for building index bounds.
137+
std::vector<VectorPtr> lowerBoundColumns_;
138+
std::vector<VectorPtr> upperBoundColumns_;
165139
};
166140

167141
} // namespace facebook::velox::connector::hive

0 commit comments

Comments
 (0)