Skip to content

Commit a10f565

Browse files
branch-4.0: [improve](cloud) Report read/write conflict range #59437 (#59577)
Cherry-picked from #59437 Co-authored-by: walter <[email protected]>
1 parent 19fa931 commit a10f565

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

cloud/src/meta-store/txn_kv.cpp

Lines changed: 127 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -875,6 +875,94 @@ TxnErrorCode Transaction::get_conflicting_range(
875875
return TxnErrorCode::TXN_OK;
876876
}
877877

878+
TxnErrorCode Transaction::get_read_conflict_range(
879+
std::vector<std::pair<std::string, std::string>>* values) {
880+
constexpr std::string_view start = "\xff\xff/transaction/read_conflict_range/";
881+
constexpr std::string_view end = "\xff\xff/transaction/read_conflict_range/\xff";
882+
883+
int limit = 0;
884+
int target_bytes = 0;
885+
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
886+
int iteration = 0;
887+
fdb_bool_t snapshot = 0;
888+
fdb_bool_t reverse = 0;
889+
FDBFuture* future = fdb_transaction_get_range(
890+
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
891+
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
892+
target_bytes, mode, iteration, snapshot, reverse);
893+
894+
DORIS_CLOUD_DEFER {
895+
fdb_future_destroy(future);
896+
};
897+
898+
RETURN_IF_ERROR(await_future(future));
899+
900+
FDBKeyValue const* out_kvs;
901+
int out_kvs_count;
902+
fdb_bool_t out_more;
903+
do {
904+
fdb_error_t err =
905+
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
906+
if (err) {
907+
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
908+
<< fdb_get_error(err);
909+
return cast_as_txn_code(err);
910+
}
911+
for (int i = 0; i < out_kvs_count; i++) {
912+
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
913+
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
914+
key.remove_prefix(start.size());
915+
values->emplace_back(key, value);
916+
}
917+
} while (out_more);
918+
919+
return TxnErrorCode::TXN_OK;
920+
}
921+
922+
TxnErrorCode Transaction::get_write_conflict_range(
923+
std::vector<std::pair<std::string, std::string>>* values) {
924+
constexpr std::string_view start = "\xff\xff/transaction/write_conflict_range/";
925+
constexpr std::string_view end = "\xff\xff/transaction/write_conflict_range/\xff";
926+
927+
int limit = 0;
928+
int target_bytes = 0;
929+
FDBStreamingMode mode = FDB_STREAMING_MODE_WANT_ALL;
930+
int iteration = 0;
931+
fdb_bool_t snapshot = 0;
932+
fdb_bool_t reverse = 0;
933+
FDBFuture* future = fdb_transaction_get_range(
934+
txn_, FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)start.data(), start.size()),
935+
FDB_KEYSEL_FIRST_GREATER_OR_EQUAL((uint8_t*)end.data(), end.size()), limit,
936+
target_bytes, mode, iteration, snapshot, reverse);
937+
938+
DORIS_CLOUD_DEFER {
939+
fdb_future_destroy(future);
940+
};
941+
942+
RETURN_IF_ERROR(await_future(future));
943+
944+
FDBKeyValue const* out_kvs;
945+
int out_kvs_count;
946+
fdb_bool_t out_more;
947+
do {
948+
fdb_error_t err =
949+
fdb_future_get_keyvalue_array(future, &out_kvs, &out_kvs_count, &out_more);
950+
if (err) {
951+
LOG(WARNING) << "get_conflicting_range get keyvalue array error: "
952+
<< fdb_get_error(err);
953+
return cast_as_txn_code(err);
954+
}
955+
for (int i = 0; i < out_kvs_count; i++) {
956+
std::string_view key((char*)out_kvs[i].key, out_kvs[i].key_length);
957+
std::string_view value((char*)out_kvs[i].value, out_kvs[i].value_length);
958+
key.remove_prefix(start.size());
959+
values->emplace_back(key, value);
960+
}
961+
} while (out_more);
962+
963+
return TxnErrorCode::TXN_OK;
964+
}
965+
878966
TxnErrorCode Transaction::report_conflicting_range() {
879967
if (!config::enable_logging_conflict_keys) {
880968
return TxnErrorCode::TXN_OK;
@@ -900,7 +988,45 @@ TxnErrorCode Transaction::report_conflicting_range() {
900988
out += fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
901989
}
902990

903-
LOG(WARNING) << "conflicting key ranges: " << out;
991+
key_values.clear();
992+
RETURN_IF_ERROR(get_read_conflict_range(&key_values));
993+
if (key_values.size() % 2 != 0) {
994+
LOG(WARNING) << "the read conflict range is not well-formed, size=" << key_values.size();
995+
return TxnErrorCode::TXN_INVALID_DATA;
996+
}
997+
std::string read_conflict_range_out;
998+
for (size_t i = 0; i < key_values.size(); i += 2) {
999+
std::string_view start = key_values[i].first;
1000+
std::string_view end = key_values[i + 1].first;
1001+
std::string_view conflict_count = key_values[i].second;
1002+
if (!read_conflict_range_out.empty()) {
1003+
read_conflict_range_out += ", ";
1004+
}
1005+
read_conflict_range_out +=
1006+
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
1007+
}
1008+
1009+
key_values.clear();
1010+
RETURN_IF_ERROR(get_write_conflict_range(&key_values));
1011+
if (key_values.size() % 2 != 0) {
1012+
LOG(WARNING) << "the write conflict range is not well-formed, size=" << key_values.size();
1013+
return TxnErrorCode::TXN_INVALID_DATA;
1014+
}
1015+
std::string write_conflict_range_out;
1016+
for (size_t i = 0; i < key_values.size(); i += 2) {
1017+
std::string_view start = key_values[i].first;
1018+
std::string_view end = key_values[i + 1].first;
1019+
std::string_view conflict_count = key_values[i].second;
1020+
if (!write_conflict_range_out.empty()) {
1021+
write_conflict_range_out += ", ";
1022+
}
1023+
write_conflict_range_out +=
1024+
fmt::format("[{}, {}): {}", hex(start), hex(end), conflict_count);
1025+
}
1026+
1027+
LOG(WARNING) << "conflicting key ranges: " << out
1028+
<< ", read conflict range: " << read_conflict_range_out
1029+
<< ", write conflict range: " << write_conflict_range_out;
9041030

9051031
return TxnErrorCode::TXN_OK;
9061032
}

cloud/src/meta-store/txn_kv.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,10 @@ class Transaction : public cloud::Transaction {
827827
// It only works when the report_conflicting_ranges option is enabled.
828828
TxnErrorCode get_conflicting_range(
829829
std::vector<std::pair<std::string, std::string>>* key_values);
830+
TxnErrorCode get_read_conflict_range(
831+
std::vector<std::pair<std::string, std::string>>* key_values);
832+
TxnErrorCode get_write_conflict_range(
833+
std::vector<std::pair<std::string, std::string>>* key_values);
830834
TxnErrorCode report_conflicting_range();
831835

832836
std::shared_ptr<Database> db_ {nullptr};

0 commit comments

Comments
 (0)