@@ -1364,6 +1364,184 @@ TEST_F(TxnPreWriteTest, RepeatedPreWriteWithOnePC) {
13641364 }
13651365}
13661366
1367+ TEST_F (TxnPreWriteTest, RepeatedComittedPreWriteWithOnePC) {
1368+ // create region for test
1369+ auto region_id = 373 ;
1370+ auto region = store::Region::New (region_id);
1371+ region->SetState (pb::common::StoreRegionState::NORMAL);
1372+ auto store_region_meta = mono_engine->GetStoreMetaManager ()->GetStoreRegionMeta ();
1373+ store_region_meta->AddRegion (region);
1374+ auto region_metrics = StoreRegionMetrics::NewMetrics (region->Id ());
1375+ mono_engine->GetStoreMetricsManager ()->GetStoreRegionMetrics ()->AddMetrics (region_metrics);
1376+
1377+ // optismistic reapeated prewrite1pc
1378+ {
1379+ // txn 1
1380+ butil::Status ok;
1381+ pb::store::TxnPrewriteResponse response;
1382+ auto ctx = std::make_shared<Context>();
1383+ ctx->SetRegionId (region_id);
1384+ ctx->SetCfName (Constant::kStoreDataCF );
1385+ ctx->SetResponse (&response);
1386+ std::vector<pb::store::Mutation> mutations;
1387+ std::string key = " maru_test_key1" ;
1388+ std::string value = " value1" ;
1389+ std::string primary_lock = key;
1390+ int64_t target_start_ts = 10 ;
1391+ int64_t target_lock_ttl = lock_ttl; // 2034-07-11 14:21:21
1392+ int64_t txn_size = 1 ;
1393+ bool try_one_pc = true ;
1394+ int64_t min_commit_ts = 12 ;
1395+ int64_t max_commit_ts = 0 ;
1396+ std::vector<int64_t > pessimistic_checks;
1397+ std::map<int64_t , int64_t > for_update_ts_checks;
1398+ std::map<int64_t , std::string> lock_extra_datas;
1399+ pb::store::Mutation mutation;
1400+ mutation.set_op (::dingodb::pb::store::Op::Put);
1401+ mutation.set_key (key);
1402+ mutation.set_value (value);
1403+ mutations.emplace_back (mutation);
1404+ for_update_ts_checks.insert_or_assign (0 , 0 );
1405+ lock_extra_datas.insert_or_assign (0 , " " );
1406+ pessimistic_checks.push_back (0 );
1407+ std::vector<std::string> secondaries;
1408+ // normal prewrite
1409+ auto status = TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations, primary_lock, target_start_ts,
1410+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts, max_commit_ts,
1411+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1412+ EXPECT_EQ (status.ok (), true );
1413+ EXPECT_EQ (response.one_pc_commit_ts (), min_commit_ts);
1414+ EXPECT_EQ (response.txn_result_size (), 0 );
1415+ MustUnlock (key);
1416+ MustGet (key, min_commit_ts, value);
1417+ MustGetCommitTs (key, min_commit_ts, min_commit_ts);
1418+
1419+ {
1420+ // txn 2
1421+ // normal prewrite
1422+ int64_t target_start_ts_2 = 21 ;
1423+ int64_t min_commit_ts_2 = 22 ;
1424+ try_one_pc = false ;
1425+ std::string value2 = " value2" ;
1426+ std::vector<pb::store::Mutation> mutations2;
1427+ pb::store::Mutation mutation2;
1428+ mutation2.set_op (::dingodb::pb::store::Op::Put);
1429+ mutation2.set_key (key);
1430+ mutation2.set_value (value2);
1431+ mutations2.emplace_back (mutation2);
1432+
1433+ auto status =
1434+ TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations2, primary_lock, target_start_ts_2,
1435+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts_2, max_commit_ts,
1436+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1437+ EXPECT_EQ (status.ok (), true );
1438+ EXPECT_EQ (response.txn_result_size (), 0 );
1439+ }
1440+
1441+ {
1442+ // 1pc repeated commited prewrite
1443+ try_one_pc = true ;
1444+ auto status =
1445+ TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations, primary_lock, target_start_ts,
1446+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts, max_commit_ts,
1447+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1448+
1449+ EXPECT_EQ (status.ok (), true );
1450+ EXPECT_EQ (response.one_pc_commit_ts (), min_commit_ts);
1451+ EXPECT_EQ (response.txn_result_size (), 0 );
1452+ MustGet (key, min_commit_ts + 1 , value);
1453+ MustGetCommitTs (key, min_commit_ts, min_commit_ts);
1454+ }
1455+ DeleteRange ();
1456+ }
1457+
1458+ // optismistic reapeated prewrite1pc
1459+ {
1460+ // txn 1
1461+ butil::Status ok;
1462+ pb::store::TxnPrewriteResponse response;
1463+ auto ctx = std::make_shared<Context>();
1464+ ctx->SetRegionId (region_id);
1465+ ctx->SetCfName (Constant::kStoreDataCF );
1466+ ctx->SetResponse (&response);
1467+ std::vector<pb::store::Mutation> mutations;
1468+ std::string key = " maru_test_key1" ;
1469+ std::string value = " value1" ;
1470+ std::string primary_lock = key;
1471+ int64_t target_start_ts = 10 ;
1472+ int64_t target_lock_ttl = lock_ttl; // 2034-07-11 14:21:21
1473+ int64_t txn_size = 1 ;
1474+ bool try_one_pc = true ;
1475+ int64_t min_commit_ts = 12 ;
1476+ int64_t max_commit_ts = 0 ;
1477+ std::vector<int64_t > pessimistic_checks;
1478+ std::map<int64_t , int64_t > for_update_ts_checks;
1479+ std::map<int64_t , std::string> lock_extra_datas;
1480+ pb::store::Mutation mutation;
1481+ mutation.set_op (::dingodb::pb::store::Op::Put);
1482+ mutation.set_key (key);
1483+ mutation.set_value (value);
1484+ mutations.emplace_back (mutation);
1485+ for_update_ts_checks.insert_or_assign (0 , 0 );
1486+ lock_extra_datas.insert_or_assign (0 , " " );
1487+ pessimistic_checks.push_back (0 );
1488+ std::vector<std::string> secondaries;
1489+ // 1pc prewrite
1490+ auto status = TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations, primary_lock, target_start_ts,
1491+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts, max_commit_ts,
1492+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1493+ std::cout << " tnx1 first write." << std::endl;
1494+ EXPECT_EQ (status.ok (), true );
1495+ EXPECT_EQ (response.one_pc_commit_ts (), min_commit_ts);
1496+ EXPECT_EQ (response.txn_result_size (), 0 );
1497+ MustUnlock (key);
1498+ MustGet (key, min_commit_ts, value);
1499+ MustGetCommitTs (key, min_commit_ts, min_commit_ts);
1500+
1501+ {
1502+ // txn 2
1503+ // 1pc prewrite
1504+ int64_t target_start_ts_2 = 21 ;
1505+ int64_t min_commit_ts_2 = 22 ;
1506+ try_one_pc = true ;
1507+ std::string value2 = " value2" ;
1508+ std::vector<pb::store::Mutation> mutations2;
1509+ pb::store::Mutation mutation2;
1510+ mutation2.set_op (::dingodb::pb::store::Op::Put);
1511+ mutation2.set_key (key);
1512+ mutation2.set_value (value2);
1513+ mutations2.emplace_back (mutation2);
1514+
1515+ auto status =
1516+ TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations2, primary_lock, target_start_ts_2,
1517+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts_2, max_commit_ts,
1518+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1519+ EXPECT_EQ (status.ok (), true );
1520+ EXPECT_EQ (response.one_pc_commit_ts (), min_commit_ts_2);
1521+ EXPECT_EQ (response.txn_result_size (), 0 );
1522+ MustUnlock (key);
1523+ MustGet (key, min_commit_ts_2, value2);
1524+ MustGetCommitTs (key, min_commit_ts_2, min_commit_ts_2);
1525+ }
1526+
1527+ {
1528+ // 1pc repeated commited prewrite
1529+ try_one_pc = true ;
1530+ auto status =
1531+ TxnEngineHelper::Prewrite (engine, mono_engine, ctx, region, mutations, primary_lock, target_start_ts,
1532+ target_lock_ttl, txn_size, try_one_pc, min_commit_ts, max_commit_ts,
1533+ pessimistic_checks, for_update_ts_checks, lock_extra_datas, secondaries);
1534+
1535+ EXPECT_EQ (status.ok (), true );
1536+ EXPECT_EQ (response.one_pc_commit_ts (), min_commit_ts);
1537+ EXPECT_EQ (response.txn_result_size (), 0 );
1538+ MustGet (key, min_commit_ts + 1 , value);
1539+ MustGetCommitTs (key, min_commit_ts, min_commit_ts);
1540+ }
1541+ DeleteRange ();
1542+ }
1543+ }
1544+
13671545TEST_F (TxnPreWriteTest, KvDeleteRange) { DeleteRange (); }
13681546
13691547} // namespace dingodb
0 commit comments