Skip to content

Commit 0520373

Browse files
visualYJDrock-git
authored andcommitted
[feat][store] Impl document scan filter
1 parent b111fc2 commit 0520373

17 files changed

+351
-83
lines changed

src/document/document_index.cc

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ butil::Status DocumentIndex::Delete(const std::vector<int64_t>& delete_ids) {
364364
}
365365

366366
butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_string, bool use_range_filter,
367-
int64_t start_id, int64_t end_id, bool use_id_filter,
367+
int64_t start_id, int64_t end_id, bool use_id_filter, bool query_unlimited,
368368
const std::vector<uint64_t>& alive_ids,
369369
const std::vector<std::string>& column_names,
370370
std::vector<pb::common::DocumentWithScore>& results) {
@@ -376,7 +376,7 @@ butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_stri
376376
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, err_msg);
377377
}
378378

379-
if (topk == 0) {
379+
if (!query_unlimited && topk == 0) {
380380
return butil::Status(pb::error::EILLEGAL_PARAMTETERS, "topk must be greater than 0");
381381
}
382382

@@ -393,8 +393,9 @@ butil::Status DocumentIndex::Search(uint32_t topk, const std::string& query_stri
393393
// }
394394
// }
395395

396-
auto search_result = ffi_bm25_search_with_column_names(index_path_, query_string, topk, alive_ids, use_id_filter,
397-
use_range_filter, start_id, end_id, column_names);
396+
auto search_result =
397+
ffi_bm25_search_with_column_names(index_path_, query_string, topk, alive_ids, use_id_filter, use_range_filter,
398+
start_id, end_id, column_names, query_unlimited);
398399

399400
if (search_result.error_code == 0) {
400401
for (const auto& row_id_with_score : search_result.result) {
@@ -1096,15 +1097,16 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
10961097
if (sibling_document_index != nullptr) {
10971098
DINGO_LOG(INFO) << fmt::format("[document_index.wrapper][id({})] search document in sibling document index.", Id());
10981099
std::vector<pb::common::DocumentWithScore> results_1;
1099-
auto status = sibling_document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX,
1100-
use_id_filter, alive_ids, column_names, results_1);
1100+
auto status =
1101+
sibling_document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
1102+
parameter.query_unlimited(), alive_ids, column_names, results_1);
11011103
if (!status.ok()) {
11021104
return status;
11031105
}
11041106

11051107
std::vector<pb::common::DocumentWithScore> results_2;
11061108
status = document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
1107-
alive_ids, column_names, results_2);
1109+
parameter.query_unlimited(), alive_ids, column_names, results_2);
11081110
if (!status.ok()) {
11091111
return status;
11101112
}
@@ -1120,21 +1122,23 @@ butil::Status DocumentIndexWrapper::Search(const pb::common::Range& region_range
11201122

11211123
DINGO_LOG(INFO) << fmt::format(
11221124
"[document_index.wrapper][id({})] search document in document index with range_filter, range({}) "
1123-
"query_string({}) top_n({}) min_document_id({}) max_document_id({})",
1125+
"query_string({}) top_n({}, query_unlimited({})) min_document_id({}) max_document_id({})",
11241126
Id(), DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n(),
1125-
min_document_id, max_document_id);
1127+
parameter.query_unlimited(), min_document_id, max_document_id);
11261128

11271129
// use range filter
11281130
return document_index->Search(parameter.top_n(), parameter.query_string(), true, min_document_id, max_document_id,
1129-
use_id_filter, alive_ids, column_names, results);
1131+
use_id_filter, parameter.query_unlimited(), alive_ids, column_names, results);
11301132
}
11311133

11321134
DINGO_LOG(INFO) << fmt::format(
1133-
"[document_index.wrapper][id({})] search document in document index, range({}) query_string({}) top_n({})", Id(),
1134-
DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n());
1135+
"[document_index.wrapper][id({})] search document in document index, range({}) query_string({}) top_n({}), "
1136+
"query_unlimited({})",
1137+
Id(), DocumentCodec::DebugRange(false, region_range), parameter.query_string(), parameter.top_n(),
1138+
parameter.query_unlimited());
11351139

11361140
return document_index->Search(parameter.top_n(), parameter.query_string(), false, 0, INT64_MAX, use_id_filter,
1137-
alive_ids, column_names, results);
1141+
parameter.query_unlimited(), alive_ids, column_names, results);
11381142
}
11391143

11401144
// For document index, all node need to hold the index, so this function always return true.

src/document/document_index.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,8 @@ class DocumentIndex {
6767
butil::Status Load(const std::string& path);
6868

6969
butil::Status Search(uint32_t topk, const std::string& query_string, bool use_range_filter, int64_t start_id,
70-
int64_t end_id, bool use_id_filter, const std::vector<uint64_t>& alive_ids,
70+
int64_t end_id, bool use_id_filter, bool query_unlimited,
71+
const std::vector<uint64_t>& alive_ids,
7172
const std::vector<std::string>& column_names,
7273
std::vector<pb::common::DocumentWithScore>& results);
7374

src/document/document_reader.cc

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232

3333
namespace dingodb {
3434

35+
DECLARE_int64(stream_message_max_limit_size);
36+
3537
butil::Status DocumentReader::QueryDocumentWithId(int64_t ts, const pb::common::Range& region_range,
3638
int64_t partition_id, int64_t document_id, bool with_scalar_data,
3739
bool with_table_data, std::vector<std::string>& selected_scalar_keys,
@@ -129,6 +131,28 @@ butil::Status DocumentReader::DocumentSearch(std::shared_ptr<Engine::DocumentRea
129131
return butil::Status();
130132
}
131133

134+
butil::Status DocumentReader::DocumentSearchAll(std::shared_ptr<Engine::DocumentReader::Context> ctx, bool& has_more,
135+
std::vector<pb::common::DocumentWithScore>& results) {
136+
auto status = butil::Status();
137+
auto stream = ctx->stream;
138+
auto stream_state =
139+
std::dynamic_pointer_cast<DocumentSearchAllStreamState>(stream->GetOrNewStreamState([&]() -> StreamStatePtr {
140+
std::vector<pb::common::DocumentWithScore> all_results;
141+
status = SearchDocument(ctx->ts, ctx->partition_id, ctx->document_index, ctx->region_range, ctx->parameter,
142+
all_results);
143+
if (!status.ok()) {
144+
return nullptr;
145+
}
146+
return DocumentSearchAllStreamState::New(std::move(all_results));
147+
}));
148+
if (!status.ok()) {
149+
DINGO_LOG(ERROR) << "Document search all failed: " << Helper::PrintStatus(status);
150+
return status;
151+
}
152+
has_more = stream_state->Batch(stream->Limit(), results);
153+
return status;
154+
}
155+
132156
butil::Status DocumentReader::DocumentBatchQuery(std::shared_ptr<Engine::DocumentReader::Context> ctx,
133157
std::vector<pb::common::DocumentWithId>& document_with_ids) {
134158
for (auto document_id : ctx->document_ids) {
@@ -355,4 +379,19 @@ butil::Status DocumentReader::ScanDocumentId(std::shared_ptr<Engine::DocumentRea
355379
return butil::Status::OK();
356380
}
357381

382+
bool DocumentSearchAllStreamState::Batch(int32_t limit, std::vector<pb::common::DocumentWithScore>& results) {
383+
results.reserve(limit);
384+
size_t count = 0;
385+
size_t total_bytes = 0;
386+
bool has_more;
387+
while (current_ != end_ && count < limit && total_bytes < FLAGS_stream_message_max_limit_size) {
388+
results.push_back(*current_);
389+
total_bytes += current_->ByteSizeLong();
390+
++current_;
391+
++count;
392+
}
393+
has_more = (current_ != end_);
394+
return has_more;
395+
}
396+
358397
} // namespace dingodb

src/document/document_reader.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <vector>
2323

2424
#include "butil/status.h"
25+
#include "common/stream.h"
2526
#include "engine/engine.h"
2627
#include "engine/raw_engine.h"
2728
#include "mvcc/reader.h"
@@ -41,6 +42,8 @@ class DocumentReader {
4142

4243
butil::Status DocumentSearch(std::shared_ptr<Engine::DocumentReader::Context> ctx,
4344
std::vector<pb::common::DocumentWithScore>& results);
45+
butil::Status DocumentSearchAll(std::shared_ptr<Engine::DocumentReader::Context> ctx, bool& has_more,
46+
std::vector<pb::common::DocumentWithScore>& results);
4447

4548
butil::Status DocumentBatchQuery(std::shared_ptr<Engine::DocumentReader::Context> ctx,
4649
std::vector<pb::common::DocumentWithId>& document_with_ids);
@@ -72,6 +75,28 @@ class DocumentReader {
7275
mvcc::ReaderPtr reader_;
7376
};
7477

78+
class DocumentSearchAllStreamState;
79+
using DocumentSearchAllStreamStatePtr = std::shared_ptr<DocumentSearchAllStreamState>;
80+
81+
class DocumentSearchAllStreamState : public StreamState {
82+
public:
83+
DocumentSearchAllStreamState(std::vector<pb::common::DocumentWithScore>& vec) {
84+
results_ = vec;
85+
current_ = results_.begin();
86+
end_ = results_.end();
87+
}
88+
~DocumentSearchAllStreamState() override = default;
89+
bool Batch(int32_t limit, std::vector<pb::common::DocumentWithScore>& results);
90+
static DocumentSearchAllStreamStatePtr New(std::vector<pb::common::DocumentWithScore> vec) {
91+
return std::make_shared<DocumentSearchAllStreamState>(vec);
92+
}
93+
94+
private:
95+
std::vector<pb::common::DocumentWithScore>::iterator current_;
96+
std::vector<pb::common::DocumentWithScore>::iterator end_;
97+
std::vector<pb::common::DocumentWithScore> results_;
98+
};
99+
75100
} // namespace dingodb
76101

77102
#endif // DINGODB_DOCUMENT_READER_H_

src/engine/engine.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
#include "butil/status.h"
2525
#include "common/context.h"
26+
#include "common/stream.h"
2627
#include "config/config.h"
2728
#include "document/document_index.h"
2829
#include "engine/raw_engine.h"
@@ -225,10 +226,16 @@ class Engine : public std::enable_shared_from_this<Engine> {
225226

226227
DocumentIndexWrapperPtr document_index;
227228
pb::common::ScalarSchema scalar_schema;
229+
230+
StreamPtr Stream() { return stream; }
231+
void SetStream(StreamPtr streamptr) { stream = streamptr; }
232+
StreamPtr stream;
228233
};
229234

230235
virtual butil::Status DocumentSearch(std::shared_ptr<DocumentReader::Context> ctx,
231236
std::vector<pb::common::DocumentWithScore>& results) = 0;
237+
virtual butil::Status DocumentSearchAll(std::shared_ptr<DocumentReader::Context> ctx, bool& has_more,
238+
std::vector<pb::common::DocumentWithScore>& results) = 0;
232239

233240
virtual butil::Status DocumentBatchQuery(std::shared_ptr<DocumentReader::Context> ctx,
234241
std::vector<pb::common::DocumentWithId>& doc_with_ids) = 0;

src/engine/mono_store_engine.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,13 @@ butil::Status MonoStoreEngine::DocumentReader::DocumentSearch(std::shared_ptr<Do
304304
return vector_reader->DocumentSearch(ctx, results);
305305
}
306306

307+
butil::Status MonoStoreEngine::DocumentReader::DocumentSearchAll(std::shared_ptr<DocumentReader::Context> ctx,
308+
bool& has_more,
309+
std::vector<pb::common::DocumentWithScore>& results) {
310+
auto vector_reader = dingodb::DocumentReader::New(reader_);
311+
return vector_reader->DocumentSearchAll(ctx, has_more, results);
312+
}
313+
307314
butil::Status MonoStoreEngine::DocumentReader::DocumentBatchQuery(
308315
std::shared_ptr<DocumentReader::Context> ctx, std::vector<pb::common::DocumentWithId>& document_with_ids) {
309316
auto vector_reader = dingodb::DocumentReader::New(reader_);

src/engine/mono_store_engine.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,8 @@ class MonoStoreEngine : public Engine {
155155

156156
butil::Status DocumentSearch(std::shared_ptr<DocumentReader::Context> ctx,
157157
std::vector<pb::common::DocumentWithScore>& results) override;
158+
butil::Status DocumentSearchAll(std::shared_ptr<DocumentReader::Context> ctx, bool& has_more,
159+
std::vector<pb::common::DocumentWithScore>& results) override;
158160
butil::Status DocumentBatchQuery(std::shared_ptr<DocumentReader::Context> ctx,
159161
std::vector<pb::common::DocumentWithId>& document_with_ids) override;
160162
butil::Status DocumentGetBorderId(int64_t ts, const pb::common::Range& region_range, bool get_min,

src/engine/raft_store_engine.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,13 @@ butil::Status RaftStoreEngine::DocumentReader::DocumentSearch(std::shared_ptr<Do
527527
return document_reader->DocumentSearch(ctx, results);
528528
}
529529

530+
butil::Status RaftStoreEngine::DocumentReader::DocumentSearchAll(std::shared_ptr<DocumentReader::Context> ctx,
531+
bool& has_more,
532+
std::vector<pb::common::DocumentWithScore>& results) {
533+
auto document_reader = dingodb::DocumentReader::New(reader_);
534+
return document_reader->DocumentSearchAll(ctx, has_more, results);
535+
}
536+
530537
butil::Status RaftStoreEngine::DocumentReader::DocumentBatchQuery(
531538
std::shared_ptr<DocumentReader::Context> ctx, std::vector<pb::common::DocumentWithId>& document_with_ids) {
532539
auto document_reader = dingodb::DocumentReader::New(reader_);

src/engine/raft_store_engine.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,8 @@ class RaftStoreEngine : public Engine, public RaftControlAble {
204204

205205
butil::Status DocumentSearch(std::shared_ptr<DocumentReader::Context> ctx,
206206
std::vector<pb::common::DocumentWithScore>& results) override;
207+
butil::Status DocumentSearchAll(std::shared_ptr<DocumentReader::Context> ctx, bool& has_more,
208+
std::vector<pb::common::DocumentWithScore>& results) override;
207209
butil::Status DocumentBatchQuery(std::shared_ptr<DocumentReader::Context> ctx,
208210
std::vector<pb::common::DocumentWithId>& document_with_ids) override;
209211
butil::Status DocumentGetBorderId(int64_t ts, const pb::common::Range& region_range, bool get_min,

src/engine/storage.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,42 @@ butil::Status Storage::DocumentSearch(std::shared_ptr<Engine::DocumentReader::Co
10491049
return butil::Status();
10501050
}
10511051

1052+
butil::Status Storage::DocumentSearchAll(std::shared_ptr<Engine::DocumentReader::Context> ctx,
1053+
const pb::stream::StreamRequestMeta& req_stream_meta, bool& has_more,
1054+
std::vector<pb::common::DocumentWithScore>& results) {
1055+
auto status = ValidateLeader(ctx->region_id);
1056+
if (BAIDU_UNLIKELY(!status.ok())) {
1057+
return status;
1058+
}
1059+
// after validate leader
1060+
auto stream_meta = req_stream_meta;
1061+
auto stream = Server::GetInstance().GetStreamManager()->GetOrNew(stream_meta);
1062+
if (stream == nullptr) {
1063+
return butil::Status(pb::error::ESTREAM_EXPIRED, fmt::format("stream({}) is expired.", stream_meta.stream_id()));
1064+
}
1065+
ctx->SetStream(stream);
1066+
1067+
DINGO_LOG(DEBUG) << "DocumentSearchAll region_id: " << ctx->region_id << ", stream limit: " << stream_meta.limit()
1068+
<< ", has_more: " << has_more;
1069+
1070+
auto document_reader = GetEngineDocumentReader(ctx->store_engine_type, ctx->raw_engine_type);
1071+
1072+
status = document_reader->DocumentSearchAll(ctx, has_more, results);
1073+
if (BAIDU_UNLIKELY(!status.ok())) {
1074+
if (pb::error::EKEY_NOT_FOUND == status.error_code()) {
1075+
// return OK if not found
1076+
return butil::Status::OK();
1077+
}
1078+
Server::GetInstance().GetStreamManager()->RemoveStream(stream);
1079+
return status;
1080+
}
1081+
if (!has_more || stream_meta.close()) {
1082+
Server::GetInstance().GetStreamManager()->RemoveStream(stream);
1083+
}
1084+
1085+
return butil::Status();
1086+
}
1087+
10521088
butil::Status Storage::DocumentGetBorderId(store::RegionPtr region, bool get_min, int64_t ts, int64_t& document_id) {
10531089
auto status = ValidateLeader(region);
10541090
if (BAIDU_UNLIKELY(!status.ok())) {

0 commit comments

Comments
 (0)