Skip to content

Commit a518c68

Browse files
committed
kvclient: serve reads from the buffer on the response path
Previously, we were serving reads from the buffer when transforming a batch. Because of the last commit, any writes from earlier in the batch that needed to be added to the buffer were only added at response time, so the reads served on the request path were no longer correct. This patch fixes the issue by serving reads from the buffer on the response path. Currently, we're not being smart about omitting Get requests that read keys written to earlier in the same batch. We could change this behaviour in the future if we think it's worthwhile. SQL never constructs batches that read and write to the same key, so this is likely not a big deal for now. Epic: none Release note: None
1 parent 988f4fa commit a518c68

File tree

2 files changed

+141
-42
lines changed

2 files changed

+141
-42
lines changed

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 51 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -424,7 +424,9 @@ func (twb *txnWriteBuffer) adjustError(
424424
baIdx := int32(0)
425425
for i := range numOriginalRequests {
426426
if len(ts) > 0 && ts[0].index == i {
427-
if ts[0].stripped {
427+
curTs := ts[0]
428+
ts = ts[1:]
429+
if curTs.stripped {
428430
numStripped++
429431
} else {
430432
// This is a transformed request (for example a LockingGet that was
@@ -433,14 +435,19 @@ func (twb *txnWriteBuffer) adjustError(
433435
// exactly the request the user sent.
434436
//
435437
// For now, we handle this by logging and removing the error index.
436-
if baIdx == pErr.Index.Index {
438+
//
439+
// [1] Get requests are always collected as transformations, but
440+
// they're never transformed. Attributing an error to them shouldn't
441+
// confuse the client.
442+
if baIdx == pErr.Index.Index && curTs.origRequest.Method() != kvpb.Get {
437443
log.Warningf(ctx, "error index %d is part of a transformed request", pErr.Index.Index)
438444
pErr.Index = nil
439445
return pErr
440446
}
441447
}
442-
ts = ts[1:]
443-
continue
448+
if curTs.origRequest.Method() != kvpb.Get {
449+
continue
450+
}
444451
}
445452
if baIdx == pErr.Index.Index {
446453
break
@@ -713,42 +720,43 @@ func (twb *txnWriteBuffer) applyTransformations(
713720

714721
case *kvpb.GetRequest:
715722
// If the key is in the buffer, we must serve the read from the buffer.
716-
val, served := twb.maybeServeRead(t.Key, t.Sequence)
723+
// The actual serving of the read will happen on the response path though.
724+
stripped := false
725+
_, served := twb.maybeServeRead(t.Key, t.Sequence)
717726
if served {
718-
log.VEventf(ctx, 2, "serving %s on key %s from the buffer", t.Method(), t.Key)
719-
var resp kvpb.ResponseUnion
720-
getResp := &kvpb.GetResponse{}
721-
if val.IsPresent() {
722-
getResp.Value = val
723-
}
724-
resp.MustSetInner(getResp)
725-
726-
stripped := true
727727
if t.KeyLockingStrength != lock.None {
728728
// Even though the Get request must be served from the buffer, as the
729729
// transaction performed a previous write to the key, we still need to
730730
// acquire a lock at the leaseholder. As a result, we can't strip the
731-
// request from the batch.
731+
// request from the remote batch.
732732
//
733733
// TODO(arul): we could eschew sending this request if we knew there
734734
// was a sufficiently strong lock already present on the key.
735-
stripped = false
735+
log.VEventf(ctx, 2, "locking %s on key %s must be sent to the server", t.Method(), t.Key)
736736
baRemote.Requests = append(baRemote.Requests, ru)
737+
} else {
738+
// We'll synthesize the response from the buffer on the response path;
739+
// eschew sending the request to the KV layer as we don't need to
740+
// acquire a lock.
741+
stripped = true
742+
log.VEventf(
743+
ctx, 2, "non-locking %s on key %s can be fully served by the client; not sending to KV", t.Method(), t.Key,
744+
)
737745
}
738-
739-
ts = append(ts, transformation{
740-
stripped: stripped,
741-
index: i,
742-
origRequest: req,
743-
resp: resp,
744-
})
745-
// We've constructed a response that we'll stitch together with the
746-
// result on the response path; eschew sending the request to the KV
747-
// layer.
748-
continue
746+
} else {
747+
// Wasn't served locally; send the request to the KV layer.
748+
baRemote.Requests = append(baRemote.Requests, ru)
749749
}
750-
// Wasn't served locally; send the request to the KV layer.
751-
baRemote.Requests = append(baRemote.Requests, ru)
750+
// Even if the request wasn't served from the buffer here, we still track
751+
// a transformation for it. That's because we haven't buffered any writes
752+
// from our current batch in the buffer yet, so checking the buffer above
753+
// isn't sufficient to determine whether the request needs to serve a read
754+
// from the buffer before returning a response or not.
755+
ts = append(ts, transformation{
756+
stripped: stripped,
757+
index: i,
758+
origRequest: req,
759+
})
752760

753761
case *kvpb.ScanRequest:
754762
overlaps := twb.scanOverlaps(t.Key, t.EndKey)
@@ -1054,12 +1062,6 @@ type transformation struct {
10541062
index int
10551063
// origRequest is the original request that was transformed.
10561064
origRequest kvpb.Request
1057-
// resp is locally produced response that needs to be merged with any
1058-
// responses returned by the KV layer. This is set for requests that can be
1059-
// evaluated locally (e.g. blind writes, reads that can be served entirely
1060-
// from the buffer). Must be set if stripped is true, but the converse doesn't
1061-
// hold.
1062-
resp kvpb.ResponseUnion
10631065
}
10641066

10651067
// toResp returns the response that should be added to the batch response as
@@ -1142,13 +1144,20 @@ func (t transformation) toResp(
11421144
twb.addToBuffer(req.Key, roachpb.Value{}, req.Sequence)
11431145

11441146
case *kvpb.GetRequest:
1145-
// Get requests must be served from the local buffer if a transaction
1146-
// performed a previous write to the key being read. However, Get
1147-
// requests must be sent to the KV layer (i.e. not be stripped) iff they
1148-
// are locking in nature.
1149-
assertTrue(t.stripped == (req.KeyLockingStrength == lock.None),
1150-
"Get requests should either be stripped or be locking")
1151-
ru = t.resp
1147+
val, served := twb.maybeServeRead(req.Key, req.Sequence)
1148+
if served {
1149+
getResp := &kvpb.GetResponse{}
1150+
if val.IsPresent() {
1151+
getResp.Value = val
1152+
}
1153+
ru.MustSetInner(getResp)
1154+
log.VEventf(ctx, 2, "serving %s on key %s from the buffer", req.Method(), req.Key)
1155+
} else {
1156+
// The request wasn't served from the buffer; return the response from the
1157+
// KV layer.
1158+
assertTrue(!t.stripped, "we shouldn't be stripping requests that aren't served from the buffer")
1159+
ru = br
1160+
}
11521161

11531162
case *kvpb.ScanRequest:
11541163
scanResp, err := twb.mergeWithScanResp(

pkg/kv/kvclient/kvcoord/txn_test.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ import (
4242
"github.com/stretchr/testify/require"
4343
)
4444

45+
func checkGetResults(t *testing.T, expected map[string][]byte, results ...kv.Result) {
46+
for _, result := range results {
47+
require.Equal(t, 1, len(result.Rows))
48+
require.Equal(t, expected[string(result.Rows[0].Key)], result.Rows[0].ValueBytes())
49+
}
50+
require.Len(t, expected, len(results))
51+
}
52+
4553
// TestTxnDBBasics verifies that a simple transaction can be run and
4654
// either committed or aborted. On commit, mutations are visible; on
4755
// abort, mutations are never visible. During the txn, verify that
@@ -2161,3 +2169,85 @@ func TestTxnBufferedWriteRetriesCorrectly(t *testing.T) {
21612169
require.Equal(t, 1, len(keys))
21622170
})
21632171
}
2172+
2173+
// TestTxnBufferedWriteReadYourOwnWrites tests that read-your-own-writes are
2174+
// served correctly from the transaction's buffer. We test two cases:
2175+
// 1. The write that needs to be observed was part of an earlier batch.
2176+
// 2. The write that needs to be observed was part of the same batch.
2177+
func TestTxnBufferedWriteReadYourOwnWrites(t *testing.T) {
2178+
defer leaktest.AfterTest(t)()
2179+
defer log.Scope(t).Close(t)
2180+
ctx := context.Background()
2181+
2182+
s := createTestDB(t)
2183+
defer s.Stop()
2184+
2185+
value1 := []byte("value1")
2186+
value21 := []byte("value21")
2187+
value22 := []byte("value22")
2188+
value3 := []byte("value3")
2189+
2190+
keyA := []byte("keyA")
2191+
keyB := []byte("keyB")
2192+
keyC := []byte("keyC")
2193+
2194+
// Before the test begins, write a value to keyC.
2195+
txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
2196+
require.NoError(t, txn.Put(ctx, keyC, value3))
2197+
require.NoError(t, txn.Commit(ctx))
2198+
2199+
err := s.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
2200+
txn.SetBufferedWritesEnabled(true)
2201+
2202+
// Put transactional value at keyA.
2203+
if err := txn.Put(ctx, keyA, value1); err != nil {
2204+
return err
2205+
}
2206+
2207+
// Construct a batch that contains two Gets -- one on keyA, which will be
2208+
// served from the buffer, and another on keyC, which will be served by
2209+
// the server.
2210+
b := txn.NewBatch()
2211+
b.Get(keyA)
2212+
b.Get(keyC)
2213+
if err := txn.Run(ctx, b); err != nil {
2214+
return err
2215+
}
2216+
expected := map[string][]byte{
2217+
"keyA": value1,
2218+
"keyC": value3,
2219+
}
2220+
checkGetResults(t, expected, b.Results...)
2221+
2222+
// Next, construct a batch that contains both Puts and Gets to keyB. The Get
2223+
// should see the value written by the Put preceding it in the batch.
2224+
b = txn.NewBatch()
2225+
b.Get(keyB)
2226+
b.Put(keyB, value21)
2227+
b.Get(keyB)
2228+
b.Put(keyB, value22)
2229+
b.Get(keyB)
2230+
2231+
if err := txn.Run(ctx, b); err != nil {
2232+
return err
2233+
}
2234+
checkGetResults(t, map[string][]byte{
2235+
"keyB": nil,
2236+
},
2237+
b.Results[0],
2238+
)
2239+
checkGetResults(t, map[string][]byte{
2240+
"keyB": value21,
2241+
},
2242+
b.Results[2],
2243+
)
2244+
checkGetResults(t, map[string][]byte{
2245+
"keyB": value22,
2246+
},
2247+
b.Results[4],
2248+
)
2249+
2250+
return nil
2251+
})
2252+
require.NoError(t, err)
2253+
}

0 commit comments

Comments
 (0)