Skip to content

Commit c2b7817

Browse files
authored
feat(online): support last join (window) (#3565)
1 parent 125483b commit c2b7817

30 files changed

+1351
-270
lines changed

cases/query/last_join_subquery_window.yml

Lines changed: 406 additions & 0 deletions
Large diffs are not rendered by default.

hybridse/examples/toydb/src/tablet/tablet_catalog.cc

Lines changed: 1 addition & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -136,22 +136,6 @@ RowIterator* TabletTableHandler::GetRawIterator() {
136136
return new storage::FullTableIterator(table_->GetSegments(),
137137
table_->GetSegCnt(), table_);
138138
}
139-
const uint64_t TabletTableHandler::GetCount() {
140-
auto iter = GetIterator();
141-
uint64_t cnt = 0;
142-
while (iter->Valid()) {
143-
iter->Next();
144-
cnt++;
145-
}
146-
return cnt;
147-
}
148-
Row TabletTableHandler::At(uint64_t pos) {
149-
auto iter = GetIterator();
150-
while (pos-- > 0 && iter->Valid()) {
151-
iter->Next();
152-
}
153-
return iter->Valid() ? iter->GetValue() : Row();
154-
}
155139

156140
TabletCatalog::TabletCatalog() : tables_(), db_() {}
157141

@@ -249,22 +233,6 @@ std::unique_ptr<WindowIterator> TabletSegmentHandler::GetWindowIterator(
249233
const std::string& idx_name) {
250234
return std::unique_ptr<WindowIterator>();
251235
}
252-
const uint64_t TabletSegmentHandler::GetCount() {
253-
auto iter = GetIterator();
254-
uint64_t cnt = 0;
255-
while (iter->Valid()) {
256-
cnt++;
257-
iter->Next();
258-
}
259-
return cnt;
260-
}
261-
Row TabletSegmentHandler::At(uint64_t pos) {
262-
auto iter = GetIterator();
263-
while (pos-- > 0 && iter->Valid()) {
264-
iter->Next();
265-
}
266-
return iter->Valid() ? iter->GetValue() : Row();
267-
}
268236

269237
const uint64_t TabletPartitionHandler::GetCount() {
270238
auto iter = GetWindowIterator();
@@ -275,5 +243,6 @@ const uint64_t TabletPartitionHandler::GetCount() {
275243
}
276244
return cnt;
277245
}
246+
278247
} // namespace tablet
279248
} // namespace hybridse

hybridse/examples/toydb/src/tablet/tablet_catalog.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,6 @@ class TabletSegmentHandler : public TableHandler {
6868
std::unique_ptr<vm::RowIterator> GetIterator() override;
6969
RowIterator* GetRawIterator() override;
7070
std::unique_ptr<codec::WindowIterator> GetWindowIterator(const std::string& idx_name) override;
71-
const uint64_t GetCount() override;
72-
Row At(uint64_t pos) override;
7371
const std::string GetHandlerTypeName() override {
7472
return "TabletSegmentHandler";
7573
}
@@ -104,6 +102,7 @@ class TabletPartitionHandler
104102
std::unique_ptr<codec::WindowIterator> GetWindowIterator() override {
105103
return table_handler_->GetWindowIterator(index_name_);
106104
}
105+
107106
const uint64_t GetCount() override;
108107

109108
std::shared_ptr<TableHandler> GetSegment(const std::string& key) override {
@@ -152,8 +151,6 @@ class TabletTableHandler
152151
RowIterator* GetRawIterator() override;
153152
std::unique_ptr<codec::WindowIterator> GetWindowIterator(
154153
const std::string& idx_name);
155-
virtual const uint64_t GetCount();
156-
Row At(uint64_t pos) override;
157154

158155
virtual std::shared_ptr<PartitionHandler> GetPartition(
159156
const std::string& index_name) {

hybridse/examples/toydb/src/testing/toydb_engine_test_base.cc

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
*/
1616

1717
#include "testing/toydb_engine_test_base.h"
18+
19+
#include "absl/strings/str_join.h"
1820
#include "gtest/gtest.h"
19-
#include "gtest/internal/gtest-param-util.h"
2021

2122
using namespace llvm; // NOLINT (build/namespaces)
2223
using namespace llvm::orc; // NOLINT (build/namespaces)
@@ -141,18 +142,12 @@ std::shared_ptr<tablet::TabletCatalog> BuildOnePkTableStorage(
141142
}
142143
return catalog;
143144
}
144-
void BatchRequestEngineCheckWithCommonColumnIndices(
145-
const SqlCase& sql_case, const EngineOptions options,
146-
const std::set<size_t>& common_column_indices) {
147-
std::ostringstream oss;
148-
for (size_t index : common_column_indices) {
149-
oss << index << ",";
150-
}
151-
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: "
152-
"common_column_indices = ["
153-
<< oss.str() << "]";
154-
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options,
155-
common_column_indices);
145+
// Run check with common column index info
146+
void BatchRequestEngineCheckWithCommonColumnIndices(const SqlCase& sql_case, const EngineOptions options,
147+
const std::set<size_t>& common_column_indices) {
148+
LOG(INFO) << "BatchRequestEngineCheckWithCommonColumnIndices: common_column_indices = ["
149+
<< absl::StrJoin(common_column_indices, ",") << "]";
150+
ToydbBatchRequestEngineTestRunner engine_test(sql_case, options, common_column_indices);
156151
engine_test.RunCheck();
157152
}
158153

hybridse/include/codec/row.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class Row {
5454

5555
inline int32_t size() const { return slice_.size(); }
5656
inline int32_t size(int32_t pos) const {
57-
return 0 == pos ? slice_.size() : slices_[pos - 1].size();
57+
return 0 == pos ? slice_.size() : slices_.at(pos - 1).size();
5858
}
5959

6060
// Return true if the length of the referenced data is zero

hybridse/include/codec/row_iterator.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,14 @@ class WindowIterator {
7171
virtual bool Valid() = 0;
7272
/// Return the RowIterator of current segment
7373
/// of dataset if Valid() return `true`.
74-
virtual std::unique_ptr<RowIterator> GetValue() = 0;
74+
virtual std::unique_ptr<RowIterator> GetValue() {
75+
auto p = GetRawValue();
76+
if (!p) {
77+
return nullptr;
78+
}
79+
80+
return std::unique_ptr<RowIterator>(p);
81+
}
7582
/// Return the RowIterator of current segment
7683
/// of dataset if Valid() return `true`.
7784
virtual RowIterator *GetRawValue() = 0;

hybridse/include/codec/row_list.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class ListV {
7676
virtual const uint64_t GetCount() {
7777
auto iter = GetIterator();
7878
uint64_t cnt = 0;
79-
while (iter->Valid()) {
79+
while (iter && iter->Valid()) {
8080
iter->Next();
8181
cnt++;
8282
}

hybridse/include/vm/catalog.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ class TableHandler : public DataHandler {
217217
virtual ~TableHandler() {}
218218

219219
/// Return table column Types information.
220+
/// TODO: rm it, never used
220221
virtual const Types& GetTypes() = 0;
221222

222223
/// Return the index information

hybridse/include/vm/mem_catalog.h

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@
2525
#include <string>
2626
#include <utility>
2727
#include <vector>
28-
#include "base/fe_slice.h"
29-
#include "codec/list_iterator_codec.h"
3028
#include "glog/logging.h"
3129
#include "vm/catalog.h"
3230

@@ -674,6 +672,7 @@ class MemPartitionHandler
674672
IndexHint index_hint_;
675673
OrderType order_type_;
676674
};
675+
677676
class ConcatTableHandler : public MemTimeTableHandler {
678677
public:
679678
ConcatTableHandler(std::shared_ptr<TableHandler> left, size_t left_slices,
@@ -692,19 +691,19 @@ class ConcatTableHandler : public MemTimeTableHandler {
692691
status_ = SyncValue();
693692
return MemTimeTableHandler::At(pos);
694693
}
695-
std::unique_ptr<RowIterator> GetIterator() {
694+
std::unique_ptr<RowIterator> GetIterator() override {
696695
if (status_.isRunning()) {
697696
status_ = SyncValue();
698697
}
699698
return MemTimeTableHandler::GetIterator();
700699
}
701-
RowIterator* GetRawIterator() {
700+
RowIterator* GetRawIterator() override {
702701
if (status_.isRunning()) {
703702
status_ = SyncValue();
704703
}
705704
return MemTimeTableHandler::GetRawIterator();
706705
}
707-
virtual const uint64_t GetCount() {
706+
const uint64_t GetCount() override {
708707
if (status_.isRunning()) {
709708
status_ = SyncValue();
710709
}

hybridse/include/vm/physical_op.h

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -785,7 +785,11 @@ class PhysicalAggregationNode : public PhysicalProjectNode {
785785
public:
786786
PhysicalAggregationNode(PhysicalOpNode *node, const ColumnProjects &project, const node::ExprNode *condition)
787787
: PhysicalProjectNode(node, kAggregation, project, true), having_condition_(condition) {
788-
output_type_ = kSchemaTypeRow;
788+
if (node->GetOutputType() == kSchemaTypeGroup) {
789+
output_type_ = kSchemaTypeGroup;
790+
} else {
791+
output_type_ = kSchemaTypeRow;
792+
}
789793
fn_infos_.push_back(&having_condition_.fn_info());
790794
}
791795
virtual ~PhysicalAggregationNode() {}
@@ -1065,7 +1069,7 @@ class RequestWindowUnionList {
10651069
RequestWindowUnionList() : window_unions_() {}
10661070
virtual ~RequestWindowUnionList() {}
10671071
void AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp &window) {
1068-
window_unions_.push_back(std::make_pair(node, window));
1072+
window_unions_.emplace_back(node, window);
10691073
}
10701074
const PhysicalOpNode *GetKey(uint32_t index) {
10711075
auto iter = window_unions_.begin();
@@ -1415,7 +1419,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14151419
instance_not_in_window_(false),
14161420
exclude_current_time_(false),
14171421
output_request_row_(true) {
1418-
output_type_ = kSchemaTypeTable;
1422+
InitOuptput();
14191423

14201424
fn_infos_.push_back(&window_.partition_.fn_info());
14211425
fn_infos_.push_back(&window_.index_key_.fn_info());
@@ -1427,7 +1431,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14271431
instance_not_in_window_(w_ptr->instance_not_in_window()),
14281432
exclude_current_time_(w_ptr->exclude_current_time()),
14291433
output_request_row_(true) {
1430-
output_type_ = kSchemaTypeTable;
1434+
InitOuptput();
14311435

14321436
fn_infos_.push_back(&window_.partition_.fn_info());
14331437
fn_infos_.push_back(&window_.sort_.fn_info());
@@ -1443,7 +1447,7 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14431447
instance_not_in_window_(instance_not_in_window),
14441448
exclude_current_time_(exclude_current_time),
14451449
output_request_row_(output_request_row) {
1446-
output_type_ = kSchemaTypeTable;
1450+
InitOuptput();
14471451

14481452
fn_infos_.push_back(&window_.partition_.fn_info());
14491453
fn_infos_.push_back(&window_.sort_.fn_info());
@@ -1455,7 +1459,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14551459
virtual void Print(std::ostream &output, const std::string &tab) const;
14561460
const bool Valid() { return true; }
14571461
static PhysicalRequestUnionNode *CastFrom(PhysicalOpNode *node);
1458-
bool AddWindowUnion(PhysicalOpNode *node) {
1462+
bool AddWindowUnion(PhysicalOpNode *node) { return AddWindowUnion(node, window_); }
1463+
bool AddWindowUnion(PhysicalOpNode *node, const RequestWindowOp& window) {
14591464
if (nullptr == node) {
14601465
LOG(WARNING) << "Fail to add window union : table is null";
14611466
return false;
@@ -1472,9 +1477,8 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14721477
<< "Union Table and window input schema aren't consistent";
14731478
return false;
14741479
}
1475-
window_unions_.AddWindowUnion(node, window_);
1476-
RequestWindowOp &window_union =
1477-
window_unions_.window_unions_.back().second;
1480+
window_unions_.AddWindowUnion(node, window);
1481+
RequestWindowOp &window_union = window_unions_.window_unions_.back().second;
14781482
fn_infos_.push_back(&window_union.partition_.fn_info());
14791483
fn_infos_.push_back(&window_union.sort_.fn_info());
14801484
fn_infos_.push_back(&window_union.range_.fn_info());
@@ -1484,11 +1488,10 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
14841488

14851489
std::vector<PhysicalOpNode *> GetDependents() const override;
14861490

1487-
const bool instance_not_in_window() const {
1488-
return instance_not_in_window_;
1489-
}
1490-
const bool exclude_current_time() const { return exclude_current_time_; }
1491-
const bool output_request_row() const { return output_request_row_; }
1491+
bool instance_not_in_window() const { return instance_not_in_window_; }
1492+
bool exclude_current_time() const { return exclude_current_time_; }
1493+
bool output_request_row() const { return output_request_row_; }
1494+
void set_output_request_row(bool flag) { output_request_row_ = flag; }
14921495
const RequestWindowOp &window() const { return window_; }
14931496
const RequestWindowUnionList &window_unions() const {
14941497
return window_unions_;
@@ -1506,10 +1509,20 @@ class PhysicalRequestUnionNode : public PhysicalBinaryNode {
15061509
}
15071510

15081511
RequestWindowOp window_;
1509-
const bool instance_not_in_window_;
1510-
const bool exclude_current_time_;
1511-
const bool output_request_row_;
1512+
bool instance_not_in_window_;
1513+
bool exclude_current_time_;
1514+
bool output_request_row_;
15121515
RequestWindowUnionList window_unions_;
1516+
1517+
private:
1518+
void InitOuptput() {
1519+
auto left = GetProducer(0);
1520+
if (left->GetOutputType() == kSchemaTypeRow) {
1521+
output_type_ = kSchemaTypeTable;
1522+
} else {
1523+
output_type_ = kSchemaTypeGroup;
1524+
}
1525+
}
15131526
};
15141527

15151528
class PhysicalRequestAggUnionNode : public PhysicalOpNode {

0 commit comments

Comments
 (0)