Skip to content

Commit 0ea3180

Browse files
committed
[#28317] YSQL: Start transactions with correct locality in AcquireObjectLocks RPC handler
Summary: Given the following transaction: ```sql BEGIN; INSERT INTO global_table ...; COMMIT; ``` Without object locking, the first RPC of this transaction is the Perform. This sets force_global_transaction in PgPerformOptionsPB based on the force_global_transaction GUC, and also contains tablespace oids from which we are able to decide the correct locality to start the transaction as (region-local, tablespace-local(tablespace), global). But with object locking, the first RPC is to acquire an object lock, and this RPC today does not set force_global_transaction, nor does it have any tablespace information. Instead, we just always start as region-local, and then later promoted to global during the Perform RPC as needed. Without object locking, we mask the latency of the CREATED heartbeat by use of the transaction pool, but with object locking, we cannot mask the latency of the PROMOTED heartbeat, so this results in an extra global roundtrip for global transactions. Additionally, in the context of #11268, it also means that every transaction will be promoted to global when tablespace locality is used, since region-local cannot promote to any tablespace-local, and there is also no good default tablespace-local to use instead. This is also the cause of several geo-partitioning unit tests failure when object locking is enabled, for exactly the above reasons. This revision passes force_global_transaction and tablespace oid for AcquireObjectLocks RPCs. This information is then used to start transactions (if needed) under the appropriate locality. **Upgrade/Downgrade safety** N/A, only modifies protobufs for local TS <=> PG communication. Jira: DB-17999 Test Plan: Jenkins Also see D47261 for Jenkins run with object locking on. Reviewers: myang Reviewed By: myang Subscribers: bkolagani, yql, ybase Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D47260
1 parent 321857c commit 0ea3180

File tree

12 files changed

+136
-52
lines changed

12 files changed

+136
-52
lines changed

src/yb/tserver/pg_client.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1303,6 +1303,7 @@ message PgAcquireObjectLockRequestPB {
13031303
uint64 relation_oid = 2;
13041304
uint64 object_oid = 3;
13051305
uint64 object_sub_oid = 4;
1306+
uint32 tablespace_oid = 5;
13061307
}
13071308
LockOId lock_oid = 3;
13081309

src/yb/tserver/pg_client_session.cc

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ DEFINE_test_flag(bool, perform_ignore_pg_is_region_local, false,
123123
"everything to work when this field is not set, as the field is only left in for upgrade from "
124124
"older versions and to be removed in the future.");
125125

126+
DEFINE_test_flag(bool, force_initial_region_local, false,
127+
"Force transaction to start as region-local initially.");
128+
126129
DECLARE_bool(vector_index_dump_stats);
127130
DECLARE_bool(yb_enable_cdc_consistent_snapshot_streams);
128131
DECLARE_bool(ysql_enable_db_catalog_version_mode);
@@ -1183,7 +1186,10 @@ class TransactionProvider {
11831186
if (!next_plain_) {
11841187
return std::nullopt;
11851188
}
1186-
auto res = NextTxnMetaForPlain(deadline, true /* is_for_release */);
1189+
// next_plain_ exists, so the locality passed here doesn't matter as we won't be creating
1190+
// a new transaction.
1191+
auto res = NextTxnMetaForPlain(
1192+
TransactionFullLocality::RegionLocal(), deadline, true /* is_for_release */);
11871193
if (!res.ok()) {
11881194
LOG(DFATAL) << "Unexpected error while fetching existing plain re-usable txn";
11891195
return std::nullopt;
@@ -1192,13 +1198,11 @@ class TransactionProvider {
11921198
}
11931199

11941200
Result<TransactionMetadata> NextTxnMetaForPlain(
1195-
CoarseTimePoint deadline, bool is_for_release = false) {
1201+
TransactionFullLocality locality, CoarseTimePoint deadline, bool is_for_release = false) {
11961202
client::internal::InFlightOpsGroupsWithMetadata ops_info;
11971203
if (!next_plain_) {
1198-
VLOG_WITH_FUNC(1) << "requesting new transaction";
1199-
// TODO(#28317): We should figure out the correct locality and use it here. With table-level
1200-
// locks, this will prevent transactions from being started as global or tablespace-local(X).
1201-
auto txn = Build(TransactionFullLocality::RegionLocal(), deadline, {});
1204+
VLOG_WITH_FUNC(1) << "requesting new transaction of locality " << locality;
1205+
auto txn = Build(locality, deadline, {});
12021206
// Don't execute txn->GetMetadata() here since the transaction is not iniatialized with
12031207
// its full metadata yet, like isolation level.
12041208
Synchronizer synchronizer;
@@ -2627,9 +2631,10 @@ class PgClientSession::Impl {
26272631
if (setup_session_result.is_plain && setup_session_result.session_data.transaction) {
26282632
RETURN_NOT_OK(setup_session_result.session_data.transaction->GetMetadata(deadline).get());
26292633
}
2634+
auto locality = GetTargetTransactionLocality(data->req);
26302635
auto txn_meta_res = setup_session_result.session_data.transaction
26312636
? setup_session_result.session_data.transaction->GetMetadata(deadline).get()
2632-
: NextObjectLockingTxnMeta(deadline);
2637+
: NextObjectLockingTxnMeta(locality, deadline);
26332638
RETURN_NOT_OK(txn_meta_res);
26342639
const auto lock_type = static_cast<TableLockType>(data->req.lock_type());
26352640
VLOG_WITH_PREFIX_AND_FUNC(1)
@@ -2666,12 +2671,12 @@ class PgClientSession::Impl {
26662671
txn_meta_res->status_tablet);
26672672
AcquireObjectLockLocallyWithRetries(
26682673
ts_lock_manager(), std::move(lock_req), deadline, std::move(callback),
2669-
[session_impl = this, txn = setup_session_result.session_data.transaction]
2674+
[session_impl = this, txn = setup_session_result.session_data.transaction, locality]
26702675
(CoarseTimePoint deadline) -> Status {
26712676
if (txn) {
26722677
RETURN_NOT_OK(txn->metadata());
26732678
} else {
2674-
RETURN_NOT_OK(session_impl->NextObjectLockingTxnMeta(deadline));
2679+
RETURN_NOT_OK(session_impl->NextObjectLockingTxnMeta(locality, deadline));
26752680
}
26762681
return Status::OK();
26772682
});
@@ -2928,7 +2933,9 @@ class PgClientSession::Impl {
29282933
std::tie(data->ops, data->vector_index_query) = VERIFY_RESULT(PrepareOperations(
29292934
&data->req, session, &data->sidecars, tables, vector_index_query_data_,
29302935
data->transaction != nullptr /* has_distributed_txn */,
2931-
make_lw_function([this, deadline] { return NextObjectLockingTxnMeta(deadline); }),
2936+
make_lw_function([this, locality, deadline] {
2937+
return NextObjectLockingTxnMeta(locality, deadline);
2938+
}),
29322939
IsObjectLockingEnabled()));
29332940
session->FlushAsync([this, data, trace, trace_created_locally,
29342941
start_time](client::FlushStatus* flush_status) {
@@ -3677,8 +3684,9 @@ class PgClientSession::Impl {
36773684

36783685
[[nodiscard]] bool IsObjectLockingEnabled() const { return ts_lock_manager() != nullptr; }
36793686

3680-
Result<TransactionMetadata> NextObjectLockingTxnMeta(CoarseTimePoint deadline) {
3681-
auto txn_meta = VERIFY_RESULT(transaction_provider_.NextTxnMetaForPlain(deadline));
3687+
Result<TransactionMetadata> NextObjectLockingTxnMeta(
3688+
TransactionFullLocality locality, CoarseTimePoint deadline) {
3689+
auto txn_meta = VERIFY_RESULT(transaction_provider_.NextTxnMetaForPlain(locality, deadline));
36823690
RegisterLockOwner(txn_meta.transaction_id, txn_meta.status_tablet);
36833691
return txn_meta;
36843692
}
@@ -3691,14 +3699,27 @@ class PgClientSession::Impl {
36913699
}
36923700

36933701
TransactionFullLocality GetTargetTransactionLocality(const PgPerformRequestPB& request) const {
3694-
auto tablespace_oids = request.ops() | std::views::transform(GetOpTablespaceOid);
3702+
return GetTargetTransactionLocality(
3703+
request.options(), request.ops() | std::views::transform(GetOpTablespaceOid));
3704+
}
3705+
3706+
TransactionFullLocality GetTargetTransactionLocality(
3707+
const PgAcquireObjectLockRequestPB& request) const {
3708+
return GetTargetTransactionLocality(
3709+
request.options(), std::views::single(request.lock_oid().tablespace_oid()));
3710+
}
3711+
3712+
TransactionFullLocality GetTargetTransactionLocality(
3713+
const PgPerformOptionsPB& options, std::ranges::range auto&& tablespace_oids) const {
3714+
if (PREDICT_FALSE(FLAGS_TEST_force_initial_region_local)) {
3715+
return TransactionFullLocality::RegionLocal();
3716+
}
36953717

3696-
if (FLAGS_use_tablespace_based_transaction_placement ||
3697-
request.options().force_tablespace_locality()) {
3698-
if (auto oid = request.options().force_tablespace_locality_oid()) {
3718+
if (FLAGS_use_tablespace_based_transaction_placement || options.force_tablespace_locality()) {
3719+
if (auto oid = options.force_tablespace_locality_oid()) {
36993720
return TransactionFullLocality::TablespaceLocal(oid);
37003721
}
3701-
return CalculateTablespaceBasedLocality(tablespace_oids);
3722+
return CalculateTablespaceBasedLocality(std::move(tablespace_oids));
37023723
}
37033724

37043725
// TODO: is_all_region_local() handles exactly two cases that tablespace oid check does not:
@@ -3711,10 +3732,10 @@ class PgClientSession::Impl {
37113732
// from use setting up some unit tests.
37123733
// Once these are no longer of concern, is_all_region_local and corresponding code in
37133734
// pggate/pg can be removed.
3714-
if (!FLAGS_TEST_perform_ignore_pg_is_region_local && request.options().is_all_region_local()) {
3735+
if (!FLAGS_TEST_perform_ignore_pg_is_region_local && options.is_all_region_local()) {
37153736
return TransactionFullLocality::RegionLocal();
37163737
}
3717-
return CalculateRegionBasedLocality(tablespace_oids);
3738+
return CalculateRegionBasedLocality(std::move(tablespace_oids));
37183739
}
37193740

37203741
TransactionFullLocality CalculateRegionBasedLocality(

src/yb/yql/pggate/pg_client.cc

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -493,10 +493,12 @@ class PgClient::Impl : public BigDataFetcher {
493493
public:
494494
Impl(
495495
std::reference_wrapper<const WaitEventWatcher> wait_event_watcher,
496-
std::atomic<uint64_t>& next_perform_op_serial_no)
496+
std::atomic<uint64_t>& next_perform_op_serial_no,
497+
const TablespaceMap& tablespace_map)
497498
: heartbeat_poller_(std::bind(&Impl::Heartbeat, this, false)),
498499
wait_event_watcher_(wait_event_watcher),
499-
next_perform_op_serial_no_(next_perform_op_serial_no) {
500+
next_perform_op_serial_no_(next_perform_op_serial_no),
501+
tablespace_map_(tablespace_map) {
500502
tablet_server_count_cache_.fill(0);
501503
}
502504

@@ -932,6 +934,15 @@ class PgClient::Impl : public BigDataFetcher {
932934
lock_oid->set_relation_oid(lock_id.relation_oid);
933935
lock_oid->set_object_oid(lock_id.object_oid);
934936
lock_oid->set_object_sub_oid(lock_id.object_sub_oid);
937+
if (lock_id.relation_oid >= kPgFirstNormalObjectId) {
938+
auto tablespace_itr = tablespace_map_.find(PgObjectId(lock_id.db_oid, lock_id.relation_oid));
939+
if (tablespace_itr == tablespace_map_.end()) {
940+
LOG(WARNING) << "Tablespace not found for db_oid=" << lock_id.db_oid
941+
<< " relation_oid=" << lock_id.relation_oid;
942+
} else {
943+
lock_oid->set_tablespace_oid(tablespace_itr->second);
944+
}
945+
}
935946
req.set_lock_type(static_cast<tserver::ObjectLockMode>(mode));
936947

937948
auto result_future = PrepareAndSend<AcquireObjectLockData>(
@@ -1744,6 +1755,8 @@ class PgClient::Impl : public BigDataFetcher {
17441755
InterprocessMappedRegion big_mapped_region_;
17451756
ThreadSafeArena object_locks_arena_;
17461757
std::atomic<uint64_t>& next_perform_op_serial_no_;
1758+
1759+
const TablespaceMap& tablespace_map_;
17471760
};
17481761

17491762
std::string DdlMode::ToString() const {
@@ -1761,8 +1774,9 @@ void DdlMode::ToPB(tserver::PgFinishTransactionRequestPB_DdlModePB* dest) const
17611774

17621775
PgClient::PgClient(
17631776
std::reference_wrapper<const WaitEventWatcher> wait_event_watcher,
1764-
std::atomic<uint64_t>& seq_number)
1765-
: impl_(new Impl(wait_event_watcher, seq_number)) {}
1777+
std::atomic<uint64_t>& seq_number,
1778+
const TablespaceMap& tablespace_map)
1779+
: impl_(new Impl(wait_event_watcher, seq_number, tablespace_map)) {}
17661780

17671781
PgClient::~PgClient() = default;
17681782

src/yb/yql/pggate/pg_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,8 @@ class PgClient {
167167
public:
168168
PgClient(
169169
std::reference_wrapper<const WaitEventWatcher> wait_event_watcher,
170-
std::atomic<uint64_t>& next_perform_op_serial_no);
170+
std::atomic<uint64_t>& next_perform_op_serial_no,
171+
const TablespaceMap& tablespace_map);
171172
~PgClient();
172173

173174
Status Start(rpc::ProxyCache* proxy_cache,

src/yb/yql/pggate/pg_session.cc

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -933,9 +933,6 @@ Result<PerformFuture> PgSession::Perform(BufferableOperations&& ops, PerformOpti
933933
}
934934
}
935935

936-
options.set_force_global_transaction(yb_force_global_transaction);
937-
options.set_force_tablespace_locality(yb_force_tablespace_locality);
938-
options.set_force_tablespace_locality_oid(yb_force_tablespace_locality_oid);
939936
options.set_is_all_region_local(std::all_of(
940937
ops.operations().begin(), ops.operations().end(),
941938
[](const auto& op) { return op->is_region_local(); }));

src/yb/yql/pggate/pg_txn_manager.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,10 @@ Status PgTxnManager::SetupPerformOptions(
789789
if (read_time_action && *read_time_action == ReadTimeAction::RESET) {
790790
options->mutable_read_time()->Clear();
791791
}
792+
793+
options->set_force_global_transaction(yb_force_global_transaction);
794+
options->set_force_tablespace_locality(yb_force_tablespace_locality);
795+
options->set_force_tablespace_locality_oid(yb_force_tablespace_locality_oid);
792796
return Status::OK();
793797
}
794798

src/yb/yql/pggate/pggate.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -666,7 +666,8 @@ PgApiImpl::PgApiImpl(
666666
}),
667667
pg_shared_data_(
668668
*init_postgres_info.shared_data, !init_postgres_info.parallel_leader_session_id),
669-
pg_client_(wait_event_watcher_, pg_shared_data_->next_perform_op_serial_no),
669+
pg_client_(
670+
wait_event_watcher_, pg_shared_data_->next_perform_op_serial_no, tablespace_map_),
670671
clock_(new server::HybridClock()),
671672
enable_table_locking_(ShouldEnableTableLocks()),
672673
pg_txn_manager_(new PgTxnManager(&pg_client_, clock_, pg_callbacks_, enable_table_locking_)),

src/yb/yql/pggate/pggate.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -952,6 +952,8 @@ class PgApiImpl {
952952

953953
const WaitEventWatcher wait_event_watcher_;
954954

955+
TablespaceMap tablespace_map_;
956+
955957
PgSharedDataHolder pg_shared_data_;
956958

957959
// TODO Rename to client_ when YBClient is removed.
@@ -973,7 +975,6 @@ class PgApiImpl {
973975
TupleIdBuilder tuple_id_builder_;
974976
BufferingSettings buffering_settings_;
975977
YbctidReaderProvider ybctid_reader_provider_;
976-
TablespaceMap tablespace_map_;
977978
PgFKReferenceCache fk_reference_cache_;
978979
ExplicitRowLockBuffer explicit_row_lock_buffer_;
979980

src/yb/yql/pgwrapper/geo_transactions-test.cc

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,10 @@ class GeoTransactionsTest : public GeoTransactionsTestBase {
114114
}
115115
auto insert_value = NextInsertValue();
116116
ASSERT_OK(conn.ExecuteFormat("SET force_global_transaction = $0", ToString(session_var)));
117+
if (local_table) {
118+
ASSERT_OK(WarmupTablespaceCache(conn, *local_table));
119+
}
120+
ASSERT_OK(WarmupTablespaceCache(conn, target_table));
117121
ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
118122
if (local_table) {
119123
ASSERT_OK(conn.ExecuteFormat(
@@ -178,6 +182,10 @@ class GeoTransactionsTest : public GeoTransactionsTestBase {
178182
ASSERT_OK(init_conn(conn));
179183
}
180184
ASSERT_OK(conn.ExecuteFormat("SET force_global_transaction = $0", ToString(session_var)));
185+
if (local_table) {
186+
ASSERT_OK(WarmupTablespaceCache(conn, *local_table));
187+
}
188+
ASSERT_OK(WarmupTablespaceCache(conn, target_table));
181189
for (size_t i = 0; i < num_aborts; ++i) {
182190
ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION));
183191
auto insert_value = NextInsertValue();
@@ -329,10 +337,7 @@ class GeoTransactionsTestTableLocksDisabled : public GeoTransactionsTest {
329337
}
330338
};
331339

332-
// Fails when table-level locks are enabled due to #28317.
333-
TEST_F_EX(
334-
GeoTransactionsTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionTabletSelection),
335-
GeoTransactionsTestTableLocksDisabled) {
340+
TEST_F(GeoTransactionsTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionTabletSelection)) {
336341
constexpr int tables_per_region = 1;
337342

338343
ANNOTATE_UNPROTECTED_WRITE(FLAGS_auto_create_local_transaction_tables) = false;
@@ -510,10 +515,7 @@ TEST_F(GeoTransactionsTest, YB_DISABLE_TEST_IN_TSAN(TestMultiRegionTransactionTa
510515
InsertToLocalFirst::kTrue, ExpectedLocality::kGlobal);
511516
}
512517

513-
// Fails when table-level locks are enabled due to #28317.
514-
TEST_F_EX(
515-
GeoTransactionsTest, YB_DISABLE_TEST_IN_TSAN(TestAutomaticLocalTransactionTableCreation),
516-
GeoTransactionsTestTableLocksDisabled) {
518+
TEST_F(GeoTransactionsTest, YB_DISABLE_TEST_IN_TSAN(TestAutomaticLocalTransactionTableCreation)) {
517519
constexpr int tables_per_region = 1;
518520

519521
ANNOTATE_UNPROTECTED_WRITE(FLAGS_auto_create_local_transaction_tables) = true;
@@ -1194,11 +1196,13 @@ TEST_F(GeoTransactionsTablespaceLocalityTest, TestSessionVariableOverride) {
11941196
SetGlobalTransactionsGFlag::kTrue, SetGlobalTransactionSessionVar::kTrue,
11951197
ExpectedLocality::kGlobal, force_tablespace_auto_select);
11961198

1197-
auto force_tablespace_bad_select = [](pgwrapper::PGConn& conn) -> Status {
1199+
auto force_tablespace_bad_select = [this](pgwrapper::PGConn& conn) -> Status {
11981200
RETURN_NOT_OK(conn.Execute("SET yb_force_tablespace_locality = true"));
11991201
// Normal user oids start at 16384, so this should not map to anything, and we can test
12001202
// that it becomes global.
1201-
return conn.Execute("SET yb_force_tablespace_locality_oid = 1");
1203+
RETURN_NOT_OK(conn.Execute("SET yb_force_tablespace_locality_oid = 1"));
1204+
// Discard current transaction.
1205+
return WarmupTablespaceCache(conn, kTableName);
12021206
};
12031207

12041208
CheckSuccess(
@@ -1375,8 +1379,7 @@ TEST_F(GeoTransactionsTablespaceLocalityTest, TestAlterSetTablespace) {
13751379
ExpectedLocality::kGlobal);
13761380
}
13771381

1378-
// Fails when table-level locks are enabled due to #28317.
1379-
class GeoTransactionsWildcardTest : public GeoTransactionsTestTableLocksDisabled {
1382+
class GeoTransactionsWildcardTest : public GeoTransactionsTest {
13801383
protected:
13811384
void SetupTablespaces() override {
13821385
ANNOTATE_UNPROTECTED_WRITE(FLAGS_force_global_transactions) = true;

0 commit comments

Comments
 (0)