diff --git a/src/flags.cc b/src/flags.cc index 5406b652d29..adc4e1712f4 100644 --- a/src/flags.cc +++ b/src/flags.cc @@ -135,6 +135,9 @@ DEFINE_int32(snapshot_pool_size, 1, "the size of tablet thread pool for making s DEFINE_uint32(load_index_max_wait_time, 120 * 60 * 1000, "config the max wait time of load index. unit is milliseconds"); +DEFINE_uint32(disk_stat_bloom_filter_bitset_size, 10000, "config the size of bitset in bloom filter"); +DEFINE_uint32(disk_stat_bloom_filter_hash_seed, 7, "config the count of hash seed in bloom filter, max 7"); + DEFINE_string(recycle_bin_root_path, "/tmp/recycle", "specify the root path of recycle bin"); DEFINE_string(recycle_bin_ssd_root_path, "", "specify the root path of recycle bin in ssd"); DEFINE_string(recycle_bin_hdd_root_path, "", "specify the root path of recycle bin in hdd"); diff --git a/src/storage/disk_table.cc b/src/storage/disk_table.cc index 3b66c176d1b..bb5d7276575 100644 --- a/src/storage/disk_table.cc +++ b/src/storage/disk_table.cc @@ -17,6 +17,7 @@ #include "storage/disk_table.h" #include #include +#include #include "base/file_util.h" #include "base/glog_wrapper.h" #include "base/hash.h" @@ -152,6 +153,14 @@ bool DiskTable::InitColumnFamilyDescriptor() { cf_ds_.push_back( rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); auto inner_indexs = table_index_.GetAllInnerIndex(); + for (uint32_t i = 0; i < inner_indexs->size(); i++) { + pk_cnt_vec_.push_back(std::make_shared>(0)); + bloom_filter_vec_.push_back(BloomFilter()); + } + auto indexs = table_index_.GetAllIndex(); + for (uint32_t i = 0; i < indexs.size(); i++) { + idx_cnt_vec_.push_back(std::make_shared>(0)); + } for (const auto& inner_index : *inner_indexs) { rocksdb::ColumnFamilyOptions cfo; if (storage_mode_ == ::openmldb::common::StorageMode::kSSD) { @@ -164,11 +173,15 @@ bool DiskTable::InitColumnFamilyDescriptor() { cfo.comparator = &cmp_; cfo.prefix_extractor.reset(new KeyTsPrefixTransform()); const auto& indexs = inner_index->GetIndex(); - auto index_def = indexs.front(); - if (index_def->GetTTLType() == ::openmldb::storage::TTLType::kAbsoluteTime || - index_def->GetTTLType() == ::openmldb::storage::TTLType::kAbsOrLat) { - cfo.compaction_filter_factory = std::make_shared(inner_index); + for (const auto& index_def : indexs) { + if (index_def->GetTTLType() == ::openmldb::storage::TTLType::kAbsoluteTime) { + cfo.compaction_filter_factory = std::make_shared( + inner_index, &idx_cnt_vec_, pk_cnt_vec_[inner_index->GetId()], + &bloom_filter_vec_[inner_index->GetId()]); + break; + } } + auto index_def = indexs.front(); cf_ds_.push_back(rocksdb::ColumnFamilyDescriptor(index_def->GetName(), cfo)); DEBUGLOG("add cf_name %s. tid %u pid %u", index_def->GetName().c_str(), id_, pid_); } @@ -209,6 +222,11 @@ bool DiskTable::Put(const std::string& pk, uint64_t time, const char* data, uint s = db_->Put(write_opts_, cf_hs_[1], spk, rocksdb::Slice(data, size)); if (s.ok()) { offset_.fetch_add(1, std::memory_order_relaxed); + idx_cnt_vec_[0]->fetch_add(1, std::memory_order_relaxed); + if (!bloom_filter_vec_[0].Valid(pk.c_str())) { + bloom_filter_vec_[0].Set(pk.c_str()); + pk_cnt_vec_[0]->fetch_add(1, std::memory_order_relaxed); + } return true; } else { DEBUGLOG("Put failed. tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); @@ -264,6 +282,14 @@ bool DiskTable::Put(uint64_t time, const std::string& value, const Dimensions& d } s = db_->Write(write_opts_, &batch); if (s.ok()) { + for (Dimensions::const_iterator it = dimensions.begin(); it != dimensions.end(); ++it) { + idx_cnt_vec_[it->idx()]->fetch_add(1, std::memory_order_relaxed); + int32_t inner_pos = table_index_.GetInnerIndexPos(it->idx()); + if (!bloom_filter_vec_[inner_pos].Valid(it->key().c_str())) { + bloom_filter_vec_[inner_pos].Set(it->key().c_str()); + pk_cnt_vec_[inner_pos]->fetch_add(1, std::memory_order_relaxed); + } + } offset_.fetch_add(1, std::memory_order_relaxed); return true; } else { @@ -279,6 +305,13 @@ bool DiskTable::Delete(const std::string& pk, uint32_t idx) { return false; } auto inner_index = table_index_.GetInnerIndex(index_def->GetInnerPos()); + rocksdb::ReadOptions ro = rocksdb::ReadOptions(); + const rocksdb::Snapshot* snapshot = db_->GetSnapshot(); + ro.snapshot = snapshot; + ro.pin_data = true; + rocksdb::Iterator* it = db_->NewIterator(ro, cf_hs_[index_def->GetInnerPos() + 1]); + std::map delete_idx_cnt; + if (inner_index && inner_index->GetIndex().size() > 1) { const auto& indexs = inner_index->GetIndex(); for (const auto& index : indexs) { @@ -288,16 +321,56 @@ bool DiskTable::Delete(const std::string& pk, uint32_t idx) { } std::string combine_key1 = CombineKeyTs(pk, UINT64_MAX, ts_col->GetId()); std::string combine_key2 = CombineKeyTs(pk, 0, ts_col->GetId()); - batch.DeleteRange(cf_hs_[idx + 1], rocksdb::Slice(combine_key1), rocksdb::Slice(combine_key2)); + it->Seek(rocksdb::Slice(combine_key1)); + while (it->Valid()) { + std::string cur_pk; + uint64_t cur_ts; + uint32_t cur_ts_idx; + ParseKeyAndTs(true, it->key(), cur_pk, cur_ts, cur_ts_idx); + if (pk.compare(cur_pk) != 0 || cur_ts_idx != ts_col->GetId()) { + break; + } + if (delete_idx_cnt.find(index->GetId()) != delete_idx_cnt.end()) { + delete_idx_cnt[index->GetId()]++; + } else { + delete_idx_cnt.emplace(index->GetId(), 1); + } + it->Next(); + } + batch.DeleteRange(cf_hs_[index_def->GetInnerPos() + 1], rocksdb::Slice(combine_key1), + rocksdb::Slice(combine_key2)); } } else { std::string combine_key1 = CombineKeyTs(pk, UINT64_MAX); std::string combine_key2 = CombineKeyTs(pk, 0); - batch.DeleteRange(cf_hs_[idx + 1], rocksdb::Slice(combine_key1), rocksdb::Slice(combine_key2)); + it->Seek(rocksdb::Slice(combine_key1)); + const auto& index = inner_index->GetIndex().front(); + while (it->Valid()) { + std::string cur_pk; + uint64_t cur_ts; + ParseKeyAndTs(it->key(), cur_pk, cur_ts); + if (pk.compare(cur_pk) != 0) { + break; + } + if (delete_idx_cnt.find(index->GetId()) != delete_idx_cnt.end()) { + delete_idx_cnt[index->GetId()]++; + } else { + delete_idx_cnt.emplace(index->GetId(), 1); + } + it->Next(); + } + rocksdb::Status s = batch.DeleteRange(cf_hs_[index_def->GetInnerPos() + 1], rocksdb::Slice(combine_key1), + rocksdb::Slice(combine_key2)); } rocksdb::Status s = db_->Write(write_opts_, &batch); if (s.ok()) { offset_.fetch_add(1, std::memory_order_relaxed); + for (auto ts_idx_iter = delete_idx_cnt.begin(); ts_idx_iter != delete_idx_cnt.end(); ts_idx_iter++) { + idx_cnt_vec_[ts_idx_iter->first]->fetch_sub(ts_idx_iter->second, std::memory_order_relaxed); + } + if (delete_idx_cnt.size() > 0) { + pk_cnt_vec_[index_def->GetInnerPos()]->fetch_sub(1); + } return true; } else { DEBUGLOG("Delete failed. tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); @@ -322,10 +395,24 @@ bool DiskTable::Get(uint32_t idx, const std::string& pk, uint64_t ts, std::strin bool DiskTable::Get(const std::string& pk, uint64_t ts, std::string& value) { return Get(0, pk, ts, value); } void DiskTable::SchedGc() { + ResetRecordCnt(); GcHead(); + GcTTL(); UpdateTTL(); } +void DiskTable::ResetRecordCnt() { + auto indexs = table_index_.GetAllIndex(); + for (const auto& index : indexs) { + idx_cnt_vec_[index->GetId()]->store(0, std::memory_order_relaxed); + } + auto inner_indexs = table_index_.GetAllInnerIndex(); + for (const auto& inner_index : *inner_indexs) { + pk_cnt_vec_[inner_index->GetId()]->store(0, std::memory_order_relaxed); + bloom_filter_vec_[inner_index->GetId()].Reset(); + } +} + void DiskTable::GcHead() { uint64_t start_time = ::baidu::common::timer::get_micros() / 1000; auto inner_indexs = table_index_.GetAllInnerIndex(); @@ -340,21 +427,23 @@ void DiskTable::GcHead() { it->SeekToFirst(); const auto& indexs = inner_index->GetIndex(); if (indexs.size() > 1) { - bool need_ttl = false; std::map ttl_map; + std::map idx_map; + std::set other_ttl_set; for (const auto& index : indexs) { auto ts_col = index->GetTsColumn(); if (ts_col) { auto lat_ttl = index->GetTTL()->lat_ttl; if (lat_ttl > 0) { ttl_map.emplace(ts_col->GetId(), lat_ttl); - need_ttl = true; } + auto TTL_type = index->GetTTLType(); + if (TTL_type != openmldb::storage::TTLType::kLatestTime) { + other_ttl_set.insert(ts_col->GetId()); + } + idx_map.emplace(ts_col->GetId(), index->GetId()); } } - if (!need_ttl) { - continue; - } std::map key_cnt; std::map delete_key_map; std::string last_pk; @@ -363,15 +452,19 @@ void DiskTable::GcHead() { uint64_t ts = 0; uint32_t ts_idx = 0; ParseKeyAndTs(true, it->key(), cur_pk, ts, ts_idx); + if (other_ttl_set.find(ts_idx) != other_ttl_set.end()) { + it->Next(); + continue; + } if (!last_pk.empty() && cur_pk == last_pk) { + auto key_cnt_iter = key_cnt.find(ts_idx); + if (key_cnt_iter == key_cnt.end()) { + key_cnt.insert(std::make_pair(ts_idx, 1)); + } else { + key_cnt_iter->second++; + } auto ttl_iter = ttl_map.find(ts_idx); if (ttl_iter != ttl_map.end() && ttl_iter->second > 0) { - auto key_cnt_iter = key_cnt.find(ts_idx); - if (key_cnt_iter == key_cnt.end()) { - key_cnt.insert(std::make_pair(ts_idx, 1)); - } else { - key_cnt_iter->second++; - } if (key_cnt_iter->second > ttl_iter->second && delete_key_map.find(ts_idx) == delete_key_map.end()) { delete_key_map.insert(std::make_pair(ts_idx, ts)); @@ -387,6 +480,24 @@ void DiskTable::GcHead() { PDLOG(WARNING, "Delete failed. tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); } } + for (auto ts_idx_iter = key_cnt.begin(); ts_idx_iter != key_cnt.end(); ts_idx_iter++) { + auto index_iterator = idx_map.find(ts_idx_iter->first); + auto ttl_iter = ttl_map.find(ts_idx_iter->first); + if (ttl_iter != ttl_map.end() && ttl_iter->second > 0 && + ts_idx_iter->second <= ttl_iter->second) { + idx_cnt_vec_[index_iterator->second]->fetch_add(ts_idx_iter->second, + std::memory_order_relaxed); + } else { + idx_cnt_vec_[index_iterator->second]->fetch_add(ttl_iter->second, + std::memory_order_relaxed); + } + } + if (key_cnt.size() > 0) { + if (!bloom_filter_vec_[idx].Valid(last_pk.c_str())) { + bloom_filter_vec_[idx].Set(last_pk.c_str()); + pk_cnt_vec_[idx]->fetch_add(1, std::memory_order_relaxed); + } + } delete_key_map.clear(); key_cnt.clear(); key_cnt.insert(std::make_pair(ts_idx, 1)); @@ -403,12 +514,32 @@ void DiskTable::GcHead() { PDLOG(WARNING, "Delete failed. tid %u pid %u msg %s", id_, pid_, s.ToString().c_str()); } } + for (auto ts_idx_iter = key_cnt.begin(); ts_idx_iter != key_cnt.end(); ts_idx_iter++) { + auto index_iterator = idx_map.find(ts_idx_iter->first); + auto ttl_iter = ttl_map.find(ts_idx_iter->first); + if (ttl_iter != ttl_map.end() && ttl_iter->second > 0 && ts_idx_iter->second <= ttl_iter->second) { + idx_cnt_vec_[index_iterator->second]->fetch_add(ts_idx_iter->second, std::memory_order_relaxed); + } else { + idx_cnt_vec_[index_iterator->second]->fetch_add(ttl_iter->second, std::memory_order_relaxed); + } + } + if (key_cnt.size() > 0) { + if (!bloom_filter_vec_[idx].Valid(last_pk.c_str())) { + bloom_filter_vec_[idx].Set(last_pk.c_str()); + pk_cnt_vec_[idx]->fetch_add(1, std::memory_order_relaxed); + } + } } else { auto index = indexs.front(); + uint32_t index_id = index->GetId(); auto ttl_num = index->GetTTL()->lat_ttl; if (ttl_num < 1) { continue; } + auto abs_ttl_num = index->GetTTL()->abs_ttl; + if (abs_ttl_num > 0) { + continue; + } std::string last_pk; uint64_t count = 0; while (it->Valid()) { @@ -431,11 +562,17 @@ void DiskTable::GcHead() { it->Seek(rocksdb::Slice(combine_key2)); } } else { + idx_cnt_vec_[index_id]->fetch_add(count); count = 1; last_pk = cur_pk; it->Next(); + if (!bloom_filter_vec_[idx].Valid(cur_pk.c_str())) { + bloom_filter_vec_[idx].Set(cur_pk.c_str()); + pk_cnt_vec_[idx]->fetch_add(1, std::memory_order_relaxed); + } } } + idx_cnt_vec_[index_id]->fetch_add(count); } delete it; db_->ReleaseSnapshot(snapshot); @@ -444,6 +581,16 @@ void DiskTable::GcHead() { PDLOG(INFO, "Gc used %lu second. tid %u pid %u", time_used / 1000, id_, pid_); } +void DiskTable::GcTTL() { + auto inner_indexs = table_index_.GetAllInnerIndex(); + for (uint32_t i = 0; i < inner_indexs->size(); i++) { + auto s = db_->CompactRange(rocksdb::CompactRangeOptions(), cf_hs_[i + 1], nullptr, nullptr); + if (!s.ok()) { + PDLOG(WARNING, "Manual Compaction failed"); + } + } +} + void DiskTable::GcTTLOrHead() {} void DiskTable::GcTTLAndHead() {} @@ -994,8 +1141,8 @@ DiskTableRowIterator::DiskTableRowIterator(rocksdb::DB* db, rocksdb::Iterator* i expire_value_(expire_time, expire_cnt, ttl_type), pk_(pk), row_pk_(pk), - ts_(ts), has_ts_idx_(has_ts_idx), + ts_(ts), ts_idx_(ts_idx), row_() {} @@ -1109,18 +1256,36 @@ bool DiskTable::DeleteIndex(const std::string& idx_name) { } uint64_t DiskTable::GetRecordIdxCnt() { - // TODO(litongxin) - return 0; + auto inner_indexs = table_index_.GetAllInnerIndex(); + uint64_t count = 0; + for (const auto& inner_index : *inner_indexs) { + count += idx_cnt_vec_[inner_index->GetIndex().front()->GetId()]->load(std::memory_order_relaxed); + } + return count; } bool DiskTable::GetRecordIdxCnt(uint32_t idx, uint64_t** stat, uint32_t* size) { - // TODO(litongxin) + if (stat == NULL) { + return false; + } + std::shared_ptr index_def = table_index_.GetIndex(idx); + if (!index_def || !index_def->IsReady()) { + return false; + } + auto* data_array = new uint64_t[1]; + data_array[0] = idx_cnt_vec_[idx]->load(std::memory_order_relaxed); + *stat = data_array; + *size = 1; return true; } uint64_t DiskTable::GetRecordPkCnt() { - // TODO(litongxin) - return 0; + auto inner_indexs = table_index_.GetAllInnerIndex(); + uint64_t count = 0; + for (uint32_t i = 0; i < inner_indexs->size(); i++) { + count += pk_cnt_vec_[i]->load(std::memory_order_relaxed); + } + return count; } uint64_t DiskTable::GetRecordIdxByteSize() { @@ -1176,9 +1341,57 @@ int DiskTable::GetCount(uint32_t index, const std::string& pk, uint64_t& count) break; } } - return 0; } +uint32_t BloomFilter::Hash(const char* str, uint32_t seed) { + uint a = 63689; + uint hash = 0; + + while (*str) { + hash = hash * a + (*str++); + a *= seed; + } + + return (hash & 0x7FFFFFFF); +} + +void BloomFilter::SetBit(uint32_t bit) { + uint32_t bits_num = bit / 64; + uint32_t bits_left = bit % 64; + + bits_[bits_num]->fetch_or((uint64_t)1 << bits_left, std::memory_order_relaxed); +} + +bool BloomFilter::GetBit(uint32_t bit) { + uint32_t bits_num = bit / 64; + uint32_t bits_left = bit % 64; + + return (bits_[bits_num]->load(std::memory_order_relaxed) >> bits_left) & 1; +} + +void BloomFilter::Set(const char* str) { + for (uint32_t i = 0; i < FLAGS_disk_stat_bloom_filter_hash_seed; ++i) { + uint32_t p = Hash(str, base_[i]) % FLAGS_disk_stat_bloom_filter_bitset_size; + SetBit(p); + } +} + +bool BloomFilter::Valid(const char* str) { + for (uint32_t i = 0; i < FLAGS_disk_stat_bloom_filter_hash_seed; ++i) { + uint32_t p = Hash(str, base_[i]) % FLAGS_disk_stat_bloom_filter_bitset_size; + if (!GetBit(p)) { + return false; + } + } + return true; +} + +void BloomFilter::Reset() { + for (uint32_t i = 0; i < bits_.size(); i++) { + bits_[i]->store(0, std::memory_order_relaxed); + } +} + } // namespace storage } // namespace openmldb diff --git a/src/storage/disk_table.h b/src/storage/disk_table.h index 2cab939a1e4..e5f6599dabd 100644 --- a/src/storage/disk_table.h +++ b/src/storage/disk_table.h @@ -39,6 +39,10 @@ #include "rocksdb/utilities/checkpoint.h" #include "storage/iterator.h" #include "storage/table.h" +#include "base/glog_wapper.h" // NOLINT + +DECLARE_uint32(disk_stat_bloom_filter_bitset_size); +DECLARE_uint32(disk_stat_bloom_filter_hash_seed); namespace openmldb { namespace storage { @@ -126,6 +130,28 @@ class KeyTSComparator : public rocksdb::Comparator { void FindShortSuccessor(std::string* /*key*/) const override {} }; +class BloomFilter { + public: + BloomFilter() { + for (uint32_t i = 0; i < FLAGS_disk_stat_bloom_filter_bitset_size; i++) { + bits_.push_back(std::make_shared>(0)); + } + } + virtual ~BloomFilter() {} + + void Set(const char* str); + bool Valid(const char* str); + void Reset(); + + private: + uint32_t Hash(const char* str, uint32_t seed); + void SetBit(uint32_t bit); + bool GetBit(uint32_t bit); + + std::vector>> bits_; + uint32_t base_[7] = {5, 7, 11, 13, 31, 37, 61}; +}; + class KeyTsPrefixTransform : public rocksdb::SliceTransform { public: const char* Name() const override { return "KeyTsPrefixTransform"; } @@ -198,17 +224,109 @@ class AbsoluteTTLCompactionFilter : public rocksdb::CompactionFilter { std::shared_ptr inner_index_; }; +class AbsoluteTTLAndCountCompactionFilter : public rocksdb::CompactionFilter { + public: + explicit AbsoluteTTLAndCountCompactionFilter(std::shared_ptr inner_index, + std::vector>>* idx_cnt_vec, + std::shared_ptr> pk_cnt, + BloomFilter* bloom_filter) + : inner_index_(inner_index), + idx_cnt_vec_(idx_cnt_vec), + pk_cnt_(pk_cnt), + bloom_filter_(bloom_filter) {} + virtual ~AbsoluteTTLAndCountCompactionFilter() {} + + const char* Name() const override { return "AbsoluteTTLAndCountCompactionFilter"; } + + bool Filter(int /*level*/, const rocksdb::Slice& key, const rocksdb::Slice& /*existing_value*/, + std::string* /*new_value*/, bool* /*value_changed*/) const override { + if (key.size() < TS_LEN) { + return false; + } + uint64_t real_ttl = 0; + const auto& indexs = inner_index_->GetIndex(); + uint32_t idx; + if (indexs.size() > 1) { + if (key.size() < TS_LEN + TS_POS_LEN) { + return false; + } + uint32_t ts_idx = *((uint32_t*)(key.data() + key.size() - TS_LEN - // NOLINT + TS_POS_LEN)); + bool has_found = false; + for (const auto index : indexs) { + auto ts_col = index->GetTsColumn(); + if (!ts_col) { + return false; + } + if (ts_col->GetId() == ts_idx) { + if (index->GetTTLType() != ::openmldb::storage::TTLType::kAbsoluteTime) { + return false; + } + real_ttl = index->GetTTL()->abs_ttl; + idx = index->GetId(); + has_found = true; + break; + } + } + if (!has_found) { + return false; + } + } else { + real_ttl = indexs.front()->GetTTL()->abs_ttl; + idx = indexs.front()->GetId(); + } + if (real_ttl < 1) { + return false; + } + uint64_t ts = 0; + memcpy(static_cast(&ts), key.data() + key.size() - TS_LEN, TS_LEN); + memrev64ifbe(static_cast(&ts)); + uint64_t cur_time = ::baidu::common::timer::get_micros() / 1000; + if (ts < cur_time - real_ttl) { + return true; + } + idx_cnt_vec_->at(idx)->fetch_add(1, std::memory_order_relaxed); + std::string pk; + if (indexs.size() > 1) { + pk.assign(key.data(), key.size() - TS_LEN - TS_POS_LEN); + } else { + pk.assign(key.data(), key.size() - TS_LEN); + } + if (!bloom_filter_->Valid(pk.c_str())) { + bloom_filter_->Set(pk.c_str()); + pk_cnt_->fetch_add(1, std::memory_order_relaxed); + } + return false; + } + + private: + std::shared_ptr inner_index_; + std::vector>>* idx_cnt_vec_; + std::shared_ptr> pk_cnt_; + BloomFilter* bloom_filter_; +}; + class AbsoluteTTLFilterFactory : public rocksdb::CompactionFilterFactory { public: - explicit AbsoluteTTLFilterFactory(const std::shared_ptr& inner_index) : inner_index_(inner_index) {} + explicit AbsoluteTTLFilterFactory(const std::shared_ptr& inner_index, + std::vector>>* idx_cnt_vec, + std::shared_ptr> pk_cnt, BloomFilter* bloom_filter) + : inner_index_(inner_index), idx_cnt_vec_(idx_cnt_vec), pk_cnt_(pk_cnt), bloom_filter_(bloom_filter) {} std::unique_ptr CreateCompactionFilter( const rocksdb::CompactionFilter::Context& context) override { + if (context.is_manual_compaction) { + return std::unique_ptr( + new AbsoluteTTLAndCountCompactionFilter(inner_index_, idx_cnt_vec_, pk_cnt_, bloom_filter_)); + } return std::unique_ptr(new AbsoluteTTLCompactionFilter(inner_index_)); } const char* Name() const override { return "AbsoluteTTLFilterFactory"; } private: std::shared_ptr inner_index_; + std::vector>>* idx_cnt_vec_; + std::shared_ptr> pk_cnt_; + BloomFilter* bloom_filter_; }; class DiskTableIterator : public TableIterator { @@ -402,7 +520,9 @@ class DiskTable : public Table { void SchedGc() override; + void ResetRecordCnt(); void GcHead(); + void GcTTL(); void GcTTLAndHead(); void GcTTLOrHead(); @@ -434,6 +554,9 @@ class DiskTable : public Table { KeyTSComparator cmp_; std::atomic offset_; std::string table_path_; + std::vector>> idx_cnt_vec_; + std::vector>> pk_cnt_vec_; + std::vector bloom_filter_vec_; }; } // namespace storage diff --git a/src/storage/disk_table_test.cc b/src/storage/disk_table_test.cc index 83c83ef5bbe..e670b79e419 100644 --- a/src/storage/disk_table_test.cc +++ b/src/storage/disk_table_test.cc @@ -120,10 +120,8 @@ TEST_F(DiskTableTest, MultiDimensionPut) { ::openmldb::common::StorageMode::kHDD, table_path); ASSERT_TRUE(table->Init()); ASSERT_EQ(3, (int64_t)table->GetIdxCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - // ASSERT_EQ(0, table->GetRecordIdxCnt()); - // ASSERT_EQ(0, table->GetRecordCnt()); + ASSERT_EQ(0, table->GetRecordIdxCnt()); + ASSERT_EQ(0, table->GetRecordCnt()); auto meta = ::openmldb::test::GetTableMeta({"idx0", "idx1", "idx2"}); ::openmldb::codec::SDKCodec sdk_codec(meta); @@ -143,9 +141,7 @@ TEST_F(DiskTableTest, MultiDimensionPut) { ASSERT_EQ(0, sdk_codec.EncodeRow(row, &value)); bool ok = table->Put(1, value, dimensions); ASSERT_TRUE(ok); - // some functions in disk table need to be implemented. - // refer to issue #1238 - // ASSERT_EQ(3, table->GetRecordIdxCnt()); + ASSERT_EQ(3, table->GetRecordIdxCnt()); Ticket ticket; TableIterator* it = table->NewIterator(0, "yjdim0", ticket); diff --git a/src/storage/table_test.cc b/src/storage/table_test.cc index 07e9440fc21..e9fb9dbaed1 100644 --- a/src/storage/table_test.cc +++ b/src/storage/table_test.cc @@ -200,11 +200,7 @@ TEST_P(TableTest, MultiDimissionPut0) { sdk_codec.EncodeRow({"d0", "d1", "d2"}, &result); bool ok = table->Put(1, result, dimensions); ASSERT_TRUE(ok); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(3, (int64_t)table->GetRecordIdxCnt()); - } + ASSERT_EQ(3, (int64_t)table->GetRecordIdxCnt()); ASSERT_EQ(1, (int64_t)table->GetRecordCnt()); delete table; } @@ -326,11 +322,6 @@ TEST_P(TableTest, Iterator_GetSize) { TEST_P(TableTest, SchedGcHead) { ::openmldb::common::StorageMode storageMode = GetParam(); - // some functions with disktable mode in this test have not been implemented. - // refer to issue #1238 - if (storageMode == openmldb::common::kHDD) { - return; - } std::map mapping; mapping.insert(std::make_pair("idx0", 0)); std::string table_path = ""; @@ -349,12 +340,8 @@ TEST_P(TableTest, SchedGcHead) { value = ::openmldb::test::EncodeKV("test", "test2"); table->Put("test", 1, value.data(), value.size()); ASSERT_EQ(2, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(1, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(1, (int64_t)table->GetRecordPkCnt()); table->SchedGc(); { ::openmldb::api::LogEntry entry; @@ -380,7 +367,9 @@ TEST_P(TableTest, SchedGcHead) { ASSERT_FALSE(table->IsExpire(entry)); } } - ASSERT_EQ(1, (int64_t)table->GetRecordCnt()); + if (storageMode == ::openmldb::common::StorageMode::kMemory) { + ASSERT_EQ(1, (int64_t)table->GetRecordCnt()); + } ASSERT_EQ(1, (int64_t)table->GetRecordIdxCnt()); ASSERT_EQ(bytes, table->GetRecordByteSize()); ASSERT_EQ(record_idx_bytes, table->GetRecordIdxByteSize()); @@ -436,11 +425,6 @@ TEST_P(TableTest, SchedGcHead1) { TEST_P(TableTest, SchedGc) { ::openmldb::common::StorageMode storageMode = GetParam(); - // some functions with disktable mode in this test have not been implemented. - // refer to issue #1238 - if (storageMode == openmldb::common::kHDD) { - return; - } std::map mapping; mapping.insert(std::make_pair("idx0", 0)); std::string table_path = ""; @@ -459,17 +443,13 @@ TEST_P(TableTest, SchedGc) { uint64_t record_idx_bytes = table->GetRecordIdxByteSize(); table->Put("test", 9527, "test", 4); ASSERT_EQ(2, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(1, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(1, (int64_t)table->GetRecordPkCnt()); table->SchedGc(); - ASSERT_EQ(1, (int64_t)table->GetRecordCnt()); - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(1, (int64_t)table->GetRecordIdxCnt()); + if (storageMode == ::openmldb::common::kMemory) { + ASSERT_EQ(1, (int64_t)table->GetRecordCnt()); } + ASSERT_EQ(1, (int64_t)table->GetRecordIdxCnt()); ASSERT_EQ(bytes, table->GetRecordByteSize()); ASSERT_EQ(record_idx_bytes, table->GetRecordIdxByteSize()); @@ -487,11 +467,6 @@ TEST_P(TableTest, SchedGc) { TEST_P(TableTest, TableDataCnt) { ::openmldb::common::StorageMode storageMode = GetParam(); - // some functions with disktable mode in this test have not been implemented. - // refer to issue #1238 - if (storageMode == openmldb::common::kHDD) { - return; - } std::map mapping; mapping.insert(std::make_pair("idx0", 0)); std::string table_path = ""; @@ -508,11 +483,7 @@ TEST_P(TableTest, TableDataCnt) { table->Put("test", 9527, "test", 4); table->Put("test", now, "tes2", 4); ASSERT_EQ((int64_t)table->GetRecordCnt(), 2); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ((int64_t)table->GetRecordIdxCnt(), 2); - } + ASSERT_EQ((int64_t)table->GetRecordIdxCnt(), 2); table->SchedGc(); { ::openmldb::api::LogEntry entry; @@ -1223,11 +1194,11 @@ TEST_P(TableTest, AbsOrLatSetGet) { TEST_P(TableTest, GcAbsOrLat) { ::openmldb::common::StorageMode storageMode = GetParam(); - // some functions with disktable mode in this test have not been implemented. - // refer to issue #1238 - if (storageMode == openmldb::common::kHDD) { - return; + // RecordIdxCnt in disktable only support abs TTL and lat TTL + if (storageMode != openmldb::common::kMemory) { + GTEST_SKIP(); } + ::openmldb::api::TableMeta table_meta; table_meta.set_name("table1"); std::string table_path = ""; @@ -1260,44 +1231,30 @@ TEST_P(TableTest, GcAbsOrLat) { table->Put("test2", now - 2 * (60 * 1000) - 1000, "value5", 6); table->Put("test2", now - 1 * (60 * 1000) - 1000, "value6", 6); ASSERT_EQ(7, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(7, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(7, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); ::openmldb::storage::UpdateTTLMeta update_ttl( ::openmldb::storage::TTLSt(3 * 60 * 1000, 0, ::openmldb::storage::kAbsOrLat)); table->SetTTL(update_ttl); table->SchedGc(); - ASSERT_EQ(5, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(5, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); + if (storageMode == openmldb::common::kMemory) { + ASSERT_EQ(5, (int64_t)table->GetRecordCnt()); } + ASSERT_EQ(5, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); update_ttl = ::openmldb::storage::UpdateTTLMeta(::openmldb::storage::TTLSt(0, 1, ::openmldb::storage::kAbsOrLat)); table->SetTTL(update_ttl); table->SchedGc(); ASSERT_EQ(4, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(4, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(4, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); update_ttl = ::openmldb::storage::UpdateTTLMeta( ::openmldb::storage::TTLSt(1 * 60 * 1000, 1, ::openmldb::storage::kAbsOrLat)); table->SetTTL(update_ttl); table->SchedGc(); ASSERT_EQ(2, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); { ::openmldb::api::LogEntry entry; entry.set_log_index(0); @@ -1348,12 +1305,8 @@ TEST_P(TableTest, GcAbsOrLat) { } table->SchedGc(); ASSERT_EQ(0, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(0, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(0, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); { ::openmldb::api::LogEntry entry; entry.set_log_index(0); @@ -1374,11 +1327,11 @@ TEST_P(TableTest, GcAbsOrLat) { TEST_P(TableTest, GcAbsAndLat) { ::openmldb::common::StorageMode storageMode = GetParam(); - // some functions with disktable mode in this test have not been implemented. - // refer to issue #1238 - if (storageMode == openmldb::common::kHDD) { - return; + // RecordIdxCnt in disktable only support abs TTL and lat TTL + if (storageMode != openmldb::common::kMemory) { + GTEST_SKIP(); } + ::openmldb::api::TableMeta table_meta; table_meta.set_name("table1"); std::string table_path = ""; @@ -1411,23 +1364,15 @@ TEST_P(TableTest, GcAbsAndLat) { table->Put("test2", now - 3 * (60 * 1000) - 1000, "value5", 6); table->Put("test2", now - 2 * (60 * 1000) - 1000, "value6", 6); ASSERT_EQ(7, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(7, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(7, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); ::openmldb::storage::UpdateTTLMeta update_ttl( ::openmldb::storage::TTLSt(1 * 60 * 1000, 0, ::openmldb::storage::kAbsAndLat)); table->SetTTL(update_ttl); table->SchedGc(); ASSERT_EQ(6, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); { ::openmldb::api::LogEntry entry; entry.set_log_index(0); @@ -1468,31 +1413,19 @@ TEST_P(TableTest, GcAbsAndLat) { table->SetTTL(update_ttl); table->SchedGc(); ASSERT_EQ(6, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); update_ttl = ::openmldb::storage::UpdateTTLMeta( ::openmldb::storage::TTLSt(1 * 60 * 1000, 1, ::openmldb::storage::kAbsAndLat)); table->SetTTL(update_ttl); table->SchedGc(); ASSERT_EQ(6, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(6, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); table->SchedGc(); ASSERT_EQ(2, (int64_t)table->GetRecordCnt()); - // some functions in disk table need to be implemented. - // refer to issue #1238 - if (storageMode == ::openmldb::common::StorageMode::kMemory) { - ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); - ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); - } + ASSERT_EQ(2, (int64_t)table->GetRecordIdxCnt()); + ASSERT_EQ(2, (int64_t)table->GetRecordPkCnt()); { ::openmldb::api::LogEntry entry; entry.set_log_index(0); @@ -1932,6 +1865,363 @@ TEST_P(TableTest, AbsAndLat) { delete table; } +TEST_F(TableTest, GetRecordAbsTTL) { + ::openmldb::common::StorageMode storageMode = openmldb::common::StorageMode::kHDD; + ::openmldb::api::TableMeta table_meta; + table_meta.set_name("table1"); + std::string table_path = ""; + int id = 1; + if (storageMode == ::openmldb::common::kHDD) { + id = ++counter; + table_path = GetDBPath(FLAGS_hdd_root_path, id, 1); + } + table_meta.set_tid(id); + table_meta.set_pid(1); + table_meta.set_seg_cnt(1); + table_meta.set_mode(::openmldb::api::TableMode::kTableLeader); + table_meta.set_key_entry_max_height(8); + table_meta.set_storage_mode(storageMode); + table_meta.set_format_version(1); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "test", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "testnew", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts2", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts3", ::openmldb::type::kBigInt); + + SchemaCodec::SetIndex(table_meta.add_column_key(), "index0", "test", "ts1", ::openmldb::type::kAbsoluteTime, 90, 0); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index1", "testnew", "ts2", ::openmldb::type::kAbsoluteTime, 50, + 0); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index2", "testnew", "ts3", ::openmldb::type::kAbsoluteTime, 40, + 0); + + Table* table = CreateTable(table_meta, table_path); + table->Init(); + codec::SDKCodec codec(table_meta); + uint64_t now = ::baidu::common::timer::get_micros() / 1000; + + for (int i = 0; i < 100; i++) { + uint64_t ts = now - (99 - i) * 60 * 1000; + std::string ts_str = std::to_string(ts); + + std::vector row = {"test"+ std::to_string(i / 10), + "testnew"+ std::to_string(i / 10), + ts_str, + ts_str, + ts_str}; + ::openmldb::api::PutRequest request; + ::openmldb::api::Dimension* dim = request.add_dimensions(); + dim->set_idx(0); + dim->set_key(row[0]); + ::openmldb::api::Dimension* dim1 = request.add_dimensions(); + dim1->set_idx(1); + dim1->set_key(row[1]); + ::openmldb::api::Dimension* dim2 = request.add_dimensions(); + dim2->set_idx(2); + dim2->set_key(row[1]); + std::string value; + ASSERT_EQ(0, codec.EncodeRow(row, &value)); + table->Put(0, value, request.dimensions()); + } + + for (int i = 0; i <= 2; i++) { + TableIterator* it = table->NewTraverseIterator(i); + it->SeekToFirst(); + int count = 0; + while (it->Valid()) { + it->Next(); + count++; + } + + if (i == 0) { + EXPECT_EQ(90, count); + } else if (i == 1) { + EXPECT_EQ(50, count); + } else if (i == 2) { + EXPECT_EQ(40, count); + } + } + + + EXPECT_EQ(200, (int64_t)table->GetRecordIdxCnt()); + EXPECT_EQ(20, (int64_t)table->GetRecordPkCnt()); + + uint64_t* stats = NULL; + uint32_t size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(1, &stats, &size)); + int ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(100, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(2, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(100, ts_count); + + table->SchedGc(); + + EXPECT_EQ(140, (int64_t)table->GetRecordIdxCnt()); + EXPECT_EQ(14, (int64_t)table->GetRecordPkCnt()); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(1, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(50, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(2, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(40, ts_count); + + delete table; +} + +TEST_F(TableTest, GetRecordLatTTL) { + ::openmldb::common::StorageMode storageMode = openmldb::common::StorageMode::kHDD; + ::openmldb::api::TableMeta table_meta; + table_meta.set_name("table1"); + std::string table_path = ""; + int id = 1; + if (storageMode == ::openmldb::common::kHDD) { + id = ++counter; + table_path = GetDBPath(FLAGS_hdd_root_path, id, 1); + } + table_meta.set_tid(id); + table_meta.set_pid(1); + table_meta.set_seg_cnt(1); + table_meta.set_mode(::openmldb::api::TableMode::kTableLeader); + table_meta.set_key_entry_max_height(8); + table_meta.set_storage_mode(storageMode); + table_meta.set_format_version(1); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "test", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "testnew", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts2", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts3", ::openmldb::type::kBigInt); + + SchemaCodec::SetIndex(table_meta.add_column_key(), "index0", "test", "ts1", ::openmldb::type::kLatestTime, 0, 7); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index1", "testnew", "ts2", ::openmldb::type::kLatestTime, 0, 5); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index2", "testnew", "ts3", ::openmldb::type::kLatestTime, 0, 4); + + + Table* table = CreateTable(table_meta, table_path); + table->Init(); + codec::SDKCodec codec(table_meta); + uint64_t now = ::baidu::common::timer::get_micros() / 1000; + + for (int i = 0; i < 100; i++) { + uint64_t ts = now - (99 - i) * 60 * 1000; + std::string ts_str = std::to_string(ts); + + std::vector row = {"test"+ std::to_string(i % 10), + "testnew"+ std::to_string(i % 10), + ts_str, + ts_str, + ts_str}; + ::openmldb::api::PutRequest request; + ::openmldb::api::Dimension* dim = request.add_dimensions(); + dim->set_idx(0); + dim->set_key(row[0]); + ::openmldb::api::Dimension* dim1 = request.add_dimensions(); + dim1->set_idx(1); + dim1->set_key(row[1]); + ::openmldb::api::Dimension* dim2 = request.add_dimensions(); + dim2->set_idx(2); + dim2->set_key(row[1]); + std::string value; + ASSERT_EQ(0, codec.EncodeRow(row, &value)); + table->Put(0, value, request.dimensions()); + } + + for (int i = 0; i <= 2; i++) { + TableIterator* it = table->NewTraverseIterator(i); + it->SeekToFirst(); + int count = 0; + while (it->Valid()) { + it->Next(); + count++; + } + + if (i == 0) { + EXPECT_EQ(70, count); + } else if (i == 1) { + EXPECT_EQ(50, count); + } else if (i == 2) { + EXPECT_EQ(40, count); + } + } + + + EXPECT_EQ(20, (int64_t)table->GetRecordPkCnt()); + EXPECT_EQ(200, (int64_t)table->GetRecordIdxCnt()); + + table->SchedGc(); + EXPECT_EQ(20, (int64_t)table->GetRecordPkCnt()); + EXPECT_EQ(120, (int64_t)table->GetRecordIdxCnt()); + + uint64_t* stats = NULL; + uint32_t size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(1, &stats, &size)); + int ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(50, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(2, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(40, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(0, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(70, ts_count); + + table->Delete("test0", 0); + table->Delete("testnew0", 1); + EXPECT_EQ(18, (int64_t)table->GetRecordPkCnt()); + EXPECT_EQ(108, (int64_t)table->GetRecordIdxCnt()); + + + delete table; +} + +TEST_F(TableTest, GetRecordAbsAndLatTTL) { + ::openmldb::common::StorageMode storageMode = openmldb::common::StorageMode::kHDD; + ::openmldb::api::TableMeta table_meta; + table_meta.set_name("table1"); + std::string table_path = ""; + int id = 1; + if (storageMode == ::openmldb::common::kHDD) { + id = ++counter; + table_path = GetDBPath(FLAGS_hdd_root_path, id, 1); + } + table_meta.set_tid(id); + table_meta.set_pid(1); + table_meta.set_seg_cnt(1); + table_meta.set_mode(::openmldb::api::TableMode::kTableLeader); + table_meta.set_key_entry_max_height(8); + table_meta.set_storage_mode(storageMode); + table_meta.set_format_version(1); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "test", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "testnew", ::openmldb::type::kString); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts2", ::openmldb::type::kBigInt); + SchemaCodec::SetColumnDesc(table_meta.add_column_desc(), "ts3", ::openmldb::type::kBigInt); + + SchemaCodec::SetIndex(table_meta.add_column_key(), "index0", "test", "ts1", ::openmldb::type::kLatestTime, 0, 7); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index1", "testnew", "ts2", ::openmldb::type::kAbsoluteTime, 50, + 0); + SchemaCodec::SetIndex(table_meta.add_column_key(), "index2", "testnew", "ts3", ::openmldb::type::kLatestTime, 0, 4); + + + Table* table = CreateTable(table_meta, table_path); + table->Init(); + codec::SDKCodec codec(table_meta); + uint64_t now = ::baidu::common::timer::get_micros() / 1000; + + for (int i = 0; i < 100; i++) { + uint64_t ts = now - (99 - i) * 60 * 1000; + std::string ts_str = std::to_string(ts); + + std::vector row = {"test"+ std::to_string(i / 10), + "testnew"+ std::to_string(i / 10), + ts_str, + ts_str, + ts_str}; + ::openmldb::api::PutRequest request; + ::openmldb::api::Dimension* dim = request.add_dimensions(); + dim->set_idx(0); + dim->set_key(row[0]); + ::openmldb::api::Dimension* dim1 = request.add_dimensions(); + dim1->set_idx(1); + dim1->set_key(row[1]); + ::openmldb::api::Dimension* dim2 = request.add_dimensions(); + dim2->set_idx(2); + dim2->set_key(row[1]); + std::string value; + ASSERT_EQ(0, codec.EncodeRow(row, &value)); + table->Put(0, value, request.dimensions()); + } + + for (int i = 0; i <= 2; i++) { + TableIterator* it = table->NewTraverseIterator(i); + it->SeekToFirst(); + int count = 0; + while (it->Valid()) { + it->Next(); + count++; + } + + if (i == 0) { + EXPECT_EQ(70, count); + } else if (i == 1) { + EXPECT_EQ(50, count); + } else if (i == 2) { + EXPECT_EQ(40, count); + } + } + + + EXPECT_EQ(20, (int64_t)table->GetRecordPkCnt()); + EXPECT_EQ(200, (int64_t)table->GetRecordIdxCnt()); + + table->SchedGc(); + EXPECT_EQ(20, (int64_t)table->GetRecordPkCnt()); + EXPECT_EQ(120, (int64_t)table->GetRecordIdxCnt()); + + uint64_t* stats = NULL; + uint32_t size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(1, &stats, &size)); + int ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(50, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(2, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(40, ts_count); + + stats = NULL; + size = 0; + ASSERT_TRUE(table->GetRecordIdxCnt(0, &stats, &size)); + ts_count = 0; + for (uint32_t i = 0; i < size; i++) { + ts_count += stats[i]; + } + EXPECT_EQ(70, ts_count); + + delete table; +} + INSTANTIATE_TEST_CASE_P(TestMemAndHDD, TableTest, ::testing::Values(::openmldb::common::kMemory, ::openmldb::common::kHDD));