Skip to content

Commit df267fe

Browse files
xiaoxmeng and meta-codesync[bot]
authored and committed
feat: Resubmit add max scan rows limit for index lookup (facebookincubator#16400)
Summary: Pull Request resolved: facebookincubator#16400 X-link: facebookincubator/nimble#492 Original commit changeset: 5846b9ddaf6c Original Phabricator Diff: D93313029 Reviewed By: HuamengJiang Differential Revision: D93348953 fbshipit-source-id: a70b687917429542b3564993834cb1e61518eb7f
1 parent 7e43184 commit df267fe

File tree

8 files changed

+61
-9
lines changed

8 files changed

+61
-9
lines changed

velox/connectors/hive/HiveConfig.cpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,13 @@ bool HiveConfig::preserveFlatMapsInMemory(
259259
config_->get<bool>(kPreserveFlatMapsInMemory, false));
260260
}
261261

262+
// Resolves the per-request row limit for index lookups. Reads
// kMaxRowsPerIndexRequestSession from 'session', passing the connector-level
// kMaxRowsPerIndexRequest value (itself given 0 as second argument) as the
// second argument. Presumably the second argument is the default used when
// the key is unset -- confirm against config::ConfigBase::get.
uint32_t HiveConfig::maxRowsPerIndexRequest(
263+
const config::ConfigBase* session) const {
264+
return session->get<uint32_t>(
265+
kMaxRowsPerIndexRequestSession,
266+
config_->get<uint32_t>(kMaxRowsPerIndexRequest, 0));
267+
}
268+
262269
// Resolves the query user. Reads kUser from 'session', passing the
// connector-level kUser value (with "" as its own second argument) as the
// second argument. The same key is used for both the session and the
// connector lookup.
std::string HiveConfig::user(const config::ConfigBase* session) const {
263270
return session->get<std::string>(kUser, config_->get<std::string>(kUser, ""));
264271
}

velox/connectors/hive/HiveConfig.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,14 @@ class HiveConfig {
208208
static constexpr const char* kPreserveFlatMapsInMemorySession =
209209
"hive.preserve_flat_maps_in_memory";
210210

211+
/// Maximum number of rows the index reader scans per index lookup request.
212+
/// The limit is applied to the row range before filtering, not to the output.
213+
/// 0 means no limit (default).
214+
static constexpr const char* kMaxRowsPerIndexRequest =
215+
"hive.max-rows-per-index-request";
216+
static constexpr const char* kMaxRowsPerIndexRequestSession =
217+
"hive.max_rows_per_index_request";
218+
211219
static constexpr const char* kUser = "user";
212220
static constexpr const char* kSource = "source";
213221
static constexpr const char* kSchema = "schema";
@@ -293,6 +301,10 @@ class HiveConfig {
293301
/// converting them to MapVectors.
294302
bool preserveFlatMapsInMemory(const config::ConfigBase* session) const;
295303

304+
/// Returns the maximum number of rows to read per index lookup request.
305+
/// 0 means no limit (default).
306+
uint32_t maxRowsPerIndexRequest(const config::ConfigBase* session) const;
307+
296308
/// User of the query. Used for storage logging.
297309
std::string user(const config::ConfigBase* session) const;
298310

velox/connectors/hive/HiveIndexReader.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,9 @@ HiveIndexReader::createIndexReader() {
302302
return fileReader_->createIndexReader(rowReaderOpts);
303303
}
304304

305-
void HiveIndexReader::startLookup(const Request& request) {
305+
void HiveIndexReader::startLookup(
306+
const Request& request,
307+
const Options& options) {
306308
VELOX_CHECK(
307309
!indexReader_->hasNext(),
308310
"Previous request not finished. Call next() first.");
@@ -311,7 +313,7 @@ void HiveIndexReader::startLookup(const Request& request) {
311313

312314
// Build index bounds from request and pass to the index reader.
313315
auto indexBounds = buildRequestIndexBounds(request.input);
314-
indexReader_->startLookup(indexBounds);
316+
indexReader_->startLookup(indexBounds, options);
315317
}
316318

317319
serializer::IndexBounds HiveIndexReader::buildRequestIndexBounds(

velox/connectors/hive/HiveIndexReader.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,13 +63,16 @@ class HiveIndexReader {
6363

6464
using Request = IndexSource::Request;
6565
using Result = IndexSource::Result;
66+
using Options = dwio::common::IndexReader::Options;
6667

6768
/// Sets the input for index lookup. Each row in 'input' will be converted
6869
/// to index bounds and passed to the format-specific IndexReader.
6970
///
7071
/// @param request The lookup request containing input row vector with lookup
7172
/// keys.
72-
void startLookup(const Request& request);
73+
/// @param options Options controlling index reader behavior (e.g.,
74+
/// maxRowsPerRequest). Defaults to no limit.
75+
void startLookup(const Request& request, const Options& options = {});
7376

7477
/// Returns true if there are more results to fetch from the current lookup.
7578
bool hasNext() const;

velox/connectors/hive/HiveIndexSource.cpp

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,10 +227,12 @@ class HiveLookupIterator : public IndexSource::ResultIterator {
227227
HiveLookupIterator(
228228
std::shared_ptr<HiveIndexSource> indexSource,
229229
HiveIndexReader* indexReader,
230-
IndexSource::Request request)
230+
IndexSource::Request request,
231+
vector_size_t maxRowsPerRequest)
231232
: indexSource_(std::move(indexSource)),
232233
indexReader_(indexReader),
233-
request_(std::move(request)) {}
234+
request_(std::move(request)),
235+
maxRowsPerRequest_(maxRowsPerRequest) {}
234236

235237
~HiveLookupIterator() override = default;
236238

@@ -247,7 +249,8 @@ class HiveLookupIterator : public IndexSource::ResultIterator {
247249

248250
// Set the request on first call.
249251
if (state_ == State::kInit) {
250-
indexReader_->startLookup(request_);
252+
indexReader_->startLookup(
253+
request_, {.maxRowsPerRequest = maxRowsPerRequest_});
251254
setState(State::kRead);
252255
}
253256

@@ -360,11 +363,11 @@ class HiveLookupIterator : public IndexSource::ResultIterator {
360363
emptyResult_->inputHits, emptyResult_->output);
361364
}
362365

363-
// Holds the index source to ensure HiveIndexReader lifetime.
364366
const std::shared_ptr<HiveIndexSource> indexSource_;
365367
// Raw pointer to index reader for lookup operations.
366368
HiveIndexReader* const indexReader_;
367369
const IndexSource::Request request_;
370+
const vector_size_t maxRowsPerRequest_;
368371

369372
State state_{State::kInit};
370373
// Cached empty result for reuse when no rows pass the remaining filter.
@@ -386,6 +389,8 @@ HiveIndexSource::HiveIndexSource(
386389
hiveConfig_(hiveConfig),
387390
pool_(connectorQueryCtx->memoryPool()),
388391
expressionEvaluator_(connectorQueryCtx->expressionEvaluator()),
392+
maxRowsPerIndexRequest_(hiveConfig_->maxRowsPerIndexRequest(
393+
connectorQueryCtx_->sessionProperties())),
389394
tableHandle_(std::move(tableHandle)),
390395
requestType_(requestType),
391396
outputType_(outputType),
@@ -653,7 +658,7 @@ std::shared_ptr<IndexSource::ResultIterator> HiveIndexSource::lookup(
653658
const Request& request) {
654659
VELOX_CHECK_NOT_NULL(indexReader_, "No index reader available for lookup");
655660
return std::make_shared<HiveLookupIterator>(
656-
shared_from_this(), indexReader_.get(), request);
661+
shared_from_this(), indexReader_.get(), request, maxRowsPerIndexRequest_);
657662
}
658663

659664
std::unordered_map<std::string, RuntimeMetric> HiveIndexSource::runtimeStats() {

velox/connectors/hive/HiveIndexSource.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ class HiveIndexSource : public IndexSource,
6868
return pool_;
6969
}
7070

71+
/// Returns the per-request row limit for index lookups, captured once at
/// construction from HiveConfig::maxRowsPerIndexRequest() using the query's
/// session properties.
/// NOTE(review): lookup() passes this uint32_t to HiveLookupIterator, whose
/// constructor parameter is vector_size_t. Confirm that large configured
/// values cannot overflow or change sign in that conversion.
uint32_t maxRowsPerIndexRequest() const {
72+
return maxRowsPerIndexRequest_;
73+
}
74+
7175
const RowTypePtr& outputType() const {
7276
return outputType_;
7377
}
@@ -121,6 +125,7 @@ class HiveIndexSource : public IndexSource,
121125
const std::shared_ptr<HiveConfig> hiveConfig_;
122126
memory::MemoryPool* const pool_;
123127
core::ExpressionEvaluator* const expressionEvaluator_;
128+
const uint32_t maxRowsPerIndexRequest_;
124129

125130
const HiveTableHandlePtr tableHandle_;
126131
const RowTypePtr requestType_;

velox/docs/configs.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -791,6 +791,12 @@ Each query can override the config by setting corresponding query session proper
791791
- bool
792792
- false
793793
- Whether to preserve flat maps in memory as FlatMapVectors instead of converting them to MapVectors. This is only applied during data reading inside the DWRF and Nimble readers, not during downstream processing like expression evaluation etc.
794+
* - hive.max-rows-per-index-request
795+
- hive.max_rows_per_index_request
796+
- integer
797+
- 0
798+
- Maximum number of rows the index reader scans per index lookup request. The limit is applied to the row range
799+
before filtering, not to the output rows. 0 means no limit (default).
794800

795801
``ORC File Format Configuration``
796802
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

velox/dwio/common/Reader.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,16 +229,28 @@ class IndexReader {
229229

230230
virtual ~IndexReader() = default;
231231

232+
/// Options for controlling index reader behavior.
233+
struct Options {
234+
/// Maximum number of rows to read per index lookup request.
235+
/// When set to non-zero, the index reader will stop fetching or truncate
236+
/// stripes once the total row range (before filtering) reaches this limit.
237+
/// 0 means no limit (default).
/// NOTE(review): if vector_size_t is signed, behavior for negative values
/// is not specified here; confirm whether implementations reject them or
/// treat them as no limit.
238+
vector_size_t maxRowsPerRequest{0};
239+
};
240+
232241
/// Starts a new batch lookup with the given index bounds.
233242
/// Each index bound in the vector represents a separate lookup request.
234243
/// After calling startLookup(), call next() repeatedly to get results.
235244
///
236245
/// @param indexBounds Index bounds for the lookup request. Contains
237246
/// column names and lower/upper bound values.
247+
/// @param options Options controlling index reader behavior (e.g.,
248+
/// maxRowsPerRequest). Defaults to no limit.
238249
/// @throws if lookup is not supported by the implementation or if any
239250
/// index bound is invalid.
240251
virtual void startLookup(
241-
const velox::serializer::IndexBounds& indexBounds) = 0;
252+
const velox::serializer::IndexBounds& indexBounds,
253+
const Options& options) = 0;
242254

243255
/// Returns true if there are more results to fetch from the current lookup.
244256
virtual bool hasNext() const = 0;

0 commit comments

Comments
 (0)