Skip to content

Commit 7392ad1

Browse files
craig[bot]DrewKimball
andcommitted
Merge #144427
144427: sql,kv: set the admission control header for leaf transactions r=michae2 a=DrewKimball #### kv: set the admission control header for leaf transactions This commit propagates the admission control header from the root transaction to any leaf transactions it creates. This ensures that users of leaf transactions (e.g. streamer, DistSQL, FK checks) correctly respect the user's requested quality of service. Fixes #144421 Release note (bug fix): Fixed a bug that could cause queries that perform work in parallel to ignore the requested quality-of-service level. Affected operations include lookup joins, DistSQL execution, and foreign-key checks. #### sql: add session seting for leaf txn admission control behavior This commit adds a `propagate_admission_header_to_leaf_transactions` session setting to control whether leaf txns inherit the admission header from root txns. The setting is on by default. Informs #144421 Release note: None Co-authored-by: Drew Kimball <[email protected]>
2 parents 4a882bf + 2e0325c commit 7392ad1

File tree

22 files changed

+158
-16
lines changed

22 files changed

+158
-16
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,7 @@ func TestTxnMultipleCoord(t *testing.T) {
10651065
// New create a second, leaf coordinator.
10661066
leafInputState, err := txn.GetLeafTxnInputState(ctx)
10671067
require.NoError(t, err)
1068-
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
1068+
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
10691069

10701070
// Start the second transaction.
10711071
key2 := roachpb.Key("b")
@@ -2802,7 +2802,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
28022802
require.NoError(t, err)
28032803

28042804
// New create a second, leaf coordinator.
2805-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
2805+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
28062806

28072807
if _, err := leafTxn.Get(ctx, errKey); !testutils.IsError(err, "TransactionAbortedError") {
28082808
t.Fatalf("expected injected err, got: %v", err)
@@ -2832,7 +2832,7 @@ func TestUpdateRootWithLeafFinalStateInAbortedTxn(t *testing.T) {
28322832
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
28332833
leafInputState, err := txn.GetLeafTxnInputState(ctx)
28342834
require.NoError(t, err)
2835-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState)
2835+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState, nil /* header */)
28362836

28372837
finalState, err := leafTxn.GetLeafTxnFinalState(ctx)
28382838
if err != nil {
@@ -3038,7 +3038,7 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
30383038
rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
30393039
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
30403040
require.NoError(t, err)
3041-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
3041+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
30423042

30433043
// A LeafTxn is not compatible with locking requests.
30443044
// 1. Locking Get requests.

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1624,7 +1624,7 @@ func TestTxnBasicBufferedWrites(t *testing.T) {
16241624
}
16251625
for _, txnForRead := range []*kv.Txn{
16261626
txn,
1627-
kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, tis),
1627+
kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, tis, nil /* header */),
16281628
} {
16291629
if err = readFn(txnForRead); err != nil {
16301630
return err
@@ -2287,3 +2287,42 @@ func TestTxnBufferedWriteReadYourOwnWrites(t *testing.T) {
22872287
})
22882288
require.NoError(t, err)
22892289
}
2290+
2291+
// TestLeafTransactionAdmissionHeader tests that the admission control header is
2292+
// correctly set for a new leaf txn.
2293+
func TestLeafTransactionAdmissionHeader(t *testing.T) {
2294+
defer leaktest.AfterTest(t)()
2295+
defer log.Scope(t).Close(t)
2296+
2297+
s := createTestDB(t)
2298+
defer s.Stop()
2299+
priorityOptions := []admissionpb.WorkPriority{
2300+
admissionpb.LowPri,
2301+
admissionpb.BulkLowPri,
2302+
admissionpb.UserLowPri,
2303+
admissionpb.BulkNormalPri,
2304+
admissionpb.NormalPri,
2305+
admissionpb.LockingNormalPri,
2306+
admissionpb.UserHighPri,
2307+
admissionpb.LockingUserHighPri,
2308+
admissionpb.HighPri,
2309+
}
2310+
rnd, _ := randutil.NewTestRand()
2311+
priority := priorityOptions[rnd.Intn(len(priorityOptions))]
2312+
2313+
ctx := context.Background()
2314+
rootTxn := kv.NewTxnWithAdmissionControl(
2315+
ctx, s.DB, 0 /* gatewayNodeID */, kvpb.AdmissionHeader_FROM_SQL, priority)
2316+
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
2317+
require.NoError(t, err)
2318+
2319+
rootAdmissionHeader := rootTxn.AdmissionHeader()
2320+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, &rootAdmissionHeader)
2321+
leafHeader := leafTxn.AdmissionHeader()
2322+
expectedLeafHeader := kvpb.AdmissionHeader{
2323+
Priority: int32(priority),
2324+
CreateTime: rootAdmissionHeader.CreateTime,
2325+
Source: kvpb.AdmissionHeader_FROM_SQL,
2326+
}
2327+
require.Equal(t, expectedLeafHeader, leafHeader)
2328+
}

pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
8080
if err != nil {
8181
panic(err)
8282
}
83-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
83+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
8484
metrics := MakeMetrics()
8585
s := NewStreamer(
8686
s.DistSenderI().(*kvcoord.DistSender),

pkg/kv/kvclient/kvstreamer/streamer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func getStreamer(
5050
if err != nil {
5151
panic(err)
5252
}
53-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
53+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
5454
metrics := kvstreamer.MakeMetrics()
5555
return kvstreamer.NewStreamer(
5656
s.DistSenderI().(*kvcoord.DistSender),

pkg/kv/txn.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,11 @@ func NewTxnFromProto(
231231

232232
// NewLeafTxn instantiates a new leaf transaction.
233233
func NewLeafTxn(
234-
ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID, tis *roachpb.LeafTxnInputState,
234+
ctx context.Context,
235+
db *DB,
236+
gatewayNodeID roachpb.NodeID,
237+
tis *roachpb.LeafTxnInputState,
238+
header *kvpb.AdmissionHeader,
235239
) *Txn {
236240
if db == nil {
237241
panic(errors.WithContextTags(
@@ -246,6 +250,19 @@ func NewLeafTxn(
246250
txn.mu.ID = tis.Txn.ID
247251
txn.mu.userPriority = roachpb.NormalUserPriority
248252
txn.mu.sender = db.factory.LeafTransactionalSender(tis)
253+
if header != nil {
254+
if admissionpb.WorkPriority(header.Priority) != admissionpb.NormalPri {
255+
log.VEventf(ctx, 2,
256+
"initializing leaf txn admission control header with priority: %v",
257+
admissionpb.WorkPriority(header.Priority),
258+
)
259+
}
260+
txn.admissionHeader = kvpb.AdmissionHeader{
261+
CreateTime: header.CreateTime,
262+
Priority: header.Priority,
263+
Source: header.Source,
264+
}
265+
}
249266
return txn
250267
}
251268

pkg/kv/txn_external_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,7 @@ func TestUpdateRootWithLeafFinalStateReadsBelowRefreshTimestamp(t *testing.T) {
11001100
// Fork off a leaf transaction before the root is refreshed.
11011101
leafInputState, err := txn.GetLeafTxnInputState(ctx)
11021102
require.NoError(t, err)
1103-
leafTxn := kv.NewLeafTxn(ctx, db, 0, leafInputState)
1103+
leafTxn := kv.NewLeafTxn(ctx, db, 0, leafInputState, nil /* header */)
11041104

11051105
writeTS, err := performConflictingWrite(ctx, keyB)
11061106
require.NoError(t, err)

pkg/sql/colflow/colbatch_scan_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestColBatchScanMeta(t *testing.T) {
6565
if err != nil {
6666
t.Fatal(err)
6767
}
68-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
68+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
6969
flowCtx := execinfra.FlowCtx{
7070
EvalCtx: &evalCtx,
7171
Mon: evalCtx.TestingMon,

pkg/sql/distsql/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ func (ds *ServerImpl) setupFlow(
260260
}
261261
// The flow will run in a LeafTxn because we do not want each distributed
262262
// Txn to heartbeat the transaction.
263-
return kv.NewLeafTxn(ctx, ds.DB.KV(), roachpb.NodeID(req.Flow.Gateway), tis), nil
263+
nodeID := roachpb.NodeID(req.Flow.Gateway)
264+
return kv.NewLeafTxn(ctx, ds.DB.KV(), nodeID, tis, &req.LeafTxnAdmissionHeader), nil
264265
}
265266

266267
var evalCtx *eval.Context

pkg/sql/distsql_running.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,11 @@ func (dsp *DistSQLPlanner) setupFlows(
470470
setupReq.JobTag = jobTag.ValueStr()
471471
}
472472
}
473+
if evalCtx.SessionData().PropagateAdmissionHeaderToLeafTransactions && localState.Txn != nil {
474+
// Propagate the admission control header so that leaf transactions
475+
// correctly inherit it.
476+
setupReq.LeafTxnAdmissionHeader = localState.Txn.AdmissionHeader()
477+
}
473478

474479
var isVectorized bool
475480
if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {

pkg/sql/exec_util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4178,6 +4178,10 @@ func (m *sessionDataMutator) SetVectorSearchBeamSize(val int32) {
41784178
m.data.VectorSearchBeamSize = val
41794179
}
41804180

4181+
func (m *sessionDataMutator) SetPropagateAdmissionHeaderToLeafTransactions(val bool) {
4182+
m.data.PropagateAdmissionHeaderToLeafTransactions = val
4183+
}
4184+
41814185
// Utility functions related to scrubbing sensitive information on SQL Stats.
41824186

41834187
// quantizeCounts ensures that the Count field in the

0 commit comments

Comments
 (0)