Skip to content

Commit d143abd

Browse files
committed
improve logic in epoch end
1 parent 28087c0 commit d143abd

File tree

3 files changed

+46
-9
lines changed

3 files changed

+46
-9
lines changed

LoggingEventProcessor.cpp

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -971,20 +971,26 @@ void verifyLoggingEvent()
971971
// exit all requesters
972972
// serve slower nodes 30 more minutes before officially switching epoch
973973
Logger::get()->info("Received END_EPOCH message. Serving 30 minutes and then closing BOB");
974-
SLEEP(1000ULL * gTimeToWaitEpochEnd); // 30 minutes
975-
gStopFlag.store(true);
976-
// the endTick tick is a virtual tick, we need to migrate its data to new keys:
977-
uint32_t endTick = lastQuorumTick + 1; // the system just "borrow" this tick index
978974

979-
db_rename("tick_log_range:" + std::to_string(endTick),
980-
"end_epoch:tick_log_range:"+std::to_string(gCurrentProcessingEpoch));
981-
db_rename("log_ranges:" + std::to_string(endTick),
982-
"end_epoch:log_ranges:"+std::to_string(gCurrentProcessingEpoch));
983975
std::string key = "end_epoch_tick:" + std::to_string(gCurrentProcessingEpoch);
976+
uint32_t endTick = lastQuorumTick + 1; // the system just "borrow" this tick index
984977
db_insert_u32(key, endTick);
978+
// copy log range struct
979+
db_copy("log_ranges:" + std::to_string(endTick), "end_epoch:log_ranges:"+std::to_string(gCurrentProcessingEpoch));
980+
// copy log range meta data per tick
981+
db_hcopy("tick_log_range:" + std::to_string(endTick), "end_epoch:tick_log_range:"+std::to_string(gCurrentProcessingEpoch));
985982
// end epoch tick is a virtual tick for logging, we set it back to lastQuorumTick
986983
db_update_field("db_status", "latest_event_tick", std::to_string(lastQuorumTick));
987984
db_insert_u32("verified_history:" + std::to_string(gCurrentProcessingEpoch), lastQuorumTick); // update historical tracker
985+
SLEEP(1000ULL * gTimeToWaitEpochEnd); // default: 30 minutes
986+
gStopFlag.store(true);
987+
// the endTick tick is a virtual tick, we need to migrate its data to new keys:
988+
// these operation are needed when it does seamless transition because in new epoch
989+
// the init tick will be the same as this end epoch tick
990+
db_rename("tick_log_range:" + std::to_string(endTick),
991+
"backup_end_epoch:tick_log_range:"+std::to_string(gCurrentProcessingEpoch));
992+
db_rename("log_ranges:" + std::to_string(endTick),
993+
"backup_end_epoch:log_ranges:"+std::to_string(gCurrentProcessingEpoch));
988994
}
989995
Logger::get()->info("verifyLoggingEvent stopping gracefully.");
990996
}

database/db.cpp

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1267,6 +1267,34 @@ bool db_rename(const std::string &key1, const std::string &key2) {
12671267
}
12681268
}
12691269

1270+
bool db_copy(const std::string &key1, const std::string &key2) {
1271+
if (!g_redis) return false;
1272+
try {
1273+
auto val = g_redis->get(key1);
1274+
if (!val) return false;
1275+
g_redis->set(key2, *val);
1276+
return true;
1277+
} catch (const sw::redis::Error &e) {
1278+
Logger::get()->error("Redis error in db_copy: {}\n", e.what());
1279+
return false;
1280+
}
1281+
}
1282+
1283+
bool db_hcopy(const std::string &key1, const std::string &key2) {
1284+
if (!g_redis) return false;
1285+
try {
1286+
std::unordered_map<std::string, std::string> fields;
1287+
g_redis->hgetall(key1, std::inserter(fields, fields.begin()));
1288+
if (!fields.empty()) {
1289+
g_redis->hmset(key2, fields.begin(), fields.end());
1290+
}
1291+
return true;
1292+
} catch (const sw::redis::Error &e) {
1293+
Logger::get()->error("Redis error in db_hcopy: {}\n", e.what());
1294+
return false;
1295+
}
1296+
}
1297+
12701298
bool db_insert_u32(const std::string key, uint32_t value) {
12711299
if (!g_redis) return false;
12721300
try {

database/db.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,4 +515,7 @@ bool db_move_logs_to_kvrocks_by_range(uint16_t epoch, long long fromLogId, long
515515
bool db_delete_transaction(std::string hash);
516516
bool db_delete_logs(uint16_t epoch, long long start, long long end);
517517

518-
bool db_get_endepoch_log_range_info(const uint16_t epoch, long long &start, long long &length, LogRangesPerTxInTick &lr);
518+
bool db_get_endepoch_log_range_info(const uint16_t epoch, long long &start, long long &length, LogRangesPerTxInTick &lr);
519+
520+
bool db_copy(const std::string &key1, const std::string &key2);
521+
bool db_hcopy(const std::string &key1, const std::string &key2);

0 commit comments

Comments
 (0)