Skip to content

Commit aadff50

Browse files
nokiaMSrock-git
authored andcommitted
[feat][serial] Optimize serial.
1 parent 67ab85e commit aadff50

File tree

5 files changed

+13
-3
lines changed

5 files changed

+13
-3
lines changed

src/coprocessor/coprocessor.cc

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,8 @@ butil::Status Coprocessor::DoExecute(const pb::common::KeyValue& kv, bool* has_r
312312
int ret = 0;
313313
try {
314314
// decode some column. not decode all
315-
ret = original_record_decoder.Decode(kv.key(), kv.value(), selection_column_indexes_, original_record);
315+
ret = original_record_decoder.Decode(kv.key(), kv.value(), selection_column_indexes_,
316+
selection_column_indexes_serial_, original_record);
316317
} catch (const std::exception& my_exception) {
317318
std::string error_message = fmt::format("serial::Decode failed exception : {}", my_exception.what());
318319
DINGO_LOG(ERROR) << error_message;
@@ -657,6 +658,7 @@ void Coprocessor::Close() {
657658

658659
original_column_indexes_.clear();
659660
selection_column_indexes_.clear();
661+
selection_column_indexes_serial_.clear();
660662

661663
if (original_serial_schemas_sorted_) {
662664
original_serial_schemas_sorted_.reset();
@@ -804,6 +806,7 @@ void Coprocessor::GetSelectionColumnIndexes() {
804806
int i = index;
805807
DINGO_LOG(DEBUG) << "i:" << i;
806808
selection_column_indexes_.push_back(i);
809+
selection_column_indexes_serial_[original_column_indexes_[i]] = i;
807810
}
808811
// sort and unique
809812
// std::sort(selection_column_indexes_.begin(), selection_column_indexes_.end(), [](int i, int j) { return i < j;
@@ -818,6 +821,7 @@ void Coprocessor::GetSelectionColumnIndexes() {
818821
int i = index;
819822
DINGO_LOG(DEBUG) << "index:" << i;
820823
selection_column_indexes_.push_back(original_column_indexes_[i]);
824+
selection_column_indexes_serial_[original_column_indexes_[i]] = i;
821825
}
822826
// sort and unique
823827
// std::sort(selection_column_indexes_.begin(), selection_column_indexes_.end(), [](int i, int j) { return i < j;

src/coprocessor/coprocessor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class Coprocessor : public RawCoprocessor {
9595
std::shared_ptr<AggregationIterator> aggregation_iterator_;
9696
std::vector<int> original_column_indexes_;
9797
std::vector<int> selection_column_indexes_;
98+
std::unordered_map<int, int> selection_column_indexes_serial_;
9899

99100
std::shared_ptr<std::vector<std::shared_ptr<BaseSchema>>> original_serial_schemas_sorted_;
100101
std::shared_ptr<std::vector<std::shared_ptr<BaseSchema>>> selection_serial_schemas_sorted_;

src/coprocessor/coprocessor_v2.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,7 @@ void CoprocessorV2::Close() {
413413
original_serial_schemas_.reset();
414414
original_column_indexes_.clear();
415415
selection_column_indexes_.clear();
416+
selection_column_indexes_serial_.clear();
416417
result_serial_schemas_.reset();
417418
result_record_encoder_.reset();
418419
original_record_decoder_.reset();
@@ -575,7 +576,8 @@ butil::Status CoprocessorV2::DoRelExprCoreWrapper(const std::string& key, const
575576
int ret = 0;
576577
try {
577578
// decode some column. not decode all
578-
ret = original_record_decoder_->Decode(key, value, selection_column_indexes_, original_record);
579+
ret = original_record_decoder_->Decode(key, value, selection_column_indexes_,
580+
selection_column_indexes_serial_, original_record);
579581
} catch (const std::exception& my_exception) {
580582
std::string error_message = fmt::format("serial::Decode failed exception : {}", my_exception.what());
581583
DINGO_LOG(ERROR) << error_message;
@@ -722,6 +724,7 @@ void CoprocessorV2::GetSelectionColumnIndexes() {
722724
int i = index;
723725
DINGO_LOG(DEBUG) << "index:" << i;
724726
selection_column_indexes_.push_back(original_column_indexes_[i]);
727+
selection_column_indexes_serial_[original_column_indexes_[i]] = i;
725728
}
726729
} else {
727730
DINGO_LOG(DEBUG) << "selection_columns empty()";

src/coprocessor/coprocessor_v2.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <memory>
2323
#include <string>
2424
#include <vector>
25+
#include <unordered_map>
2526

2627
#include "butil/status.h"
2728
#include "coprocessor/raw_coprocessor.h"
@@ -106,6 +107,7 @@ class CoprocessorV2 : public RawCoprocessor {
106107
std::vector<int> original_column_indexes_; // NOLINT
107108
// index = dummy ; value = original schema index
108109
std::vector<int> selection_column_indexes_; // NOLINT
110+
std::unordered_map<int, int> selection_column_indexes_serial_; // NOLINT
109111
std::shared_ptr<std::vector<std::shared_ptr<BaseSchema>>> result_serial_schemas_; // NOLINT
110112
std::shared_ptr<RecordEncoder> result_record_encoder_; // NOLINT
111113
std::shared_ptr<RecordDecoder> original_record_decoder_; // NOLINT

0 commit comments

Comments
 (0)