Skip to content

Commit 2eb802c

Browse files
authored
feat(offline): support & test new SQLs from online mode (#3619)
* feat(offline): WINDOW without ORDER BY only work if SkewWindowOpt is off * feat(offline): support last join (lastjoin/window)
1 parent e540195 commit 2eb802c

File tree

15 files changed

+341
-132
lines changed

15 files changed

+341
-132
lines changed

cases/query/window_query.yaml

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -904,6 +904,9 @@ cases:
904904
905905
# ======================================================================
906906
# WINDOW without ORDER BY
907+
#
908+
# batch mode tests skipped since ordering in WINDOW is undefined, we only
909+
# verify results for request mode; that order is implementation-defined, not SQL standard
907910
# ======================================================================
908911
- id: 24
909912
desc: ROWS WINDOW WITHOUT ORDER BY
@@ -1132,3 +1135,29 @@ cases:
11321135
3, 1, 0, 3, 3
11331136
4, 2, 1, 3, 3
11341137
5, 3, 2, 3, 3
1138+
- id: 28
1139+
# simple case to verify that it compiles & runs in batch mode
1140+
desc: RANGE WINDOW WITHOUT ORDER BY
1141+
inputs:
1142+
- name: t1
1143+
columns:
1144+
- id int
1145+
- gp int
1146+
- ts timestamp
1147+
indexs:
1148+
- idx:gp:ts
1149+
data: |
1150+
1, 100, 20000
1151+
2, 100, 10000
1152+
3, 400, 20000
1153+
4, 400, 10
1154+
5, 400, 15000
1155+
sql: |
1156+
select id, count(ts) over w as agg
1157+
from t1
1158+
window w as (
1159+
partition by gp
1160+
rows_range between unbounded preceding and current row
1161+
)
1162+
expect:
1163+
success: true

hybridse/include/vm/mem_catalog.h

Lines changed: 13 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -258,9 +258,13 @@ class Window : public MemTimeTableHandler {
258258
// Returns the current value of the `exclude_current_time_` flag.
bool exclude_current_time() const { return exclude_current_time_; }
259259
// Stores `flag` into `exclude_current_time_`.
void set_exclude_current_time(bool flag) { exclude_current_time_ = flag; }
260260

261+
// Returns the current value of the `without_order_by_` flag
// (presumably set when the SQL WINDOW clause has no ORDER BY -- confirm against callers).
bool without_order_by() const { return without_order_by_; }
262+
// Stores `flag` into `without_order_by_`.
void set_without_order_by(bool flag) { without_order_by_ = flag; }
263+
261264
protected:
262265
bool exclude_current_time_ = false;
263266
bool instance_not_in_window_ = false;
267+
bool without_order_by_ = false;
264268
};
265269
class WindowRange {
266270
public:
@@ -356,44 +360,13 @@ class HistoryWindow : public Window {
356360
PopFrontRow();
357361
}
358362
}
363+
bool BufferData(uint64_t key, const Row& row) override;
359364

360-
// aad newer row into window
361-
bool BufferData(uint64_t key, const Row& row) override {
362-
if (!table_.empty() && GetFrontRow().first > key) {
363-
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
364-
return false;
365-
}
366-
auto cur_size = table_.size();
367-
if (cur_size < window_range_.start_row_) {
368-
// current in the ROWS window
369-
int64_t sub = key + window_range_.start_offset_;
370-
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
371-
if (0 == window_range_.end_offset_) {
372-
return BufferCurrentTimeBuffer(key, row, start_ts);
373-
} else {
374-
return BufferEffectiveWindow(key, row, start_ts);
375-
}
376-
} else if (0 == window_range_.end_offset_) {
377-
// current in the ROWS_RANGE window
378-
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
379-
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
380-
return BufferCurrentTimeBuffer(key, row, start_ts);
381-
} else {
382-
// current row BeforeWindow
383-
int64_t sub = (key + window_range_.end_offset_);
384-
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
385-
return BufferCurrentHistoryBuffer(key, row, end_ts);
386-
}
387-
}
365+
// add newer row into window
366+
bool BufferDataImpl(uint64_t key, const Row& row);
388367

389368
protected:
390-
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
391-
current_history_buffer_.emplace_front(key, row);
392-
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
393-
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
394-
SlideWindow(start_ts, end_ts);
395-
return true;
396-
}
369+
bool BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts);
397370

398371
// sliding rows data from `current_history_buffer_` into effective window
399372
// by giving the new start_ts and end_ts.
@@ -413,77 +386,18 @@ class HistoryWindow : public Window {
413386
// `start_ts_inclusive` and `end_ts_inclusive` can be empty, which effectively means less than 0.
414387
// if `start_ts_inclusive` is empty, no rows goes out of effective window
415388
// if `end_ts_inclusive` is empty, no rows goes out of history buffer and into effective window
416-
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
417-
// always try to cleanup the stale rows out of effective window
418-
if (start_ts_inclusive.has_value()) {
419-
Slide(start_ts_inclusive);
420-
}
421-
422-
if (!end_ts_inclusive.has_value()) {
423-
return;
424-
}
425-
426-
while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
427-
auto& back = current_history_buffer_.back();
428-
429-
BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
430-
current_history_buffer_.pop_back();
431-
}
432-
}
389+
void SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive);
433390

434391
// push the row to the start of window
435392
// - pop last elements in window if exceed max window size
436393
// - also pop last elements in window if there ts less than `start_ts`
437394
//
438395
// if `start_ts` is empty, no rows eliminated from window
439-
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
440-
AddFrontRow(key, row);
441-
return Slide(start_ts);
442-
}
396+
bool BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts);
443397

444-
bool Slide(std::optional<uint64_t> start_ts) {
445-
auto cur_size = table_.size();
446-
while (window_range_.max_size_ > 0 &&
447-
cur_size > window_range_.max_size_) {
448-
PopBackRow();
449-
--cur_size;
450-
}
398+
bool Slide(std::optional<uint64_t> start_ts);
451399

452-
// Slide window if window start bound >= rows/range preceding
453-
while (cur_size > 0) {
454-
const auto& pair = GetBackRow();
455-
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
456-
cur_size <= window_range_.start_row_ + 1) {
457-
// note it is always current rows window
458-
break;
459-
}
460-
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
461-
PopBackRow();
462-
--cur_size;
463-
} else {
464-
break;
465-
}
466-
}
467-
return true;
468-
}
469-
470-
bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
471-
if (exclude_current_time_) {
472-
// except `exclude current_row`, the current row is always added to the effective window
473-
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
474-
// so the previous current row need eliminated for this next buf action
475-
PopEffectiveDataIfAny();
476-
if (key == 0) {
477-
SlideWindow(start_ts, {});
478-
} else {
479-
SlideWindow(start_ts, key - 1);
480-
}
481-
current_history_buffer_.emplace_front(key, row);
482-
}
483-
484-
// in queue the current row
485-
return BufferEffectiveWindow(key, row, start_ts);
486-
}
400+
bool BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts);
487401

488402
WindowRange window_range_;
489403
MemTimeTable current_history_buffer_;
@@ -512,20 +426,7 @@ class CurrentHistoryWindow : public HistoryWindow {
512426

513427
// Overrides PopFrontData by forwarding to PopFrontRow().
void PopFrontData() override { PopFrontRow(); }
514428

515-
bool BufferData(uint64_t key, const Row& row) override {
516-
if (!table_.empty() && GetFrontRow().first > key) {
517-
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
518-
return false;
519-
}
520-
int64_t sub = (key + window_range_.start_offset_);
521-
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
522-
523-
if (exclude_current_time_) {
524-
return BufferCurrentTimeBuffer(key, row, start_ts);
525-
} else {
526-
return BufferEffectiveWindow(key, row, start_ts);
527-
}
528-
}
429+
bool BufferData(uint64_t key, const Row& row) override;
529430
};
530431

531432
typedef std::map<std::string,

hybridse/src/vm/core_api.cc

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
#include "vm/core_api.h"
1818
#include "base/sig_trace.h"
1919
#include "codec/fe_row_codec.h"
20-
#include "udf/default_udf_library.h"
21-
#include "udf/udf.h"
2220
#include "vm/jit_runtime.h"
23-
#include "vm/jit_wrapper.h"
2421
#include "vm/mem_catalog.h"
2522
#include "vm/runner.h"
2623
#include "vm/schemas_context.h"
@@ -30,14 +27,15 @@ namespace vm {
3027

3128
// Builds the HistoryWindow backing this interface and applies the three
// behaviour flags (`instance_not_in_window`, `exclude_current_time`,
// `without_order_by`) to it.
//
// `frame_type_str` is converted with ExtractFrameType(); the offsets,
// `rows_preceding` and the (possibly adjusted) max size are forwarded to
// WindowRange unchanged.
WindowInterface::WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool exclude_current_row,
                                 const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
                                 uint64_t rows_preceding, uint64_t max_size, bool without_order_by) {
    // A bounded window (max_size > 0) ending at the current row gets one extra
    // slot when the current row is excluded.
    // NOTE(review): presumably so the excluded row does not eat into the
    // user-visible capacity -- confirm.
    const bool needs_extra_slot = exclude_current_row && max_size > 0 && end_offset == 0;
    const uint64_t effective_max_size = needs_extra_slot ? max_size + 1 : max_size;

    window_impl_ = std::make_unique<HistoryWindow>(WindowRange(
        ExtractFrameType(frame_type_str), start_offset, end_offset, rows_preceding, effective_max_size));

    window_impl_->set_instance_not_in_window(instance_not_in_window);
    window_impl_->set_exclude_current_time(exclude_current_time);
    window_impl_->set_without_order_by(without_order_by);
}
4240

4341
bool WindowInterface::BufferData(uint64_t key, const Row& row) {

hybridse/src/vm/core_api.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
#ifndef HYBRIDSE_SRC_VM_CORE_API_H_
1818
#define HYBRIDSE_SRC_VM_CORE_API_H_
1919

20-
#include <map>
2120
#include <memory>
2221
#include <string>
2322
#include "codec/fe_row_codec.h"
@@ -41,7 +40,13 @@ class WindowInterface {
4140
public:
4241
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
4342
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
44-
uint64_t rows_preceding, uint64_t max_size);
43+
uint64_t rows_preceding, uint64_t max_size) {
44+
WindowInterface(instance_not_in_window, exclude_current_time, execlude_current_row, frame_type_str,
45+
start_offset, end_offset, rows_preceding, max_size, false);
46+
}
47+
WindowInterface(bool instance_not_in_window, bool exclude_current_time, bool execlude_current_row,
48+
const std::string& frame_type_str, int64_t start_offset, int64_t end_offset,
49+
uint64_t rows_preceding, uint64_t max_size, bool without_order_by);
4550

4651
bool BufferData(uint64_t key, const Row& row);
4752

hybridse/src/vm/mem_catalog.cc

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,5 +431,123 @@ size_t RowGetSliceSize(int8_t* row_ptr, size_t idx) {
431431
auto row = reinterpret_cast<Row*>(row_ptr);
432432
return row->size(idx);
433433
}
434+
bool HistoryWindow::BufferData(uint64_t key, const Row& row) {
435+
if (without_order_by()) {
436+
return BufferDataImpl(0, row);
437+
}
438+
439+
return BufferDataImpl(key, row);
440+
}
441+
bool HistoryWindow::BufferDataImpl(uint64_t key, const Row& row) {
442+
if (!table_.empty() && GetFrontRow().first > key) {
443+
DLOG(WARNING) << "Fail BufferData: buffer key (" << key << ") less than latest key (" << GetFrontRow().first
444+
<< ")";
445+
return false;
446+
}
447+
auto cur_size = table_.size();
448+
if (cur_size < window_range_.start_row_) {
449+
// current in the ROWS window
450+
int64_t sub = key + window_range_.start_offset_;
451+
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
452+
if (0 == window_range_.end_offset_) {
453+
return BufferCurrentTimeBuffer(key, row, start_ts);
454+
} else {
455+
return BufferEffectiveWindow(key, row, start_ts);
456+
}
457+
} else if (0 == window_range_.end_offset_) {
458+
// current in the ROWS_RANGE window
459+
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
460+
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
461+
return BufferCurrentTimeBuffer(key, row, start_ts);
462+
} else {
463+
// current row BeforeWindow
464+
int64_t sub = (key + window_range_.end_offset_);
465+
uint64_t end_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
466+
return BufferCurrentHistoryBuffer(key, row, end_ts);
467+
}
468+
}
469+
bool HistoryWindow::BufferCurrentHistoryBuffer(uint64_t key, const Row& row, uint64_t end_ts) {
470+
current_history_buffer_.emplace_front(key, row);
471+
int64_t sub = (static_cast<int64_t>(key) + window_range_.start_offset_);
472+
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
473+
SlideWindow(start_ts, end_ts);
474+
return true;
475+
}
476+
void HistoryWindow::SlideWindow(std::optional<uint64_t> start_ts_inclusive, std::optional<uint64_t> end_ts_inclusive) {
477+
// always try to cleanup the stale rows out of effective window
478+
if (start_ts_inclusive.has_value()) {
479+
Slide(start_ts_inclusive);
480+
}
481+
482+
if (!end_ts_inclusive.has_value()) {
483+
return;
484+
}
485+
486+
while (!current_history_buffer_.empty() && current_history_buffer_.back().first <= end_ts_inclusive) {
487+
auto& back = current_history_buffer_.back();
488+
489+
BufferEffectiveWindow(back.first, back.second, start_ts_inclusive);
490+
current_history_buffer_.pop_back();
491+
}
492+
}
493+
bool HistoryWindow::BufferEffectiveWindow(uint64_t key, const Row& row, std::optional<uint64_t> start_ts) {
494+
AddFrontRow(key, row);
495+
return Slide(start_ts);
496+
}
497+
bool HistoryWindow::Slide(std::optional<uint64_t> start_ts) {
498+
auto cur_size = table_.size();
499+
while (window_range_.max_size_ > 0 && cur_size > window_range_.max_size_) {
500+
PopBackRow();
501+
--cur_size;
502+
}
503+
504+
// Slide window if window start bound >= rows/range preceding
505+
while (cur_size > 0) {
506+
const auto& pair = GetBackRow();
507+
if ((kFrameRows == window_range_.frame_type_ || kFrameRowsMergeRowsRange == window_range_.frame_type_) &&
508+
cur_size <= window_range_.start_row_ + 1) {
509+
// note it is always current rows window
510+
break;
511+
}
512+
if (kFrameRows == window_range_.frame_type_ || pair.first < start_ts) {
513+
PopBackRow();
514+
--cur_size;
515+
} else {
516+
break;
517+
}
518+
}
519+
return true;
520+
}
521+
bool HistoryWindow::BufferCurrentTimeBuffer(uint64_t key, const Row& row, uint64_t start_ts) {
522+
if (exclude_current_time_) {
523+
// except `exclude current_row`, the current row is always added to the effective window
524+
// but for next buffer action, previous current row already buffered in `current_history_buffer_`
525+
// so the previous current row need eliminated for this next buf action
526+
PopEffectiveDataIfAny();
527+
if (key == 0) {
528+
SlideWindow(start_ts, {});
529+
} else {
530+
SlideWindow(start_ts, key - 1);
531+
}
532+
current_history_buffer_.emplace_front(key, row);
533+
}
534+
535+
// in queue the current row
536+
return BufferEffectiveWindow(key, row, start_ts);
537+
}
538+
bool CurrentHistoryWindow::BufferData(uint64_t key, const Row& row) {
539+
if (!table_.empty() && GetFrontRow().first > key) {
540+
DLOG(WARNING) << "Fail BufferData: buffer key less than latest key";
541+
return false;
542+
}
543+
int64_t sub = (key + window_range_.start_offset_);
544+
uint64_t start_ts = sub < 0 ? 0u : static_cast<uint64_t>(sub);
545+
546+
if (exclude_current_time_) {
547+
return BufferCurrentTimeBuffer(key, row, start_ts);
548+
} else {
549+
return BufferEffectiveWindow(key, row, start_ts);
550+
}
551+
}
434552
} // namespace vm
435553
} // namespace hybridse

hybridse/src/vm/runner.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -702,6 +702,7 @@ void WindowAggRunner::RunWindowAggOnKey(
702702
HistoryWindow window(instance_window_gen_.range_gen_->window_range_);
703703
window.set_instance_not_in_window(instance_not_in_window_);
704704
window.set_exclude_current_time(exclude_current_time_);
705+
window.set_without_order_by(without_order_by());
705706

706707
while (instance_segment_iter->Valid()) {
707708
if (limit_cnt_.has_value() && cnt >= limit_cnt_) {

hybridse/src/vm/runner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -502,6 +502,8 @@ class WindowAggRunner : public Runner {
502502
const bool instance_not_in_window_;
503503
const bool exclude_current_time_;
504504

505+
// True when the instance window generator's `sort_gen_` reports !Valid()
// (presumably meaning the WINDOW clause carries no ORDER BY -- confirm).
bool without_order_by() const { return !instance_window_gen_.sort_gen_.Valid(); }
506+
505507
// slice size outputed of the first producer node
506508
const size_t append_slices_;
507509
WindowGenerator instance_window_gen_;

0 commit comments

Comments
 (0)