Skip to content

Commit 9401a5e

Browse files
authored
feat: support delete(aggregator) (#3327)
1 parent 354bcda commit 9401a5e

File tree

11 files changed

+797
-328
lines changed

11 files changed

+797
-328
lines changed

src/storage/aggregator.cc

Lines changed: 235 additions & 74 deletions
Large diffs are not rendered by default.

src/storage/aggregator.h

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -120,16 +120,17 @@ struct AggrBufferLocked {
120120

121121
class Aggregator {
122122
public:
123-
Aggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
124-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
125-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
126-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
123+
Aggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
124+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
125+
std::shared_ptr<LogReplicator> aggr_replicator,
126+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
127+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
127128

128129
~Aggregator();
129130

130131
bool Update(const std::string& key, const std::string& row, uint64_t offset, bool recover = false);
131132

132-
bool Delete(const std::string& key);
133+
bool Delete(const std::string& key, const std::optional<uint64_t>& start_ts, const std::optional<uint64_t>& end_ts);
133134

134135
bool FlushAll();
135136

@@ -158,13 +159,14 @@ class Aggregator {
158159

159160
protected:
160161
codec::Schema base_table_schema_;
161-
codec::Schema aggr_table_schema_;
162162

163163
using FilterMap = absl::flat_hash_map<std::string, AggrBufferLocked>; // filter_column -> aggregator buffer
164164
absl::flat_hash_map<std::string, FilterMap> aggr_buffer_map_; // key -> filter_map
165165
std::mutex mu_;
166166
DataType aggr_col_type_;
167167
DataType ts_col_type_;
168+
std::shared_ptr<Table> base_table_;
169+
codec::Schema aggr_table_schema_;
168170
std::shared_ptr<Table> aggr_table_;
169171
std::shared_ptr<LogReplicator> aggr_replicator_;
170172
std::atomic<AggrStat> status_;
@@ -176,11 +178,16 @@ class Aggregator {
176178
bool CheckBufferFilled(int64_t cur_ts, int64_t buffer_end, int32_t buffer_cnt);
177179

178180
private:
181+
bool DeleteData(const std::string& key, const std::optional<uint64_t>& start_ts,
182+
const std::optional<uint64_t>& end_ts);
183+
179184
virtual bool UpdateAggrVal(const codec::RowView& row_view, const int8_t* row_ptr, AggrBuffer* aggr_buffer) = 0;
180185
virtual bool EncodeAggrVal(const AggrBuffer& buffer, std::string* aggr_val) = 0;
181186
virtual bool DecodeAggrVal(const int8_t* row_ptr, AggrBuffer* buffer) = 0;
182187
bool EncodeAggrBuffer(const std::string& key, const std::string& filter_key,
183188
const AggrBuffer& buffer, const std::string& aggr_val, std::string* encoded_row);
189+
bool RebuildAggrBuffer(const std::string& key, AggrBuffer* aggr_buffer);
190+
bool RebuildFlushedAggrBuffer(const std::string& key, const int8_t* row_ptr);
184191
int64_t AlignedStart(int64_t ts) {
185192
if (window_type_ == WindowType::kRowsRange) {
186193
return ts / window_size_ * window_size_;
@@ -213,10 +220,11 @@ class Aggregator {
213220

214221
class SumAggregator : public Aggregator {
215222
public:
216-
SumAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
217-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
218-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
219-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
223+
SumAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
224+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
225+
std::shared_ptr<LogReplicator> aggr_replicator,
226+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
227+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
220228

221229
~SumAggregator() = default;
222230

@@ -230,10 +238,11 @@ class SumAggregator : public Aggregator {
230238

231239
class MinMaxBaseAggregator : public Aggregator {
232240
public:
233-
MinMaxBaseAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
234-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
235-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
236-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
241+
MinMaxBaseAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
242+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
243+
std::shared_ptr<LogReplicator> aggr_replicator,
244+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
245+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
237246

238247
~MinMaxBaseAggregator() = default;
239248

@@ -244,10 +253,11 @@ class MinMaxBaseAggregator : public Aggregator {
244253
};
245254
class MinAggregator : public MinMaxBaseAggregator {
246255
public:
247-
MinAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
248-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
249-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
250-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
256+
MinAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
257+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
258+
std::shared_ptr<LogReplicator> aggr_replicator,
259+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
260+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
251261

252262
~MinAggregator() = default;
253263

@@ -257,10 +267,11 @@ class MinAggregator : public MinMaxBaseAggregator {
257267

258268
class MaxAggregator : public MinMaxBaseAggregator {
259269
public:
260-
MaxAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
261-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
262-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
263-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
270+
MaxAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
271+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
272+
std::shared_ptr<LogReplicator> aggr_replicator,
273+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
274+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
264275

265276
~MaxAggregator() = default;
266277

@@ -270,10 +281,11 @@ class MaxAggregator : public MinMaxBaseAggregator {
270281

271282
class CountAggregator : public Aggregator {
272283
public:
273-
CountAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
274-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
275-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
276-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
284+
CountAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
285+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
286+
std::shared_ptr<LogReplicator> aggr_replicator,
287+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
288+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
277289

278290
~CountAggregator() = default;
279291

@@ -289,10 +301,11 @@ class CountAggregator : public Aggregator {
289301

290302
class AvgAggregator : public Aggregator {
291303
public:
292-
AvgAggregator(const ::openmldb::api::TableMeta& base_meta, const ::openmldb::api::TableMeta& aggr_meta,
293-
std::shared_ptr<Table> aggr_table, std::shared_ptr<LogReplicator> aggr_replicator,
294-
const uint32_t& index_pos, const std::string& aggr_col, const AggrType& aggr_type,
295-
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
304+
AvgAggregator(const ::openmldb::api::TableMeta& base_meta, std::shared_ptr<Table> base_table,
305+
const ::openmldb::api::TableMeta& aggr_meta, std::shared_ptr<Table> aggr_table,
306+
std::shared_ptr<LogReplicator> aggr_replicator,
307+
uint32_t index_pos, const std::string& aggr_col, const AggrType& aggr_type,
308+
const std::string& ts_col, WindowType window_tpye, uint32_t window_size);
296309

297310
~AvgAggregator() = default;
298311

@@ -305,6 +318,7 @@ class AvgAggregator : public Aggregator {
305318
};
306319

307320
std::shared_ptr<Aggregator> CreateAggregator(const ::openmldb::api::TableMeta& base_meta,
321+
std::shared_ptr<Table> base_table,
308322
const ::openmldb::api::TableMeta& aggr_meta,
309323
std::shared_ptr<Table> aggr_table,
310324
std::shared_ptr<LogReplicator> aggr_replicator, uint32_t index_pos,

src/storage/aggregator_test.cc

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,8 @@ bool GetUpdatedResult(const uint32_t& id, const std::string& aggr_col, const std
123123
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
124124
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
125125
replicator->Init();
126-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, aggr_col, aggr_type,
127-
"ts_col", bucket_size, "low_card");
126+
auto aggr = CreateAggregator(base_table_meta, table, aggr_table_meta, aggr_table, replicator, 0,
127+
aggr_col, aggr_type, "ts_col", bucket_size, "low_card");
128128
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
129129
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
130130
base_replicator->Init();
@@ -319,7 +319,8 @@ void CheckCountWhereAggrResult(std::shared_ptr<Table> aggr_table, std::shared_pt
319319
TEST_F(AggregatorTest, CreateAggregator) {
320320
// rows_num window type
321321
std::map<std::string, std::string> map;
322-
std::string folder = "/tmp/" + GenRand() + "/";
322+
::openmldb::test::TempPath tmp_path;
323+
std::string folder = tmp_path.GetTempPath();
323324
{
324325
uint32_t id = counter++;
325326
::openmldb::api::TableMeta base_table_meta;
@@ -334,8 +335,8 @@ TEST_F(AggregatorTest, CreateAggregator) {
334335
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
335336
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
336337
replicator->Init();
337-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum",
338-
"ts_col", "1000");
338+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
339+
"col3", "sum", "ts_col", "1000");
339340
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
340341
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
341342
base_replicator->Init();
@@ -360,8 +361,8 @@ TEST_F(AggregatorTest, CreateAggregator) {
360361
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
361362
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
362363
replicator->Init();
363-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum",
364-
"ts_col", "1d");
364+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
365+
"col3", "sum", "ts_col", "1d");
365366
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
366367
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
367368
base_replicator->Init();
@@ -385,8 +386,8 @@ TEST_F(AggregatorTest, CreateAggregator) {
385386
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
386387
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
387388
replicator->Init();
388-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum",
389-
"ts_col", "2s");
389+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
390+
"col3", "sum", "ts_col", "2s");
390391
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
391392
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
392393
base_replicator->Init();
@@ -410,8 +411,8 @@ TEST_F(AggregatorTest, CreateAggregator) {
410411
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
411412
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
412413
replicator->Init();
413-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum",
414-
"ts_col", "3m");
414+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
415+
"col3", "sum", "ts_col", "3m");
415416
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
416417
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
417418
base_replicator->Init();
@@ -435,8 +436,8 @@ TEST_F(AggregatorTest, CreateAggregator) {
435436
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
436437
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
437438
replicator->Init();
438-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum",
439-
"ts_col", "100h");
439+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
440+
"col3", "sum", "ts_col", "100h");
440441
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
441442
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
442443
base_replicator->Init();
@@ -471,7 +472,8 @@ TEST_F(AggregatorTest, SumAggregatorUpdate) {
471472
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
472473
replicator->Init();
473474
auto aggr =
474-
CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum", "ts_col", "2");
475+
CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
476+
"col3", "sum", "ts_col", "2");
475477
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
476478
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
477479
base_replicator->Init();
@@ -739,7 +741,8 @@ TEST_F(AggregatorTest, OutOfOrder) {
739741
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
740742
replicator->Init();
741743
auto aggr =
742-
CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "sum", "ts_col", "1s");
744+
CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
745+
"col3", "sum", "ts_col", "1s");
743746
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
744747
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
745748
base_replicator->Init();
@@ -808,7 +811,8 @@ TEST_F(AggregatorTest, OutOfOrder) {
808811

809812
TEST_F(AggregatorTest, OutOfOrderCountWhere) {
810813
std::map<std::string, std::string> map;
811-
std::string folder = "/tmp/" + GenRand() + "/";
814+
::openmldb::test::TempPath tmp_path;
815+
std::string folder = tmp_path.GetTempPath();
812816
uint32_t id = counter++;
813817
::openmldb::api::TableMeta base_table_meta;
814818
base_table_meta.set_tid(id);
@@ -822,8 +826,8 @@ TEST_F(AggregatorTest, OutOfOrderCountWhere) {
822826
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
823827
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
824828
replicator->Init();
825-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "count_where",
826-
"ts_col", "1s", "low_card");
829+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
830+
"col3", "count_where", "ts_col", "1s", "low_card");
827831
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
828832
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
829833
base_replicator->Init();
@@ -914,7 +918,8 @@ TEST_F(AggregatorTest, OutOfOrderCountWhere) {
914918

915919
TEST_F(AggregatorTest, AlignedCountWhere) {
916920
std::map<std::string, std::string> map;
917-
std::string folder = "/tmp/" + GenRand() + "/";
921+
::openmldb::test::TempPath tmp_path;
922+
std::string folder = tmp_path.GetTempPath();
918923
uint32_t id = counter++;
919924
::openmldb::api::TableMeta base_table_meta;
920925
base_table_meta.set_tid(id);
@@ -928,8 +933,8 @@ TEST_F(AggregatorTest, AlignedCountWhere) {
928933
std::shared_ptr<LogReplicator> replicator = std::make_shared<LogReplicator>(
929934
aggr_table->GetId(), aggr_table->GetPid(), folder, map, ::openmldb::replica::kLeaderNode);
930935
replicator->Init();
931-
auto aggr = CreateAggregator(base_table_meta, aggr_table_meta, aggr_table, replicator, 0, "col3", "count_where",
932-
"ts_col", "1s", "low_card");
936+
auto aggr = CreateAggregator(base_table_meta, nullptr, aggr_table_meta, aggr_table, replicator, 0,
937+
"col3", "count_where", "ts_col", "1s", "low_card");
933938
std::shared_ptr<LogReplicator> base_replicator = std::make_shared<LogReplicator>(
934939
base_table_meta.tid(), base_table_meta.pid(), folder, map, ::openmldb::replica::kLeaderNode);
935940
base_replicator->Init();

0 commit comments

Comments
 (0)