Skip to content

Commit 07191f9

Browse files
committed
sql,kv: reduce sets of writes included into LeafTxnInputState
This commit reduces the InFlightWrites and BufferedWrites sets that are included in LeafTxnInputState to only those that overlap with the key spans that will be read by the DistSQL processors. We now construct an interval tree that contains all spans read by TableReaders and TableID / IndexID specific key spans read by JoinReaders, InvertedJoiners, and ZigzagJoiners, and all non-overlapping writes are not included into the proto. We skip constructing the reads tree if the txn hasn't performed any writes (avoiding redundant work). We also introduce a session variable that disables this optimization as an escape hatch. All processors have been audited to fall into one of three categories: - if a core might need to have access to the full write sets of the leaf txn, it goes into the "unsafe" category; - if a core is not on the "hot" query path (i.e. part of bulk operations, etc), it goes into the "unoptimized" category; - all other processors are examined more thoroughly in terms of what operations the concrete spec might perform, and it's then decided between "unsafe" and "safe". Let's focus on the last category a bit more. There are two main ways these processors worth optimizing can use the txn: - explicitly, when fetching the data from disk (4 processor cores mentioned above). (Note that vector search processors also use the txn explicitly, but we currently put them into the first category since we don't distribute plans with them.) - implicitly, when evaluating some expression. All processor cores and post-process specs that store `execinfrapb.Expression` are now checked for being "unsafe". Defining what expression is "unsafe" is a bit hand-wavy: if an expression might use the txn somehow (e.g. it's a builtin function call), then we deem it "unsafe". LocalPlanNode gets special treatment too since an arbitrary planNode might use the txn. This commit doesn't perform audit of those, so we mark almost all cases as "unsafe". Release note: None
1 parent 7898c83 commit 07191f9

40 files changed

+879
-48
lines changed

pkg/kv/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ go_library(
3232
"//pkg/util/duration",
3333
"//pkg/util/errorutil/unimplemented",
3434
"//pkg/util/hlc",
35+
"//pkg/util/interval",
3536
"//pkg/util/log",
3637
"//pkg/util/protoutil",
3738
"//pkg/util/retry",

pkg/kv/kvclient/kvcoord/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ go_library(
7878
"//pkg/util/grpcutil",
7979
"//pkg/util/hlc",
8080
"//pkg/util/humanizeutil",
81+
"//pkg/util/interval",
8182
"//pkg/util/iterutil",
8283
"//pkg/util/limit",
8384
"//pkg/util/log",

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/cockroachdb/cockroach/pkg/util/debugutil"
2020
"github.com/cockroachdb/cockroach/pkg/util/envutil"
2121
"github.com/cockroachdb/cockroach/pkg/util/hlc"
22+
"github.com/cockroachdb/cockroach/pkg/util/interval"
2223
"github.com/cockroachdb/cockroach/pkg/util/log"
2324
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2425
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
@@ -191,7 +192,12 @@ type txnInterceptor interface {
191192

192193
// populateLeafInputState populates the given input payload
193194
// for a LeafTxn.
194-
populateLeafInputState(*roachpb.LeafTxnInputState)
195+
//
196+
// readsTree, when non-nil, specifies an interval tree of key spans that
197+
// will be read by the caller. As such, any non-overlapping writes could be
198+
// ignored when populating the LeafTxnInputState. If readsTree is nil, then
199+
// all writes should be included.
200+
populateLeafInputState(*roachpb.LeafTxnInputState, interval.Tree)
195201

196202
// initializeLeaf updates any internal state held inside the interceptor
197203
// from the given LeafTxn input state.
@@ -1407,7 +1413,7 @@ func (tc *TxnCoordSender) Active() bool {
14071413

14081414
// GetLeafTxnInputState is part of the kv.TxnSender interface.
14091415
func (tc *TxnCoordSender) GetLeafTxnInputState(
1410-
ctx context.Context,
1416+
ctx context.Context, readsTree interval.Tree,
14111417
) (*roachpb.LeafTxnInputState, error) {
14121418
tis := new(roachpb.LeafTxnInputState)
14131419
tc.mu.Lock()
@@ -1421,7 +1427,7 @@ func (tc *TxnCoordSender) GetLeafTxnInputState(
14211427
// Copy mutable state so access is safe for the caller.
14221428
tis.Txn = tc.mu.txn
14231429
for _, reqInt := range tc.interceptorStack {
1424-
reqInt.populateLeafInputState(tis)
1430+
reqInt.populateLeafInputState(tis, readsTree)
14251431
}
14261432

14271433
// Also mark the TxnCoordSender as "active". This prevents changing
@@ -1693,6 +1699,7 @@ func (tc *TxnCoordSender) TestingShouldRetry() bool {
16931699
return false
16941700
}
16951701

1702+
// TODO(148760): this doesn't work under Read Committed isolation.
16961703
func (tc *TxnCoordSender) hasPerformedReadsLocked() bool {
16971704
return !tc.interceptorAlloc.txnSpanRefresher.refreshFootprint.empty()
16981705
}

pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,7 +1004,7 @@ func TestTxnMultipleCoord(t *testing.T) {
10041004
}
10051005

10061006
// New create a second, leaf coordinator.
1007-
leafInputState, err := txn.GetLeafTxnInputState(ctx)
1007+
leafInputState, err := txn.GetLeafTxnInputState(ctx, nil /* readsTree */)
10081008
require.NoError(t, err)
10091009
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
10101010

@@ -2739,7 +2739,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
27392739

27402740
ctx := context.Background()
27412741
rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2742-
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
2742+
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx, nil /* readsTree */)
27432743
require.NoError(t, err)
27442744

27452745
// New create a second, leaf coordinator.
@@ -2771,7 +2771,7 @@ func TestUpdateRootWithLeafFinalStateInAbortedTxn(t *testing.T) {
27712771
ctx := context.Background()
27722772

27732773
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2774-
leafInputState, err := txn.GetLeafTxnInputState(ctx)
2774+
leafInputState, err := txn.GetLeafTxnInputState(ctx, nil /* readsTree */)
27752775
require.NoError(t, err)
27762776
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState, nil /* header */)
27772777

@@ -2785,7 +2785,7 @@ func TestUpdateRootWithLeafFinalStateInAbortedTxn(t *testing.T) {
27852785
}
27862786

27872787
// Check that the transaction was not updated.
2788-
leafInputState2, err := txn.GetLeafTxnInputState(ctx)
2788+
leafInputState2, err := txn.GetLeafTxnInputState(ctx, nil /* readsTree */)
27892789
require.NoError(t, err)
27902790
if leafInputState2.Txn.Status != roachpb.PENDING {
27912791
t.Fatalf("expected PENDING txn, got: %s", leafInputState2.Txn.Status)
@@ -2977,7 +2977,7 @@ func TestTxnTypeCompatibleWithBatchRequest(t *testing.T) {
29772977
defer s.Stop()
29782978

29792979
rootTxn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2980-
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx)
2980+
leafInputState, err := rootTxn.GetLeafTxnInputState(ctx, nil /* readsTree */)
29812981
require.NoError(t, err)
29822982
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState, nil /* header */)
29832983

pkg/kv/kvclient/kvcoord/txn_interceptor_committer.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1818
"github.com/cockroachdb/cockroach/pkg/util"
19+
"github.com/cockroachdb/cockroach/pkg/util/interval"
1920
"github.com/cockroachdb/cockroach/pkg/util/log"
2021
"github.com/cockroachdb/cockroach/pkg/util/stop"
2122
"github.com/cockroachdb/errors"
@@ -611,7 +612,7 @@ func (tc *txnCommitter) maybeDisable1PC(ba *kvpb.BatchRequest) {
611612
func (tc *txnCommitter) setWrapped(wrapped lockedSender) { tc.wrapped = wrapped }
612613

613614
// populateLeafInputState is part of the txnInterceptor interface.
614-
func (*txnCommitter) populateLeafInputState(*roachpb.LeafTxnInputState) {}
615+
func (*txnCommitter) populateLeafInputState(*roachpb.LeafTxnInputState, interval.Tree) {}
615616

616617
// initializeLeaf is part of the txnInterceptor interface.
617618
func (*txnCommitter) initializeLeaf(tis *roachpb.LeafTxnInputState) {}

pkg/kv/kvclient/kvcoord/txn_interceptor_heartbeater.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/cockroachdb/cockroach/pkg/settings"
1818
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1919
"github.com/cockroachdb/cockroach/pkg/util/hlc"
20+
"github.com/cockroachdb/cockroach/pkg/util/interval"
2021
"github.com/cockroachdb/cockroach/pkg/util/log"
2122
"github.com/cockroachdb/cockroach/pkg/util/metamorphic"
2223
"github.com/cockroachdb/cockroach/pkg/util/randutil"
@@ -283,7 +284,7 @@ func (h *txnHeartbeater) setWrapped(wrapped lockedSender) {
283284
}
284285

285286
// populateLeafInputState is part of the txnInterceptor interface.
286-
func (*txnHeartbeater) populateLeafInputState(*roachpb.LeafTxnInputState) {}
287+
func (*txnHeartbeater) populateLeafInputState(*roachpb.LeafTxnInputState, interval.Tree) {}
287288

288289
// initializeLeaf is part of the txnInterceptor interface.
289290
func (*txnHeartbeater) initializeLeaf(tis *roachpb.LeafTxnInputState) {}

pkg/kv/kvclient/kvcoord/txn_interceptor_metric_recorder.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1313
"github.com/cockroachdb/cockroach/pkg/roachpb"
14+
"github.com/cockroachdb/cockroach/pkg/util/interval"
1415
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
1516
)
1617

@@ -59,7 +60,7 @@ func (m *txnMetricRecorder) SendLocked(
5960
func (m *txnMetricRecorder) setWrapped(wrapped lockedSender) { m.wrapped = wrapped }
6061

6162
// populateLeafInputState is part of the txnInterceptor interface.
62-
func (*txnMetricRecorder) populateLeafInputState(*roachpb.LeafTxnInputState) {}
63+
func (*txnMetricRecorder) populateLeafInputState(*roachpb.LeafTxnInputState, interval.Tree) {}
6364

6465
// initializeLeaf is part of the txnInterceptor interface.
6566
func (*txnMetricRecorder) initializeLeaf(tis *roachpb.LeafTxnInputState) {}

pkg/kv/kvclient/kvcoord/txn_interceptor_pipeliner.go

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
1919
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
2020
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
21+
"github.com/cockroachdb/cockroach/pkg/util/interval"
2122
"github.com/cockroachdb/cockroach/pkg/util/log"
2223
"github.com/cockroachdb/errors"
2324
"github.com/cockroachdb/redact"
@@ -947,8 +948,26 @@ func (tp *txnPipeliner) setWrapped(wrapped lockedSender) {
947948
}
948949

949950
// populateLeafInputState is part of the txnInterceptor interface.
950-
func (tp *txnPipeliner) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
951-
tis.InFlightWrites = tp.ifWrites.asSlice()
951+
func (tp *txnPipeliner) populateLeafInputState(
952+
tis *roachpb.LeafTxnInputState, readsTree interval.Tree,
953+
) {
954+
if tp.ifWrites.len() == 0 {
955+
return
956+
}
957+
if readsTree == nil {
958+
tis.InFlightWrites = tp.ifWrites.asSlice()
959+
return
960+
}
961+
var sp roachpb.Span
962+
tp.ifWrites.ascend(func(w *inFlightWrite) {
963+
sp.Key = w.Key
964+
sp.EndKey = w.Key.Next()
965+
if overlaps := readsTree.DoMatching(
966+
func(interval.Interface) (done bool) { return true }, sp.AsRange(),
967+
); overlaps {
968+
tis.InFlightWrites = append(tis.InFlightWrites, w.SequencedWrite)
969+
}
970+
})
952971
}
953972

954973
// initializeLeaf is part of the txnInterceptor interface.

pkg/kv/kvclient/kvcoord/txn_interceptor_seq_num_allocator.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1313
"github.com/cockroachdb/cockroach/pkg/roachpb"
1414
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
15+
"github.com/cockroachdb/cockroach/pkg/util/interval"
1516
"github.com/cockroachdb/errors"
1617
)
1718

@@ -116,7 +117,9 @@ func (s *txnSeqNumAllocator) SendLocked(
116117
func (s *txnSeqNumAllocator) setWrapped(wrapped lockedSender) { s.wrapped = wrapped }
117118

118119
// populateLeafInputState is part of the txnInterceptor interface.
119-
func (s *txnSeqNumAllocator) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
120+
func (s *txnSeqNumAllocator) populateLeafInputState(
121+
tis *roachpb.LeafTxnInputState, tree interval.Tree,
122+
) {
120123
tis.Txn.Sequence = s.writeSeq
121124
tis.SteppingModeEnabled = bool(s.steppingMode)
122125
tis.ReadSeqNum = s.readSeq

pkg/kv/kvclient/kvcoord/txn_interceptor_span_refresher.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
1616
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
1717
"github.com/cockroachdb/cockroach/pkg/util/hlc"
18+
"github.com/cockroachdb/cockroach/pkg/util/interval"
1819
"github.com/cockroachdb/cockroach/pkg/util/log"
1920
"github.com/cockroachdb/errors"
2021
"github.com/cockroachdb/redact"
@@ -712,7 +713,9 @@ func (sr *txnSpanRefresher) maxRefreshAttempts() int {
712713
func (sr *txnSpanRefresher) setWrapped(wrapped lockedSender) { sr.wrapped = wrapped }
713714

714715
// populateLeafInputState is part of the txnInterceptor interface.
715-
func (sr *txnSpanRefresher) populateLeafInputState(tis *roachpb.LeafTxnInputState) {
716+
func (sr *txnSpanRefresher) populateLeafInputState(
717+
tis *roachpb.LeafTxnInputState, tree interval.Tree,
718+
) {
716719
tis.RefreshInvalid = sr.refreshInvalid
717720
}
718721

0 commit comments

Comments
 (0)