|
| 1 | +// Copyright 2025 The Cockroach Authors. |
| 2 | +// |
| 3 | +// Use of this software is governed by the CockroachDB Software License |
| 4 | +// included in the /LICENSE file. |
| 5 | + |
| 6 | +package kvcoord |
| 7 | + |
| 8 | +import ( |
| 9 | + "context" |
| 10 | + "testing" |
| 11 | + |
| 12 | + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" |
| 13 | + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" |
| 14 | + "github.com/cockroachdb/cockroach/pkg/roachpb" |
| 15 | + "github.com/cockroachdb/cockroach/pkg/storage/enginepb" |
| 16 | + "github.com/cockroachdb/cockroach/pkg/util/interval" |
| 17 | + "github.com/cockroachdb/cockroach/pkg/util/leaktest" |
| 18 | + "github.com/cockroachdb/cockroach/pkg/util/log" |
| 19 | + "github.com/stretchr/testify/require" |
| 20 | +) |
| 21 | + |
| 22 | +// Test that both the txnPipeliner and txnWriteBuffer correctly populate |
| 23 | +// roachpb.LeafTxnInputState with and without the readsTree that narrows down |
| 24 | +// the relevant set of spans. This test covers both interceptor implementations |
| 25 | +// with the same test scenarios. |
| 26 | +func TestTxnInterceptorsPopulateLeafInputState(t *testing.T) { |
| 27 | + defer leaktest.AfterTest(t)() |
| 28 | + defer log.Scope(t).Close(t) |
| 29 | + |
| 30 | + // Set up test keys with gaps to simulate real scenarios. |
| 31 | + keyA := roachpb.Key("table/1/primary/010") |
| 32 | + keyB := roachpb.Key("table/1/primary/025") |
| 33 | + keyC := roachpb.Key("table/1/secondary/030") |
| 34 | + keyD := roachpb.Key("table/2/primary/005") |
| 35 | + keyE := roachpb.Key("table/2/primary/050") |
| 36 | + keyF := roachpb.Key("table/3/primary/100") |
| 37 | + |
| 38 | + // Common test cases for both interceptors. |
| 39 | + testCases := []struct { |
| 40 | + name string |
| 41 | + readsTree interval.Tree |
| 42 | + expectedWrites []roachpb.Key |
| 43 | + }{ |
| 44 | + { |
| 45 | + name: "nil readsTree includes all writes", |
| 46 | + readsTree: nil, |
| 47 | + expectedWrites: []roachpb.Key{keyA, keyB, keyC, keyD, keyE, keyF}, |
| 48 | + }, |
| 49 | + { |
| 50 | + name: "empty readsTree includes no writes", |
| 51 | + readsTree: interval.NewTree(interval.ExclusiveOverlapper), |
| 52 | + expectedWrites: []roachpb.Key{}, |
| 53 | + }, |
| 54 | + { |
| 55 | + name: "table 1 primary reads include table 1 primary writes", |
| 56 | + readsTree: func() interval.Tree { |
| 57 | + tree := interval.NewTree(interval.ExclusiveOverlapper) |
| 58 | + // Read span covering table 1 primary rows 015-030 (includes keyB but |
| 59 | + // not keyA). |
| 60 | + span := roachpb.Span{ |
| 61 | + Key: roachpb.Key("table/1/primary/015"), |
| 62 | + EndKey: roachpb.Key("table/1/primary/030"), |
| 63 | + } |
| 64 | + err := tree.Insert(intervalSpan(span), false) |
| 65 | + require.NoError(t, err) |
| 66 | + return tree |
| 67 | + }(), |
| 68 | + // Only keyB (row 025) is in range 015-030. |
| 69 | + expectedWrites: []roachpb.Key{keyB}, |
| 70 | + }, |
| 71 | + { |
| 72 | + name: "multiple table reads include writes from different tables", |
| 73 | + readsTree: func() interval.Tree { |
| 74 | + tree := interval.NewTree(interval.ExclusiveOverlapper) |
| 75 | + // Read span for table 1 secondary index. |
| 76 | + span1 := roachpb.Span{ |
| 77 | + Key: roachpb.Key("table/1/secondary/"), |
| 78 | + EndKey: roachpb.Key("table/1/secondary/zz"), |
| 79 | + } |
| 80 | + err := tree.Insert(intervalSpan(span1), false) |
| 81 | + require.NoError(t, err) |
| 82 | + // Read span for table 2 primary keys 040-060. |
| 83 | + span2 := roachpb.Span{ |
| 84 | + Key: roachpb.Key("table/2/primary/040"), |
| 85 | + EndKey: roachpb.Key("table/2/primary/060"), |
| 86 | + } |
| 87 | + err = tree.Insert(intervalSpan(span2), false) |
| 88 | + require.NoError(t, err) |
| 89 | + return tree |
| 90 | + }(), |
| 91 | + // Expect keyC (secondary index) and keyE (row 50). |
| 92 | + expectedWrites: []roachpb.Key{keyC, keyE}, |
| 93 | + }, |
| 94 | + { |
| 95 | + name: "read different table entirely", |
| 96 | + readsTree: func() interval.Tree { |
| 97 | + tree := interval.NewTree(interval.ExclusiveOverlapper) |
| 98 | + // Read span for table 4 (which has no writes). |
| 99 | + span := roachpb.Span{ |
| 100 | + Key: roachpb.Key("table/4/"), |
| 101 | + EndKey: roachpb.Key("table/4/zz"), |
| 102 | + } |
| 103 | + err := tree.Insert(intervalSpan(span), false) |
| 104 | + require.NoError(t, err) |
| 105 | + return tree |
| 106 | + }(), |
| 107 | + // No overlapping writes. |
| 108 | + expectedWrites: []roachpb.Key{}, |
| 109 | + }, |
| 110 | + { |
| 111 | + name: "precise single key read", |
| 112 | + readsTree: func() interval.Tree { |
| 113 | + tree := interval.NewTree(interval.ExclusiveOverlapper) |
| 114 | + // Exact point read of keyD. |
| 115 | + span := roachpb.Span{Key: keyD, EndKey: keyD.Next()} |
| 116 | + err := tree.Insert(intervalSpan(span), false) |
| 117 | + require.NoError(t, err) |
| 118 | + return tree |
| 119 | + }(), |
| 120 | + // Only the exact key. |
| 121 | + expectedWrites: []roachpb.Key{keyD}, |
| 122 | + }, |
| 123 | + } |
| 124 | + |
| 125 | + // Set up both interceptors with the same write sets. |
| 126 | + ctx := context.Background() |
| 127 | + tp, _ := makeMockTxnPipeliner(nil /* iter */) |
| 128 | + twb, _, _ := makeMockTxnWriteBuffer(ctx) |
| 129 | + |
| 130 | + // Set up in-flight writes. |
| 131 | + tp.ifWrites.insert(keyA, 3, lock.Intent) |
| 132 | + tp.ifWrites.insert(keyB, 7, lock.Intent) |
| 133 | + tp.ifWrites.insert(keyC, 8, lock.Intent) |
| 134 | + tp.ifWrites.insert(keyD, 12, lock.Intent) |
| 135 | + tp.ifWrites.insert(keyE, 15, lock.Intent) |
| 136 | + tp.ifWrites.insert(keyF, 20, lock.Intent) |
| 137 | + |
| 138 | + // Set up buffered writes, including multiple values for keyB. |
| 139 | + addBufferedWriteForTest(t, &twb, keyA, roachpb.Value{RawBytes: []byte("valueA1")}, 3) |
| 140 | + addBufferedWriteForTest(t, &twb, keyB, roachpb.Value{RawBytes: []byte("valueB1")}, 7) |
| 141 | + addBufferedWriteForTest(t, &twb, keyB, roachpb.Value{RawBytes: []byte("valueB2")}, 8) |
| 142 | + addBufferedWriteForTest(t, &twb, keyC, roachpb.Value{RawBytes: []byte("valueC1")}, 8) |
| 143 | + addBufferedWriteForTest(t, &twb, keyD, roachpb.Value{RawBytes: []byte("valueD1")}, 12) |
| 144 | + addBufferedWriteForTest(t, &twb, keyE, roachpb.Value{RawBytes: []byte("valueE1")}, 15) |
| 145 | + addBufferedWriteForTest(t, &twb, keyF, roachpb.Value{RawBytes: []byte("valueF1")}, 20) |
| 146 | + |
| 147 | + require.Equal(t, 6, tp.ifWrites.len()) |
| 148 | + require.Equal(t, 6, twb.buffer.Len()) |
| 149 | + |
| 150 | + // Test both interceptors with the same test cases. |
| 151 | + for _, tc := range testCases { |
| 152 | + t.Run(tc.name, func(t *testing.T) { |
| 153 | + leafState := &roachpb.LeafTxnInputState{} |
| 154 | + tp.populateLeafInputState(leafState, tc.readsTree) |
| 155 | + twb.populateLeafInputState(leafState, tc.readsTree) |
| 156 | + |
| 157 | + require.Equal(t, len(tc.expectedWrites), len(leafState.InFlightWrites)) |
| 158 | + for i := range tc.expectedWrites { |
| 159 | + require.Equal(t, tc.expectedWrites[i], leafState.InFlightWrites[i].Key) |
| 160 | + require.Equal(t, tc.expectedWrites[i], leafState.BufferedWrites[i].Key) |
| 161 | + require.Greater(t, len(leafState.BufferedWrites[i].Vals), 0) |
| 162 | + |
| 163 | + // For keyB which has multiple writes, verify we get both values. |
| 164 | + if tc.expectedWrites[i].Equal(keyB) && len(tc.expectedWrites) > 0 { |
| 165 | + require.Equal(t, 2, len(leafState.BufferedWrites[i].Vals)) |
| 166 | + } |
| 167 | + } |
| 168 | + }) |
| 169 | + } |
| 170 | + |
| 171 | + // Test edge case: empty writes. |
| 172 | + t.Run("no writes", func(t *testing.T) { |
| 173 | + tpEmpty, _ := makeMockTxnPipeliner(nil /* iter */) |
| 174 | + twbEmpty, _, _ := makeMockTxnWriteBuffer(ctx) |
| 175 | + require.Equal(t, 0, tpEmpty.ifWrites.len()) |
| 176 | + require.Equal(t, 0, twbEmpty.buffer.Len()) |
| 177 | + leafState := &roachpb.LeafTxnInputState{} |
| 178 | + |
| 179 | + // Test with nil readsTree. |
| 180 | + tpEmpty.populateLeafInputState(leafState, nil) |
| 181 | + twbEmpty.populateLeafInputState(leafState, nil) |
| 182 | + require.Empty(t, leafState.InFlightWrites) |
| 183 | + require.Empty(t, leafState.BufferedWrites) |
| 184 | + |
| 185 | + // Test with empty reads tree. |
| 186 | + tree := interval.NewTree(interval.ExclusiveOverlapper) |
| 187 | + tpEmpty.populateLeafInputState(leafState, tree) |
| 188 | + twbEmpty.populateLeafInputState(leafState, tree) |
| 189 | + require.Empty(t, leafState.InFlightWrites) |
| 190 | + require.Empty(t, leafState.BufferedWrites) |
| 191 | + |
| 192 | + // Test with reads in the tree but no writes. |
| 193 | + treeWithReads := interval.NewTree(interval.ExclusiveOverlapper) |
| 194 | + span := roachpb.Span{ |
| 195 | + Key: roachpb.Key("table/1/primary/000"), |
| 196 | + EndKey: roachpb.Key("table/1/primary/999"), |
| 197 | + } |
| 198 | + err := treeWithReads.Insert(intervalSpan(span), false) |
| 199 | + require.NoError(t, err) |
| 200 | + tpEmpty.populateLeafInputState(leafState, treeWithReads) |
| 201 | + twbEmpty.populateLeafInputState(leafState, treeWithReads) |
| 202 | + require.Empty(t, leafState.InFlightWrites) |
| 203 | + require.Empty(t, leafState.BufferedWrites) |
| 204 | + }) |
| 205 | +} |
| 206 | + |
| 207 | +// intervalSpan is a helper for converting roachpb.Span to interval.Interface. |
| 208 | +type intervalSpan roachpb.Span |
| 209 | + |
| 210 | +var _ interval.Interface = intervalSpan{} |
| 211 | + |
| 212 | +func (is intervalSpan) ID() uintptr { return 0 } |
| 213 | +func (is intervalSpan) Range() interval.Range { |
| 214 | + return interval.Range{Start: []byte(is.Key), End: []byte(is.EndKey)} |
| 215 | +} |
| 216 | + |
| 217 | +// addBufferedWriteForTest is a helper function to add buffered writes to |
| 218 | +// the write buffer. |
| 219 | +func addBufferedWriteForTest( |
| 220 | + t *testing.T, twb *txnWriteBuffer, key roachpb.Key, val roachpb.Value, seq enginepb.TxnSeq, |
| 221 | +) { |
| 222 | + t.Helper() |
| 223 | + ctx := context.Background() |
| 224 | + txn := makeTxnProto() |
| 225 | + txn.Sequence = seq |
| 226 | + |
| 227 | + // Create a BatchRequest with a single PutRequest. |
| 228 | + ba := &kvpb.BatchRequest{} |
| 229 | + ba.Header = kvpb.Header{Txn: &txn} |
| 230 | + putReq := &kvpb.PutRequest{ |
| 231 | + RequestHeader: kvpb.RequestHeader{ |
| 232 | + Key: key, |
| 233 | + Sequence: seq, |
| 234 | + }, |
| 235 | + Value: val, |
| 236 | + } |
| 237 | + ba.Add(putReq) |
| 238 | + |
| 239 | + br, pErr := twb.SendLocked(ctx, ba) |
| 240 | + require.Nil(t, pErr) |
| 241 | + require.NotNil(t, br) |
| 242 | +} |
0 commit comments