Skip to content

Commit c7159d5

Browse files
committed
kv: set the admission control header for leaf transactions
This commit propagates the admission control header from the root transaction to any leaf transactions it creates. This ensures that users of leaf transactions (e.g. streamer, DistSQL, FK checks) correctly respect the user's requested quality of service. Fixes #144421 Release note (bug fix): Fixed a bug that could cause queries that perform work in parallel to ignore the requested quality-of-service level. Affected operations include lookup joins, DistSQL execution, and foreign-key checks.
1 parent 4d33777 commit c7159d5

File tree

16 files changed

+109
-16
lines changed

16 files changed

+109
-16
lines changed

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1065,7 +1065,7 @@ func TestTxnMultipleCoord(t *testing.T) {
10651065
// New create a second, leaf coordinator.
10661066
leafInputState, err := txn.GetLeafTxnInputState(ctx)
10671067
require.NoError(t, err)
1068-
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
1068+
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
10691069

10701070
// Start the second transaction.
10711071
key2 := roachpb.Key("b")
@@ -2802,7 +2802,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
28022802
require.NoError(t, err)
28032803

28042804
// New create a second, leaf coordinator.
2805-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
2805+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
28062806

28072807
if _, err := leafTxn.Get(ctx, errKey); !testutils.IsError(err, "TransactionAbortedError") {
28082808
t.Fatalf("expected injected err, got: %v", err)
@@ -2832,7 +2832,7 @@ func TestUpdateRootWithLeafFinalStateInAbortedTxn(t *testing.T) {
28322832
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
28332833
leafInputState, err := txn.GetLeafTxnInputState(ctx)
28342834
require.NoError(t, err)
2835-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState)
2835+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState, nil /* header */)
28362836

28372837
finalState, err := leafTxn.GetLeafTxnFinalState(ctx)
28382838
if err != nil {
@@ -3038,7 +3038,7 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
30383038
rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
30393039
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
30403040
require.NoError(t, err)
3041-
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)
3041+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
30423042

30433043
// A LeafTxn is not compatible with locking requests.
30443044
// 1. Locking Get requests.

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1624,7 +1624,7 @@ func TestTxnBasicBufferedWrites(t *testing.T) {
16241624
}
16251625
for _, txnForRead := range []*kv.Txn{
16261626
txn,
1627-
kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, tis),
1627+
kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, tis, nil /* header */),
16281628
} {
16291629
if err = readFn(txnForRead); err != nil {
16301630
return err
@@ -2287,3 +2287,42 @@ func TestTxnBufferedWriteReadYourOwnWrites(t *testing.T) {
22872287
})
22882288
require.NoError(t, err)
22892289
}
2290+
2291+
// TestLeafTransactionAdmissionHeader tests that the admission control header is
2292+
// correctly set for a new leaf txn.
2293+
func TestLeafTransactionAdmissionHeader(t *testing.T) {
2294+
defer leaktest.AfterTest(t)()
2295+
defer log.Scope(t).Close(t)
2296+
2297+
s := createTestDB(t)
2298+
defer s.Stop()
2299+
priorityOptions := []admissionpb.WorkPriority{
2300+
admissionpb.LowPri,
2301+
admissionpb.BulkLowPri,
2302+
admissionpb.UserLowPri,
2303+
admissionpb.BulkNormalPri,
2304+
admissionpb.NormalPri,
2305+
admissionpb.LockingNormalPri,
2306+
admissionpb.UserHighPri,
2307+
admissionpb.LockingUserHighPri,
2308+
admissionpb.HighPri,
2309+
}
2310+
rnd, _ := randutil.NewTestRand()
2311+
priority := priorityOptions[rnd.Intn(len(priorityOptions))]
2312+
2313+
ctx := context.Background()
2314+
rootTxn := kv.NewTxnWithAdmissionControl(
2315+
ctx, s.DB, 0 /* gatewayNodeID */, kvpb.AdmissionHeader_FROM_SQL, priority)
2316+
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
2317+
require.NoError(t, err)
2318+
2319+
rootAdmissionHeader := rootTxn.AdmissionHeader()
2320+
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, &rootAdmissionHeader)
2321+
leafHeader := leafTxn.AdmissionHeader()
2322+
expectedLeafHeader := kvpb.AdmissionHeader{
2323+
Priority: int32(priority),
2324+
CreateTime: rootAdmissionHeader.CreateTime,
2325+
Source: kvpb.AdmissionHeader_FROM_SQL,
2326+
}
2327+
require.Equal(t, expectedLeafHeader, leafHeader)
2328+
}

pkg/kv/kvclient/kvstreamer/streamer_accounting_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func TestStreamerMemoryAccounting(t *testing.T) {
8080
if err != nil {
8181
panic(err)
8282
}
83-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
83+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
8484
metrics := MakeMetrics()
8585
s := NewStreamer(
8686
s.DistSenderI().(*kvcoord.DistSender),

pkg/kv/kvclient/kvstreamer/streamer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func getStreamer(
5050
if err != nil {
5151
panic(err)
5252
}
53-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
53+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
5454
metrics := kvstreamer.MakeMetrics()
5555
return kvstreamer.NewStreamer(
5656
s.DistSenderI().(*kvcoord.DistSender),

pkg/kv/txn.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,11 @@ func NewTxnFromProto(
231231

232232
// NewLeafTxn instantiates a new leaf transaction.
233233
func NewLeafTxn(
234-
ctx context.Context, db *DB, gatewayNodeID roachpb.NodeID, tis *roachpb.LeafTxnInputState,
234+
ctx context.Context,
235+
db *DB,
236+
gatewayNodeID roachpb.NodeID,
237+
tis *roachpb.LeafTxnInputState,
238+
header *kvpb.AdmissionHeader,
235239
) *Txn {
236240
if db == nil {
237241
panic(errors.WithContextTags(
@@ -246,6 +250,19 @@ func NewLeafTxn(
246250
txn.mu.ID = tis.Txn.ID
247251
txn.mu.userPriority = roachpb.NormalUserPriority
248252
txn.mu.sender = db.factory.LeafTransactionalSender(tis)
253+
if header != nil {
254+
if admissionpb.WorkPriority(header.Priority) != admissionpb.NormalPri {
255+
log.VEventf(ctx, 2,
256+
"initializing leaf txn admission control header with priority: %v",
257+
admissionpb.WorkPriority(header.Priority),
258+
)
259+
}
260+
txn.admissionHeader = kvpb.AdmissionHeader{
261+
CreateTime: header.CreateTime,
262+
Priority: header.Priority,
263+
Source: header.Source,
264+
}
265+
}
249266
return txn
250267
}
251268

pkg/kv/txn_external_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1100,7 +1100,7 @@ func TestUpdateRootWithLeafFinalStateReadsBelowRefreshTimestamp(t *testing.T) {
11001100
// Fork off a leaf transaction before the root is refreshed.
11011101
leafInputState, err := txn.GetLeafTxnInputState(ctx)
11021102
require.NoError(t, err)
1103-
leafTxn := kv.NewLeafTxn(ctx, db, 0, leafInputState)
1103+
leafTxn := kv.NewLeafTxn(ctx, db, 0, leafInputState, nil /* header */)
11041104

11051105
writeTS, err := performConflictingWrite(ctx, keyB)
11061106
require.NoError(t, err)

pkg/sql/colflow/colbatch_scan_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestColBatchScanMeta(t *testing.T) {
6565
if err != nil {
6666
t.Fatal(err)
6767
}
68-
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState)
68+
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.DistSQLPlanningNodeID(), leafInputState, nil /* header */)
6969
flowCtx := execinfra.FlowCtx{
7070
EvalCtx: &evalCtx,
7171
Mon: evalCtx.TestingMon,

pkg/sql/distsql/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ func (ds *ServerImpl) setupFlow(
260260
}
261261
// The flow will run in a LeafTxn because we do not want each distributed
262262
// Txn to heartbeat the transaction.
263-
return kv.NewLeafTxn(ctx, ds.DB.KV(), roachpb.NodeID(req.Flow.Gateway), tis), nil
263+
nodeID := roachpb.NodeID(req.Flow.Gateway)
264+
return kv.NewLeafTxn(ctx, ds.DB.KV(), nodeID, tis, &req.LeafTxnAdmissionHeader), nil
264265
}
265266

266267
var evalCtx *eval.Context

pkg/sql/distsql_running.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,11 @@ func (dsp *DistSQLPlanner) setupFlows(
470470
setupReq.JobTag = jobTag.ValueStr()
471471
}
472472
}
473+
if localState.Txn != nil {
474+
// Propagate the admission control header so that leaf transactions
475+
// correctly inherit it.
476+
setupReq.LeafTxnAdmissionHeader = localState.Txn.AdmissionHeader()
477+
}
473478

474479
var isVectorized bool
475480
if vectorizeMode := evalCtx.SessionData().VectorizeMode; vectorizeMode != sessiondatapb.VectorizeOff {

pkg/sql/execinfrapb/api.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ option go_package = "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb";
1515
import "gogoproto/gogo.proto";
1616
import "google/protobuf/timestamp.proto";
1717

18+
import "kv/kvpb/api.proto";
1819
import "roachpb/data.proto";
1920
import "sql/execinfrapb/data.proto";
2021
import "sql/execinfrapb/processors.proto";
@@ -35,6 +36,10 @@ message SetupFlowRequest {
3536
// flows expect to run in a txn, but some, like backfills, don't.
3637
optional roachpb.LeafTxnInputState leaf_txn_input_state = 7;
3738

39+
// LeafTxnAdmissionHeader is used to initialize the admission control
40+
// header for the flow's txn, if LeafTxnInputState is set.
41+
optional roachpb.AdmissionHeader leaf_txn_admission_header = 12 [(gogoproto.nullable) = false];
42+
3843
// Version of distsqlrun protocol; a server accepts a certain range of
3944
// versions, up to its own version. See server.go for more details.
4045
optional uint32 version = 5 [(gogoproto.nullable) = false,

0 commit comments

Comments
 (0)