Skip to content

Commit bb6bc09

Browse files
authored
fix: fix gc coredump (#3561)
1 parent aa8e756 commit bb6bc09

File tree

2 files changed

+90
-19
lines changed

2 files changed

+90
-19
lines changed

src/storage/mem_table.cc

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,18 @@ bool MemTable::Put(uint64_t time, const std::string& value, const Dimensions& di
170170
PDLOG(WARNING, "invalid schema version %u, tid %u pid %u", version, id_, pid_);
171171
return false;
172172
}
173-
std::map<int32_t, uint64_t> ts_map;
173+
std::map<uint32_t, std::map<int32_t, uint64_t>> ts_value_map;
174174
for (const auto& kv : inner_index_key_map) {
175175
auto inner_index = table_index_.GetInnerIndex(kv.first);
176176
if (!inner_index) {
177177
PDLOG(WARNING, "invalid inner index pos %d. tid %u pid %u", kv.first, id_, pid_);
178178
return false;
179179
}
180+
std::map<int32_t, uint64_t> ts_map;
180181
for (const auto& index_def : inner_index->GetIndex()) {
182+
if (!index_def->IsReady()) {
183+
continue;
184+
}
181185
auto ts_col = index_def->GetTsColumn();
182186
if (ts_col) {
183187
int64_t ts = 0;
@@ -192,34 +196,28 @@ bool MemTable::Put(uint64_t time, const std::string& value, const Dimensions& di
192196
return false;
193197
}
194198
ts_map.emplace(ts_col->GetId(), ts);
195-
}
196-
if (index_def->IsReady()) {
197199
real_ref_cnt++;
198200
}
199201
}
202+
if (!ts_map.empty()) {
203+
ts_value_map.emplace(kv.first, std::move(ts_map));
204+
}
200205
}
201-
if (ts_map.empty()) {
206+
if (ts_value_map.empty()) {
202207
return false;
203208
}
204209
auto* block = new DataBlock(real_ref_cnt, value.c_str(), value.length());
205210
for (const auto& kv : inner_index_key_map) {
206-
auto inner_index = table_index_.GetInnerIndex(kv.first);
207-
bool need_put = false;
208-
for (const auto& index_def : inner_index->GetIndex()) {
209-
if (index_def->IsReady()) {
210-
// TODO(hw): if we don't find this ts(has_found_ts==false), but it's ready, will put too?
211-
need_put = true;
212-
break;
213-
}
211+
auto iter = ts_value_map.find(kv.first);
212+
if (iter == ts_value_map.end()) {
213+
continue;
214214
}
215-
if (need_put) {
216-
uint32_t seg_idx = 0;
217-
if (seg_cnt_ > 1) {
218-
seg_idx = ::openmldb::base::hash(kv.second.data(), kv.second.size(), SEED) % seg_cnt_;
219-
}
220-
Segment* segment = segments_[kv.first][seg_idx];
221-
segment->Put(::openmldb::base::Slice(kv.second), ts_map, block);
215+
uint32_t seg_idx = 0;
216+
if (seg_cnt_ > 1) {
217+
seg_idx = ::openmldb::base::hash(kv.second.data(), kv.second.size(), SEED) % seg_cnt_;
222218
}
219+
Segment* segment = segments_[kv.first][seg_idx];
220+
segment->Put(::openmldb::base::Slice(kv.second), iter->second, block);
223221
}
224222
record_byte_size_.fetch_add(GetRecordSize(value.length()));
225223
return true;

src/storage/snapshot_test.cc

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,79 @@ TEST_F(SnapshotTest, Recover_only_snapshot) {
718718
ASSERT_FALSE(it->Valid());
719719
}
720720

721+
TEST_F(SnapshotTest, RecoverWithDeleteIndex) {
722+
uint32_t tid = 12;
723+
uint32_t pid = 0;
724+
::openmldb::api::TableMeta meta;
725+
meta.set_tid(tid);
726+
meta.set_pid(pid);
727+
SchemaCodec::SetColumnDesc(meta.add_column_desc(), "userid", ::openmldb::type::kString);
728+
SchemaCodec::SetColumnDesc(meta.add_column_desc(), "ts1", ::openmldb::type::kBigInt);
729+
SchemaCodec::SetColumnDesc(meta.add_column_desc(), "ts2", ::openmldb::type::kBigInt);
730+
SchemaCodec::SetColumnDesc(meta.add_column_desc(), "val", ::openmldb::type::kString);
731+
SchemaCodec::SetIndex(meta.add_column_key(), "index1", "userid", "ts1", ::openmldb::type::kLatestTime, 0, 1);
732+
SchemaCodec::SetIndex(meta.add_column_key(), "index2", "userid", "ts2", ::openmldb::type::kLatestTime, 0, 1);
733+
734+
std::string snapshot_dir = absl::StrCat(FLAGS_db_root_path, "/", tid, "_", pid, "/snapshot");
735+
736+
::openmldb::base::MkdirRecur(snapshot_dir);
737+
std::string snapshot1 = "20231018.sdb";
738+
uint64_t offset = 0;
739+
{
740+
if (FLAGS_snapshot_compression != "off") {
741+
snapshot1.append(".");
742+
snapshot1.append(FLAGS_snapshot_compression);
743+
}
744+
std::string full_path = snapshot_dir + "/" + snapshot1;
745+
FILE* fd_w = fopen(full_path.c_str(), "ab+");
746+
ASSERT_TRUE(fd_w != NULL);
747+
::openmldb::log::WritableFile* wf = ::openmldb::log::NewWritableFile(snapshot1, fd_w);
748+
::openmldb::log::Writer writer(FLAGS_snapshot_compression, wf);
749+
::openmldb::codec::SDKCodec sdk_codec(meta);
750+
for (int i = 0; i < 5; i++) {
751+
uint32_t ts = 100 + i;
752+
for (int key_num = 0; key_num < 10; key_num++) {
753+
std::string userid = absl::StrCat("userid", key_num);
754+
std::string ts_str = std::to_string(ts);
755+
std::vector<std::string> row = {userid, ts_str, ts_str, "aa"};
756+
std::string result;
757+
sdk_codec.EncodeRow(row, &result);
758+
::openmldb::api::LogEntry entry;
759+
entry.set_log_index(offset++);
760+
entry.set_value(result);
761+
for (int k = 0; k < meta.column_key_size(); k++) {
762+
auto dimension = entry.add_dimensions();
763+
dimension->set_key(userid);
764+
dimension->set_idx(k);
765+
}
766+
entry.set_ts(ts);
767+
entry.set_term(1);
768+
std::string val;
769+
bool ok = entry.SerializeToString(&val);
770+
ASSERT_TRUE(ok);
771+
Slice sval(val.c_str(), val.size());
772+
::openmldb::log::Status status = writer.AddRecord(sval);
773+
ASSERT_TRUE(status.ok());
774+
}
775+
}
776+
writer.EndLog();
777+
}
778+
779+
auto index1 = meta.mutable_column_key(1);
780+
index1->set_flag(1);
781+
std::shared_ptr<MemTable> table = std::make_shared<MemTable>(meta);
782+
table->Init();
783+
LogParts* log_part = new LogParts(12, 4, scmp);
784+
MemTableSnapshot snapshot(tid, pid, log_part, FLAGS_db_root_path);
785+
ASSERT_TRUE(snapshot.Init());
786+
int ret = snapshot.GenManifest(snapshot1, 50, offset, 1);
787+
ASSERT_EQ(0, ret);
788+
uint64_t r_offset = 0;
789+
ASSERT_TRUE(snapshot.Recover(table, r_offset));
790+
ASSERT_EQ(r_offset, offset);
791+
table->SchedGc();
792+
}
793+
721794
TEST_F(SnapshotTest, MakeSnapshot) {
722795
LogParts* log_part = new LogParts(12, 4, scmp);
723796
MemTableSnapshot snapshot(1, 2, log_part, FLAGS_db_root_path);

0 commit comments

Comments
 (0)