Skip to content

Commit 8a64c50

Browse files
yuhaijun999ketor
authored andcommitted
[fix][store] Fix TxnScan crash bug.
1 parent cbb47eb commit 8a64c50

File tree

2 files changed

+160
-30
lines changed

2 files changed

+160
-30
lines changed

src/coprocessor/coprocessor_v2.cc

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max
241241
butil::Status status;
242242
has_more = false;
243243

244-
if(forAggCount_) {
244+
if (forAggCount_) {
245245
long count = 0;
246246
while (iter->Valid()) {
247247
count++;
@@ -254,7 +254,7 @@ butil::Status CoprocessorV2::Execute(IteratorPtr iter, bool key_only, size_t max
254254

255255
result_record.push_back(std::make_any<long>(count));
256256
status = GetKvFromExpr(result_record, &has_result_kv, &result_kv);
257-
if(has_result_kv) {
257+
if (has_result_kv) {
258258
kvs->emplace_back(std::move(result_kv));
259259
}
260260
} else {
@@ -348,11 +348,31 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, bool key_only, bool /*
348348

349349
size_t bytes = 0;
350350

351-
if(forAggCount_) {
351+
if (forAggCount_) {
352352
long count = 0;
353353
while (iter->Valid(txn_result_info)) {
354354
count++;
355-
iter->Next();
355+
status = iter->Next();
356+
if (!status.ok()) {
357+
if (status.error_code() == pb::error::Errno::ETXN_LOCK_CONFLICT) {
358+
DINGO_LOG(INFO) << fmt::format(
359+
"[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() meet lock conflict, status: {}.",
360+
status.error_str());
361+
362+
CHECK(!iter->Valid(txn_result_info)) << fmt::format(
363+
"[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() meet pb::error::Errno::ETXN_LOCK_CONFLICT, but "
364+
"iter->Valid = true. txn_result_info: "
365+
"{}",
366+
txn_result_info.DebugString());
367+
break;
368+
369+
} else {
370+
std::string s = fmt::format("[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() failed, error: {}",
371+
status.error_str());
372+
DINGO_LOG(ERROR) << s;
373+
return butil::Status(status.error_code(), s);
374+
}
375+
}
356376
}
357377

358378
std::vector<std::any> result_record;
@@ -361,7 +381,7 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, bool key_only, bool /*
361381

362382
result_record.push_back(std::make_any<long>(count));
363383
status = GetKvFromExpr(result_record, &has_result_kv, &result_kv);
364-
if(has_result_kv) {
384+
if (has_result_kv) {
365385
kvs.emplace_back(std::move(result_kv));
366386
}
367387
} else {
@@ -407,7 +427,27 @@ butil::Status CoprocessorV2::Execute(TxnIteratorPtr iter, bool key_only, bool /*
407427
iter_next_spend_time_ms += lambda_time_diff_microseconds_function(next_start, next_end);
408428
});
409429
}
410-
iter->Next();
430+
status = iter->Next();
431+
if (!status.ok()) {
432+
if (status.error_code() == pb::error::Errno::ETXN_LOCK_CONFLICT) {
433+
DINGO_LOG(INFO) << fmt::format(
434+
"[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() meet lock conflict, status: {}.",
435+
status.error_str());
436+
437+
CHECK(!iter->Valid(txn_result_info)) << fmt::format(
438+
"[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() meet pb::error::Errno::ETXN_LOCK_CONFLICT, but "
439+
"iter->Valid = true. txn_result_info: "
440+
"{}",
441+
txn_result_info.DebugString());
442+
break;
443+
444+
} else {
445+
std::string s = fmt::format("[txn] CoprocessorV2::Execute TxnIteratorPtr iter->Next() failed, error: {}",
446+
status.error_str());
447+
DINGO_LOG(ERROR) << s;
448+
return butil::Status(status.error_code(), s);
449+
}
450+
}
411451
}
412452

413453
status = GetKvFromExprEndOfFinish(&kvs);
@@ -647,8 +687,8 @@ butil::Status CoprocessorV2::GetKvFromExprEndOfFinish(std::vector<pb::common::Ke
647687
});
648688
}
649689
// int codec_version = GetCodecVersion(kv.key());
650-
status = RelExprHelper::TransFromOperandWrapper(coprocessor_.codec_version(), result_operand_ptr, result_serial_schemas_,
651-
result_column_indexes_, result_record);
690+
status = RelExprHelper::TransFromOperandWrapper(coprocessor_.codec_version(), result_operand_ptr,
691+
result_serial_schemas_, result_column_indexes_, result_record);
652692
if (!status.ok()) {
653693
DINGO_LOG(ERROR) << status.error_cstr();
654694
return status;

src/engine/txn_engine_helper.cc

Lines changed: 112 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -830,8 +830,16 @@ butil::Status TxnIterator::GetCurrentValue() {
830830
// if lock_key == write_key, then we can get data from write_cf
831831
if (last_lock_key_ == last_write_key_) {
832832
bool is_value_found = false;
833-
GetUserValueInWriteIter(write_iter_, reader_, isolation_level_, seek_ts_, start_ts_, key_, last_write_key_,
834-
is_value_found, value_);
833+
butil::Status status = GetUserValueInWriteIter(write_iter_, reader_, isolation_level_, seek_ts_, start_ts_, key_,
834+
last_write_key_, is_value_found, value_);
835+
if (!status.ok()) {
836+
key_.clear();
837+
value_.clear();
838+
std::string s = fmt::format("[txn]Scan get user value in write_iter failed, key: {}, status: {}",
839+
Helper::StringToHex(key_), status.error_str());
840+
DINGO_LOG(ERROR) << s;
841+
return status;
842+
}
835843

836844
if (is_value_found) {
837845
return butil::Status::OK();
@@ -849,8 +857,16 @@ butil::Status TxnIterator::GetCurrentValue() {
849857
key_ = last_write_key_;
850858

851859
bool is_value_found = false;
852-
GetUserValueInWriteIter(write_iter_, reader_, isolation_level_, seek_ts_, start_ts_, key_, last_write_key_,
853-
is_value_found, value_);
860+
butil::Status status = GetUserValueInWriteIter(write_iter_, reader_, isolation_level_, seek_ts_, start_ts_, key_,
861+
last_write_key_, is_value_found, value_);
862+
if (!status.ok()) {
863+
key_.clear();
864+
value_.clear();
865+
std::string s = fmt::format("[txn]Scan get user value in write_iter failed, key: {}, status: {}",
866+
Helper::StringToHex(key_), status.error_str());
867+
DINGO_LOG(ERROR) << s;
868+
return status;
869+
}
854870

855871
if (is_value_found) {
856872
return butil::Status::OK();
@@ -1281,15 +1297,45 @@ butil::Status TxnEngineHelper::Scan(StreamPtr stream, RawEnginePtr raw_engine,
12811297
}
12821298

12831299
// get or new TxnIterator.
1284-
auto stream_state =
1285-
std::dynamic_pointer_cast<TxnScanStreamState>(stream->GetOrNewStreamState([&]() -> StreamStatePtr {
1286-
auto iter = std::make_shared<TxnIterator>(raw_engine, range, start_ts, isolation_level, resolved_locks);
1287-
auto ret = iter->Init();
1288-
CHECK(ret.ok()) << fmt::format("[txn][{}] Scan init txn_iter failed, start_ts: {} range: {} status: {}.",
1289-
stream->StreamId(), start_ts, Helper::RangeToString(range), ret.error_str());
1290-
iter->Seek(range.start_key());
1291-
return TxnScanStreamState::New(iter);
1292-
}));
1300+
StreamStatePtr current_stream_state = stream->StreamState();
1301+
if (!current_stream_state) {
1302+
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_detail) << fmt::format(
1303+
"[txn][{}] Scan current_stream_state is null, need to create new TxnIterator.", stream->StreamId());
1304+
1305+
auto iter = std::make_shared<TxnIterator>(raw_engine, range, start_ts, isolation_level, resolved_locks);
1306+
butil::Status status = iter->Init();
1307+
if (!status.ok()) {
1308+
std::string s = fmt::format("[txn][{}] Scan init txn_iter failed, start_ts: {} range: {} status: {}.",
1309+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1310+
DINGO_LOG(ERROR) << s;
1311+
return butil::Status(status.error_code(), s);
1312+
}
1313+
1314+
status = iter->Seek(range.start_key());
1315+
if (!status.ok()) {
1316+
if (status.error_code() == pb::error::Errno::ETXN_LOCK_CONFLICT) {
1317+
DINGO_LOG(INFO) << fmt::format("[txn][{}] Scan seek meet lock conflict, start_ts: {} range: {} status: {}.",
1318+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1319+
1320+
CHECK(!iter->Valid(txn_result_info)) << fmt::format(
1321+
"[txn][{}] Scan Seek meet pb::error::Errno::ETXN_LOCK_CONFLICT, but iter->Valid = true. txn_result_info: "
1322+
"{}",
1323+
stream->StreamId(), txn_result_info.DebugString());
1324+
return butil::Status::OK();
1325+
1326+
} else {
1327+
std::string s = fmt::format("[txn][{}] Scan seek txn_iter failed, start_ts: {} range: {} status: {}.",
1328+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1329+
DINGO_LOG(ERROR) << s;
1330+
return butil::Status(status.error_code(), s);
1331+
}
1332+
}
1333+
1334+
StreamStatePtr new_stream_state = TxnScanStreamState::New(iter);
1335+
stream->SetStreamState(new_stream_state);
1336+
} // if (!current_stream_state)
1337+
1338+
TxnScanStreamStatePtr stream_state = std::dynamic_pointer_cast<TxnScanStreamState>(stream->StreamState());
12931339
TxnIteratorPtr iter = stream_state->iter;
12941340
CHECK(iter != nullptr) << fmt::format("[txn][{}] Scan stream_state->iter is nullptr.", stream->StreamId());
12951341

@@ -1340,13 +1386,50 @@ butil::Status TxnEngineHelper::Scan(StreamPtr stream, RawEnginePtr raw_engine,
13401386
break;
13411387
}
13421388

1343-
iter->Next();
1389+
butil::Status status = iter->Next();
1390+
if (!status.ok()) {
1391+
if (status.error_code() == pb::error::Errno::ETXN_LOCK_CONFLICT) {
1392+
DINGO_LOG(INFO) << fmt::format("[txn][{}] Scan Next meet lock conflict, start_ts: {} range: {} status: {}.",
1393+
stream->StreamId(), start_ts, Helper::RangeToString(range),
1394+
status.error_str());
1395+
1396+
CHECK(!iter->Valid(txn_result_info)) << fmt::format(
1397+
"[txn][{}] Scan Next meet pb::error::Errno::ETXN_LOCK_CONFLICT, but iter->Valid = true. txn_result_info: "
1398+
"{}",
1399+
stream->StreamId(), txn_result_info.DebugString());
1400+
return butil::Status::OK();
1401+
1402+
} else {
1403+
std::string s = fmt::format("[txn][{}] Scan iter->Next() failed, start_ts: {} range: {} status: {}.",
1404+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1405+
DINGO_LOG(ERROR) << s;
1406+
return butil::Status(status.error_code(), s);
1407+
}
1408+
}
13441409
}
13451410
}
13461411

13471412
if (iter->Valid(txn_result_info)) {
13481413
end_scan_key = iter->Key();
1349-
iter->Next();
1414+
butil::Status status = iter->Next();
1415+
if (!status.ok()) {
1416+
if (status.error_code() == pb::error::Errno::ETXN_LOCK_CONFLICT) {
1417+
DINGO_LOG(INFO) << fmt::format("[txn][{}] Scan Next meet lock conflict, start_ts: {} range: {} status: {}.",
1418+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1419+
1420+
CHECK(!iter->Valid(txn_result_info)) << fmt::format(
1421+
"[txn][{}] Scan Next meet pb::error::Errno::ETXN_LOCK_CONFLICT, but iter->Valid = true. txn_result_info: "
1422+
"{}",
1423+
stream->StreamId(), txn_result_info.DebugString());
1424+
return butil::Status::OK();
1425+
1426+
} else {
1427+
std::string s = fmt::format("[txn][{}] Scan iter->Next() failed, start_ts: {} range: {} status: {}.",
1428+
stream->StreamId(), start_ts, Helper::RangeToString(range), status.error_str());
1429+
DINGO_LOG(ERROR) << s;
1430+
return butil::Status(status.error_code(), s);
1431+
}
1432+
}
13501433
}
13511434

13521435
return butil::Status::OK();
@@ -4598,15 +4681,17 @@ butil::Status TxnEngineHelper::DoGcCoreNonTxn(RawEnginePtr raw_engine, std::shar
45984681
auto [internal_gc_stop, internal_safe_point_ts] = gc_safe_point->GetGcFlagAndSafePointTs();
45994682
if (internal_gc_stop) {
46004683
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_gc_detail) << fmt::format(
4601-
"[txn_gc][tenant({})][region({})][type({})][nontxn] gc_stop set stop. start_key : {} end_key : {}. return",
4684+
"[txn_gc][tenant({})][region({})][type({})][nontxn] gc_stop set stop. start_key : {} end_key : {}. "
4685+
"return",
46024686
gc_safe_point->GetTenantId(), ctx->RegionId(), pb::common::RegionType_Name(type),
46034687
Helper::StringToHex(region_start_key), Helper::StringToHex(region_end_key));
46044688
goto _interrupt1;
46054689
}
46064690

46074691
if (safe_point_ts < internal_safe_point_ts) {
46084692
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_gc_detail) << fmt::format(
4609-
"[txn_gc][tenant({})][region({})][type({})][nontxn] current safe_point_ts : {}. newest safe_point_ts : {}. "
4693+
"[txn_gc][tenant({})][region({})][type({})][nontxn] current safe_point_ts : {}. newest safe_point_ts : "
4694+
"{}. "
46104695
"Don't worry, we'll deal with it next time. ignore. start_key : {} end_key : {}",
46114696
gc_safe_point->GetTenantId(), ctx->RegionId(), pb::common::RegionType_Name(type), safe_point_ts,
46124697
internal_safe_point_ts, Helper::StringToHex(region_start_key), Helper::StringToHex(region_end_key));
@@ -4769,7 +4854,8 @@ butil::Status TxnEngineHelper::CheckLockForTxnGc(RawEngine::ReaderPtr reader, st
47694854
bool parse_success = lock_info.ParseFromString(std::string(lock_iter_value));
47704855
if (!parse_success) {
47714856
std::string s = fmt::format(
4772-
"[txn_gc][lock][tenant({})][region({})][type({})][txn] parse lock info failed, lock_iter_key : {} lock_key: "
4857+
"[txn_gc][lock][tenant({})][region({})][type({})][txn] parse lock info failed, lock_iter_key : {} "
4858+
"lock_key: "
47734859
"{} , lock_value : {} safe_point_ts : {}",
47744860
tenant_id, region_id, pb::common::RegionType_Name(type), Helper::StringToHex(lock_iter_key),
47754861
Helper::StringToHex(lambda_get_raw_key_function(lock_iter_key)), Helper::StringToHex(lock_iter_value),
@@ -4783,7 +4869,8 @@ butil::Status TxnEngineHelper::CheckLockForTxnGc(RawEngine::ReaderPtr reader, st
47834869
int64_t lock_ts = lock_info.lock_ts();
47844870
if (lock_ts <= safe_point_ts) {
47854871
std::string s = fmt::format(
4786-
"[txn_gc][lock][tenant({})][region({})][type({})][txn] find lock error. exist lock_ts : {} <= safe_point_ts "
4872+
"[txn_gc][lock][tenant({})][region({})][type({})][txn] find lock error. exist lock_ts : {} <= "
4873+
"safe_point_ts "
47874874
": {}, lock_iter_key : {} lock_key: {} , safe_point_ts : {} lock_value : {}",
47884875
tenant_id, region_id, pb::common::RegionType_Name(type), lock_ts, safe_point_ts,
47894876
Helper::StringToHex(lock_iter_key), Helper::StringToHex(lambda_get_raw_key_function(lock_iter_key)),
@@ -5022,7 +5109,8 @@ butil::Status TxnEngineHelper::DoFinalWorkForTxnGc(
50225109
RaftEngineWriteForTxnGc(raft_engine, ctx, kv_deletes_lock, kv_deletes_data, kv_deletes_write, tenant_id, type);
50235110
if (!status.ok()) {
50245111
std::string s = fmt::format(
5025-
"[txn_gc][write][tenant({})][region({})][type({})][txn] RaftEngineWriteForTxnGc failed. kv_deletes_lock size : "
5112+
"[txn_gc][write][tenant({})][region({})][type({})][txn] RaftEngineWriteForTxnGc failed. kv_deletes_lock size "
5113+
": "
50265114
"{} kv_deletes_data size : {} kv_deletes_write : {} safe_point_ts : {}. ignore.",
50275115
tenant_id, ctx->RegionId(), pb::common::RegionType_Name(type), kv_deletes_lock.size(), kv_deletes_data.size(),
50285116
kv_deletes_write.size(), safe_point_ts);
@@ -5284,7 +5372,8 @@ void TxnEngineHelper::RegularDoGcHandler(void * /*arg*/) {
52845372
} else {
52855373
if (pb::common::StoreRegionState::NORMAL != region_ptr->State()) {
52865374
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_gc_detail) << fmt::format(
5287-
"[txn_gc][tenant({})][region({})] is leader. but state is not normal : {}. start_key : {} end_key : {}. "
5375+
"[txn_gc][tenant({})][region({})] is leader. but state is not normal : {}. start_key : {} end_key : {}. "
5376+
" "
52885377
"ignore.",
52895378
tenant_id, region_ptr->Id(), static_cast<int>(region_ptr->State()),
52905379
Helper::StringToHex(region_ptr->Range().start_key()), Helper::StringToHex(region_ptr->Range().end_key()));
@@ -5308,7 +5397,8 @@ void TxnEngineHelper::RegularDoGcHandler(void * /*arg*/) {
53085397

53095398
if (safe_point_ts < internal_safe_point_ts) {
53105399
DINGO_LOG_IF(INFO, FLAGS_dingo_log_switch_txn_gc_detail) << fmt::format(
5311-
"[txn_gc][tenant({})][region({})] current safe_point_ts : {}. newest safe_point_ts : {}. Don't worry, we'll "
5400+
"[txn_gc][tenant({})][region({})] current safe_point_ts : {}. newest safe_point_ts : {}. Don't worry, "
5401+
"we'll "
53125402
"deal with it next time. ignore.",
53135403
tenant_id, region_ptr->Id(), safe_point_ts, internal_safe_point_ts);
53145404
}

0 commit comments

Comments
 (0)