Skip to content

Commit 5e0c87f

Browse files
authored
[improve](cloud) Report read/write conflict range (#59437)
1 parent c9b1819 commit 5e0c87f

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

cloud/src/meta-store/txn_kv.cpp

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -944,6 +944,94 @@ TxnErrorCode Transaction::get_conflicting_range(
944944
return TxnErrorCode::TXN_OK;
945945
}
946946

947+
TxnErrorCode Transaction::get_read_conflict_range(
948+
std::vector<std::pair<std::string, std::string>>* values) {
949+
constexpr std::string_view start = "\xff\xff/transaction/read_conflict_range/";
950+
constexpr std::string_view end = "\xff\xff/transaction/read_conflict_range/\xff";
951+
952+
int limit = 0;
953+
int target_bytes = 0;
954+
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
955+
int iteration = 0;
956+
fdb_bool_t snapshot = 0;
957+
fdb_bool_t reverse = 0;
958+
FDBFuture* future = fdb_transaction_get_range(
959+
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
960+
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
961+
target_bytes, mode, iteration, snapshot, reverse);
962+
963+
DORIS_CLOUD_DEFER {
964+
fdb_future_destroy(future);
965+
};
966+
967+
RETURN_IF_ERROR(await_future(future));
968+
969+
FDBKeyValue const* out_kvs;
970+
int out_kvs_count;
971+
fdb_bool_t out_more;
972+
do {
973+
fdb_error_t err =
974+
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
975+
if (err) {
976+
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
977+
<< fdb_get_error(err);
978+
return cast_as_txn_code(err);
979+
}
980+
for (int i = 0; i < out_kvs_count; i++) {
981+
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
982+
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
983+
key.remove_prefix(start.size());
984+
values->emplace_back(key, value);
985+
}
986+
} while (out_more);
987+
988+
return TxnErrorCode::TXN_OK;
989+
}
990+
991+
TxnErrorCode Transaction::get_write_conflict_range(
992+
std::vector<std::pair<std::string, std::string>>* values) {
993+
constexpr std::string_view start = "\xff\xff/transaction/write_conflict_range/";
994+
constexpr std::string_view end = "\xff\xff/transaction/write_conflict_range/\xff";
995+
996+
int limit = 0;
997+
int target_bytes = 0;
998+
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
999+
int iteration = 0;
1000+
fdb_bool_t snapshot = 0;
1001+
fdb_bool_t reverse = 0;
1002+
FDBFuture* future = fdb_transaction_get_range(
1003+
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
1004+
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
1005+
target_bytes, mode, iteration, snapshot, reverse);
1006+
1007+
DORIS_CLOUD_DEFER {
1008+
fdb_future_destroy(future);
1009+
};
1010+
1011+
RETURN_IF_ERROR(await_future(future));
1012+
1013+
FDBKeyValue const* out_kvs;
1014+
int out_kvs_count;
1015+
fdb_bool_t out_more;
1016+
do {
1017+
fdb_error_t err =
1018+
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
1019+
if (err) {
1020+
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
1021+
<< fdb_get_error(err);
1022+
return cast_as_txn_code(err);
1023+
}
1024+
for (int i = 0; i < out_kvs_count; i++) {
1025+
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
1026+
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
1027+
key.remove_prefix(start.size());
1028+
values->emplace_back(key, value);
1029+
}
1030+
} while (out_more);
1031+
1032+
return TxnErrorCode::TXN_OK;
1033+
}
1034+
9471035
TxnErrorCode Transaction::report_conflicting_range() {
9481036
if (!config::enable_logging_conflict_keys) {
9491037
return TxnErrorCode::TXN_OK;
@@ -969,7 +1057,45 @@ TxnErrorCode Transaction::report_conflicting_range() {
9691057
out += fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
9701058
}
9711059

972-
LOG(WARNING) << "conflicting key ranges: " << out;
1060+
key_values.clear();
1061+
RETURN_IF_ERROR(get_read_conflict_range(&key_values));
1062+
if (key_values.size() % 2 != 0) {
1063+
LOG(WARNING) << "the read conflict range is not well-formed, size=" << key_values.size();
1064+
return TxnErrorCode::TXN_INVALID_DATA;
1065+
}
1066+
std::string read_conflict_range_out;
1067+
for (size_t i = 0; i < key_values.size(); i += 2) {
1068+
std::string_view start = key_values[i].first;
1069+
std::string_view end = key_values[i + 1].first;
1070+
std::string_view conflict_count = key_values[i].second;
1071+
if (!read_conflict_range_out.empty()) {
1072+
read_conflict_range_out += ", ";
1073+
}
1074+
read_conflict_range_out +=
1075+
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
1076+
}
1077+
1078+
key_values.clear();
1079+
RETURN_IF_ERROR(get_write_conflict_range(&key_values));
1080+
if (key_values.size() % 2 != 0) {
1081+
LOG(WARNING) << "the write conflict range is not well-formed, size=" << key_values.size();
1082+
return TxnErrorCode::TXN_INVALID_DATA;
1083+
}
1084+
std::string write_conflict_range_out;
1085+
for (size_t i = 0; i < key_values.size(); i += 2) {
1086+
std::string_view start = key_values[i].first;
1087+
std::string_view end = key_values[i + 1].first;
1088+
std::string_view conflict_count = key_values[i].second;
1089+
if (!write_conflict_range_out.empty()) {
1090+
write_conflict_range_out += ", ";
1091+
}
1092+
write_conflict_range_out +=
1093+
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
1094+
}
1095+
1096+
LOG(WARNING) << "conflicting key ranges: " << out
1097+
<< ", read conflict range: " << read_conflict_range_out
1098+
<< ", write conflict range: " << write_conflict_range_out;
9731099

9741100
return TxnErrorCode::TXN_OK;
9751101
}

cloud/src/meta-store/txn_kv.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -841,6 +841,10 @@ class Transaction : public cloud::Transaction {
841841
// It only works when the report_conflicting_ranges option is enabled.
842842
TxnErrorCode get_conflicting_range(
843843
std::vector<std::pair<std::string, std::string>>* key_values);
844+
TxnErrorCode get_read_conflict_range(
845+
std::vector<std::pair<std::string, std::string>>* key_values);
846+
TxnErrorCode get_write_conflict_range(
847+
std::vector<std::pair<std::string, std::string>>* key_values);
844848
TxnErrorCode report_conflicting_range();
845849

846850
std::shared_ptr<Database> db_ {nullptr};

0 commit comments

Comments
 (0)