
Commit 76e7d25

craig[bot] committed
144509: jsonpath: refactor string representation r=normanchenn a=normanchenn

This commit refactors the JSONPath string representation logic to more closely match Postgres' output format. This new implementation notably removes unnecessary parentheses around operations based on operation precedence.

Epic: None
Release note: None

144872: kvclient: flush txnWriteBuffer if IncrementRequest is received r=stevendanna a=stevendanna

Previously we issued an error on IncrementRequest, assuming this was only used in (1) transactions that have already disabled the write buffer because of DDL statements or (2) non-transactional requests. However, it is also used by any test that sets UseTransactionalDescIDGenerator to true. While we could call txn.SetBufferedWritesEnabled from the transactional ID generator itself, I opted to simply flush the buffer if we encounter an IncrementRequest so that IncrementRequest does not have a hidden requirement of disabling write buffering.

Epic: none
Release note: None

144944: changefeedccl: skip TestChangefeedJobUpdateFailsIfNotClaimed r=msbutler a=wenyihu6

This commit skips TestChangefeedJobUpdateFailsIfNotClaimed since it is causing CI to flake.

Epic: none
Release note: none

Co-authored-by: Norman Chen <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: wenyihu6 <[email protected]>
4 parents ba001bb + e675d57 + fad25b4 + 854fe58 commit 76e7d25
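
The jsonpath diff from 144509 is not among the files shown below. As a rough, hypothetical illustration of the precedence-based parenthesization described above, the following self-contained Go sketch emits parentheses around a sub-expression only when its operator binds less tightly than the enclosing one; the types and names (expr, binary, literal, precedence) are invented for this example and are not the CockroachDB jsonpath code.

```go
// Minimal sketch, assuming a toy expression AST: parenthesize a child only
// if the enclosing operator has higher precedence than the child's operator.
package main

import "fmt"

type expr interface {
	// format renders the expression given the enclosing operator's precedence.
	format(parentPrec int) string
}

type literal string

func (l literal) format(int) string { return string(l) }

type binary struct {
	op          string
	left, right expr
}

// precedence returns a larger number for operators that bind more tightly.
func precedence(op string) int {
	switch op {
	case "*", "/":
		return 2
	case "+", "-":
		return 1
	default:
		return 0
	}
}

func (b binary) format(parentPrec int) string {
	p := precedence(b.op)
	s := fmt.Sprintf("%s %s %s", b.left.format(p), b.op, b.right.format(p))
	if p < parentPrec {
		// Only wrap when the surrounding operator binds more tightly.
		return "(" + s + ")"
	}
	return s
}

func main() {
	// (@ + 1) * 2: the addition must keep its parentheses.
	e1 := binary{"*", binary{"+", literal("@"), literal("1")}, literal("2")}
	// @ + (1 * 2): the multiplication binds tighter, so no parentheses needed.
	e2 := binary{"+", literal("@"), binary{"*", literal("1"), literal("2")}}
	fmt.Println(e1.format(0)) // prints: (@ + 1) * 2
	fmt.Println(e2.format(0)) // prints: @ + 1 * 2
}
```

A full implementation would also need to account for associativity (for example, a - (b - c) must keep its parentheses); this sketch skips that for brevity.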

File tree

11 files changed (+377 -226 lines)


pkg/ccl/changefeedccl/changefeed_test.go

Lines changed: 2 additions & 0 deletions
@@ -6132,6 +6132,8 @@ func TestChangefeedJobUpdateFailsIfNotClaimed(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)

+	skip.WithIssue(t, 144912)
+
 	// Set TestingKnobs to return a known session for easier
 	// comparison.
 	adoptionInterval := 20 * time.Minute

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer.go

Lines changed: 53 additions & 28 deletions
@@ -217,31 +217,7 @@ func (twb *txnWriteBuffer) SendLocked(
 		return twb.flushBufferAndSendBatch(ctx, ba)
 	}

-	if _, ok := ba.GetArg(kvpb.DeleteRange); ok {
-		log.VEventf(ctx, 2, "DeleteRangeRequest forcing flush of write buffer")
-		// DeleteRange requests can delete an arbitrary number of keys over a
-		// given keyspan. We won't know the exact scope of the delete until
-		// we've scanned the keyspan, which must happen on the server. We've got
-		// a couple of options here:
-		// 1. We decompose the DeleteRange request into a (potentially locking)
-		//    Scan followed by buffered point Deletes for each key in the scan's
-		//    result.
-		// 2. We flush the buffer[1] and send the DeleteRange request to the KV
-		//    layer.
-		//
-		// We choose option 2, as typically the number of keys deleted is large,
-		// and we may realize we're over budget after performing the initial
-		// scan of the keyspan. At that point, we'll have to flush the buffer
-		// anyway. Moreover, buffered writes are most impactful when a
-		// transaction is writing to a small number of keys. As such, it's fine
-		// to not optimize the DeleteRange case, as typically it results in a
-		// large writing transaction.
-		//
-		// [1] Technically, we only need to flush the overlapping portion of the
-		// buffer. However, for simplicity, the txnWriteBuffer doesn't support
-		// transactions with partially buffered writes and partially flushed
-		// writes. We could change this in the future if there's benefit to
-		// doing so.
+	if twb.batchRequiresFlush(ctx, ba) {
 		return twb.flushBufferAndSendBatch(ctx, ba)
 	}

@@ -295,6 +271,55 @@ func (twb *txnWriteBuffer) SendLocked(
 	return twb.mergeResponseWithTransformations(ctx, ts, br)
 }

+func (twb *txnWriteBuffer) batchRequiresFlush(ctx context.Context, ba *kvpb.BatchRequest) bool {
+	for _, ru := range ba.Requests {
+		req := ru.GetInner()
+		switch req.(type) {
+		case *kvpb.IncrementRequest:
+			// We don't typically see IncrementRequest in transactional batches that
+			// haven't already had write buffering disabled because of DDL statements.
+			//
+			// However, we do have at least a few users of the NewTransactionalGenerator
+			// in test code and builtins.
+			//
+			// We could handle this similarly to how we handle ConditionalPut, but it's
+			// not clear there is much value in that.
+			log.VEventf(ctx, 2, "%s forcing flush of write buffer", req.Method())
+			return true
+		case *kvpb.DeleteRangeRequest:
+			// DeleteRange requests can delete an arbitrary number of keys over a
+			// given keyspan. We won't know the exact scope of the delete until
+			// we've scanned the keyspan, which must happen on the server. We've got
+			// a couple of options here:
+			//
+			// 1. We decompose the DeleteRange request into a (potentially
+			//    locking) Scan followed by buffered point Deletes for each
+			//    key in the scan's result.
+			//
+			// 2. We flush the buffer[1] and send the DeleteRange request to
+			//    the KV layer.
+			//
+			// We choose option 2, as typically the number of keys deleted is large,
+			// and we may realize we're over budget after performing the initial
+			// scan of the keyspan. At that point, we'll have to flush the buffer
+			// anyway. Moreover, buffered writes are most impactful when a
+			// transaction is writing to a small number of keys. As such, it's fine
+			// to not optimize the DeleteRange case, as typically it results in a
+			// large writing transaction.
+			//
+			// [1] Technically, we only need to flush the overlapping portion of the
+			// buffer. However, for simplicity, the txnWriteBuffer doesn't support
+			// transactions with partially buffered writes and partially flushed
+			// writes. We could change this in the future if there's benefit to
+			// doing so.
+
+			log.VEventf(ctx, 2, "%s forcing flush of write buffer", req.Method())
+			return true
+		}
+	}
+	return false
+}
+
 // validateBatch returns an error if the batch is unsupported
 // by the txnWriteBuffer.
 func (twb *txnWriteBuffer) validateBatch(ba *kvpb.BatchRequest) error {
@@ -353,9 +378,9 @@ func (twb *txnWriteBuffer) validateRequests(ba *kvpb.BatchRequest) error {
 			}
 		case *kvpb.QueryLocksRequest, *kvpb.LeaseInfoRequest:
 		default:
-			// All other requests are unsupported. Note that we assume EndTxn and
-			// DeleteRange requests were handled explicitly before this method was
-			// called.
+			// All other requests are unsupported. Note that we assume that requests
+			// that should result in a buffer flush are handled explicitly before this
+			// method is called.
 			return unsupportedMethodError(t.Method())
 		}
 	}
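
As a plain-Go illustration of the control flow that batchRequiresFlush enables (buffer point writes client-side, drain the buffer as soon as a batch carries a request kind that cannot be buffered), here is a self-contained toy model. The writeBuffer and request types and the string request kinds are invented stand-ins for this sketch, not the kvcoord types.

```go
// Toy model only: "increment" and "delrange" stand in for IncrementRequest
// and DeleteRange, the two kinds the interceptor now flushes on.
package main

import "fmt"

type request struct {
	kind string // "put", "increment", "delrange"
	key  string
}

type writeBuffer struct {
	buffered []request
	sent     [][]request // batches that actually went to the "server"
}

// requiresFlush mirrors the idea of batchRequiresFlush: some request kinds
// cannot be buffered client-side and force the buffer to drain first.
func (wb *writeBuffer) requiresFlush(batch []request) bool {
	for _, r := range batch {
		if r.kind == "increment" || r.kind == "delrange" {
			return true
		}
	}
	return false
}

func (wb *writeBuffer) send(batch []request) {
	if wb.requiresFlush(batch) {
		// Prepend everything buffered so far, then send the whole batch through.
		flushed := append(append([]request{}, wb.buffered...), batch...)
		wb.buffered = nil
		wb.sent = append(wb.sent, flushed)
		return
	}
	// Otherwise keep buffering point writes client-side.
	wb.buffered = append(wb.buffered, batch...)
}

func main() {
	wb := &writeBuffer{}
	wb.send([]request{{kind: "put", key: "a"}})         // buffered
	wb.send([]request{{kind: "increment", key: "ctr"}}) // forces a flush
	fmt.Println("buffered:", len(wb.buffered))     // 0
	fmt.Println("batches sent:", len(wb.sent))     // 1
	fmt.Println("first batch:", wb.sent[0])        // [{put a} {increment ctr}]
}
```

The design choice mirrors the commit message: rather than giving callers of Increment a hidden obligation to disable buffering up front, the buffer simply gives up and flushes when it sees such a request.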

pkg/kv/kvclient/kvcoord/txn_interceptor_write_buffer_test.go

Lines changed: 137 additions & 101 deletions
@@ -1508,118 +1508,164 @@ func TestTxnWriteBufferFlushesWhenOverBudget(t *testing.T) {
 	require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferMemoryLimitExceeded.Count())
 }

-// TestTxnWriteBufferDeleteRange ensures that the txnWriteBuffer correctly
-// handles DeleteRange requests. In particular, whenever we see a batch with a
-// DeleteRange request, the write buffer is flushed and write buffering is
-// turned off for subsequent requests.
-func TestTxnWriteBufferDeleteRange(t *testing.T) {
+// TestTxnWriteBufferFlushesIfBatchRequiresFlushing ensures that the
+// txnWriteBuffer correctly handles requests that aren't currently
+// supported by the txnWriteBuffer by flushing the buffer before
+// processing the request.
+func TestTxnWriteBufferFlushesIfBatchRequiresFlushing(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 	ctx := context.Background()
-	twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())
-
-	txn := makeTxnProto()
-	txn.Sequence = 10
 	keyA, keyB, keyC := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
 	valA := "valA"

-	ba := &kvpb.BatchRequest{}
-	ba.Header = kvpb.Header{Txn: &txn}
-	putA := putArgs(keyA, valA, txn.Sequence)
-	delC := delArgs(keyC, txn.Sequence)
-	ba.Add(putA)
-	ba.Add(delC)
-
-	numCalled := mockSender.NumCalled()
-	br, pErr := twb.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
-	// All the requests should be buffered and not make it past the
-	// txnWriteBuffer. The response returned should be indistinguishable.
-	require.Equal(t, numCalled, mockSender.NumCalled())
-	require.Len(t, br.Responses, 2)
-	require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
-	// Verify the Put was buffered correctly.
-	expBufferedWrites := []bufferedWrite{
-		makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
-		makeBufferedWrite(keyC, makeBufferedValue("", 10)),
+	type batchSendMock func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error)
+	type testCase struct {
+		name         string
+		ba           func(*kvpb.BatchRequest)
+		baSender     func(*testing.T) batchSendMock
+		validateResp func(*testing.T, *kvpb.BatchResponse, *kvpb.Error)
 	}
-	require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())
-
-	// Send a DeleteRange request. This should result in the entire buffer
-	// being flushed. Note that we're flushing the delete to key C as well, even
-	// though it doesn't overlap with the DeleteRange request.
-	ba = &kvpb.BatchRequest{}
-	ba.Header = kvpb.Header{Txn: &txn}
-	delRange := delRangeArgs(keyA, keyB, txn.Sequence)
-	ba.Add(delRange)

-	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
-		require.Len(t, ba.Requests, 3)
+	testCases := []testCase{
+		{
+			name: "DeleteRange",
+			ba: func(b *kvpb.BatchRequest) {
+				b.Add(delRangeArgs(keyA, keyB, b.Txn.Sequence))
+			},
+			baSender: func(t *testing.T) batchSendMock {
+				return func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+					require.Len(t, ba.Requests, 3)
+					require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+					require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
+					require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[2].GetInner())
+
+					br := ba.CreateReply()
+					br.Txn = ba.Txn
+					return br, nil
+				}
+			},
+			validateResp: func(t *testing.T, br *kvpb.BatchResponse, pErr *kvpb.Error) {
+				require.Nil(t, pErr)
+				require.NotNil(t, br)
+				require.Len(t, br.Responses, 1)
+				require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
+			},
+		},
+		{
+			name: "Increment",
+			ba: func(b *kvpb.BatchRequest) {
+				b.Add(&kvpb.IncrementRequest{
+					RequestHeader: kvpb.RequestHeader{Key: keyA},
+					Increment:     1,
+				})
+			},
+			baSender: func(t *testing.T) batchSendMock {
+				return func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+					require.Len(t, ba.Requests, 3)
+					require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+					require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
+					require.IsType(t, &kvpb.IncrementRequest{}, ba.Requests[2].GetInner())
+
+					br := ba.CreateReply()
+					br.Txn = ba.Txn
+					return br, nil
+				}
+			},
+			validateResp: func(t *testing.T, br *kvpb.BatchResponse, pErr *kvpb.Error) {
+				require.Nil(t, pErr)
+				require.NotNil(t, br)
+				require.Len(t, br.Responses, 1)
+				require.IsType(t, &kvpb.IncrementResponse{}, br.Responses[0].GetInner())
+			},
+		},
+	}

-		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
-		require.IsType(t, &kvpb.DeleteRequest{}, ba.Requests[1].GetInner())
-		require.IsType(t, &kvpb.DeleteRangeRequest{}, ba.Requests[2].GetInner())
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			twb, mockSender := makeMockTxnWriteBuffer(cluster.MakeClusterSettings())

-		br = ba.CreateReply()
-		br.Txn = ba.Txn
-		return br, nil
-	})
+			txn := makeTxnProto()
+			txn.Sequence = 10
+			ba := &kvpb.BatchRequest{}
+			ba.Header = kvpb.Header{Txn: &txn}
+			putA := putArgs(keyA, valA, txn.Sequence)
+			delC := delArgs(keyC, txn.Sequence)
+			ba.Add(putA)
+			ba.Add(delC)

-	br, pErr = twb.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
-	// Even though we flushed some writes, it shouldn't make it back to the response.
-	require.Len(t, br.Responses, 1)
-	require.IsType(t, &kvpb.DeleteRangeResponse{}, br.Responses[0].GetInner())
-	require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())
+			numCalled := mockSender.NumCalled()
+			br, pErr := twb.SendLocked(ctx, ba)
+			require.Nil(t, pErr)
+			require.NotNil(t, br)
+			// All the requests should be buffered and not make it past the
+			// txnWriteBuffer. The response returned should be indistinguishable.
+			require.Equal(t, numCalled, mockSender.NumCalled())
+			require.Len(t, br.Responses, 2)
+			require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
+			// Verify the Put was buffered correctly.
+			expBufferedWrites := []bufferedWrite{
+				makeBufferedWrite(keyA, makeBufferedValue("valA", 10)),
+				makeBufferedWrite(keyC, makeBufferedValue("", 10)),
+			}
+			require.Equal(t, expBufferedWrites, twb.testingBufferedWritesAsSlice())

-	// Ensure the buffer is empty at this point.
-	require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
+			// Send the batch that should require a flush.
+			ba = &kvpb.BatchRequest{}
+			ba.Header = kvpb.Header{Txn: &txn}
+			tc.ba(ba)
+			mockSender.MockSend(tc.baSender(t))
+			br, pErr = twb.SendLocked(ctx, ba)
+			tc.validateResp(t, br, pErr)
+			// Ensure the buffer is empty at this point.
+			require.Equal(t, 0, len(twb.testingBufferedWritesAsSlice()))
+			require.Equal(t, int64(1), twb.txnMetrics.TxnWriteBufferDisabledAfterBuffering.Count())

-	// Subsequent batches should not be buffered.
-	ba = &kvpb.BatchRequest{}
-	ba.Header = kvpb.Header{Txn: &txn}
-	putC := putArgs(keyC, valA, txn.Sequence)
-	ba.Add(putC)
+			// Subsequent batches should not be buffered.
+			ba = &kvpb.BatchRequest{}
+			ba.Header = kvpb.Header{Txn: &txn}
+			putC := putArgs(keyC, valA, txn.Sequence)
+			ba.Add(putC)

-	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
-		require.Len(t, ba.Requests, 1)
+			mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+				require.Len(t, ba.Requests, 1)

-		require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())
+				require.IsType(t, &kvpb.PutRequest{}, ba.Requests[0].GetInner())

-		br = ba.CreateReply()
-		br.Txn = ba.Txn
-		return br, nil
-	})
+				br = ba.CreateReply()
+				br.Txn = ba.Txn
+				return br, nil
+			})

-	br, pErr = twb.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
-	require.Len(t, br.Responses, 1)
-	require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())
+			br, pErr = twb.SendLocked(ctx, ba)
+			require.Nil(t, pErr)
+			require.NotNil(t, br)
+			require.Len(t, br.Responses, 1)
+			require.IsType(t, &kvpb.PutResponse{}, br.Responses[0].GetInner())

-	// Commit the transaction. We flushed the buffer already, and no subsequent
-	// writes were buffered, so the buffer should be empty. As such, no write
-	// requests should be added to the batch.
-	ba = &kvpb.BatchRequest{}
-	ba.Header = kvpb.Header{Txn: &txn}
-	ba.Add(&kvpb.EndTxnRequest{Commit: true})
+			// Commit the transaction. We flushed the buffer already, and no subsequent
+			// writes were buffered, so the buffer should be empty. As such, no write
+			// requests should be added to the batch.
+			ba = &kvpb.BatchRequest{}
+			ba.Header = kvpb.Header{Txn: &txn}
+			ba.Add(&kvpb.EndTxnRequest{Commit: true})

-	mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
-		require.Len(t, ba.Requests, 1)
-		require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())
+			mockSender.MockSend(func(ba *kvpb.BatchRequest) (*kvpb.BatchResponse, *kvpb.Error) {
+				require.Len(t, ba.Requests, 1)
+				require.IsType(t, &kvpb.EndTxnRequest{}, ba.Requests[0].GetInner())

-		br = ba.CreateReply()
-		br.Txn = ba.Txn
-		return br, nil
-	})
+				br = ba.CreateReply()
+				br.Txn = ba.Txn
+				return br, nil
+			})

-	br, pErr = twb.SendLocked(ctx, ba)
-	require.Nil(t, pErr)
-	require.NotNil(t, br)
-	require.Len(t, br.Responses, 1)
-	require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
+			br, pErr = twb.SendLocked(ctx, ba)
+			require.Nil(t, pErr)
+			require.NotNil(t, br)
+			require.Len(t, br.Responses, 1)
+			require.IsType(t, &kvpb.EndTxnResponse{}, br.Responses[0].GetInner())
+		})
+	}
 }

 // TestTxnWriteBufferRollbackToSavepoint tests the savepoint rollback logic.
@@ -1915,16 +1961,6 @@ func TestTxnWriteBufferBatchRequestValidation(t *testing.T) {
 				return b
 			},
 		},
-		{
-			name: "batch with Increment",
-			ba: func() *kvpb.BatchRequest {
-				b := &kvpb.BatchRequest{Header: kvpb.Header{Txn: &txn}}
-				b.Add(&kvpb.IncrementRequest{
-					RequestHeader: kvpb.RequestHeader{Key: keyA, Sequence: txn.Sequence},
-				})
-				return b
-			},
-		},
 		{
 			name: "batch with ReturnRawMVCCValues Scan",
 			ba: func() *kvpb.BatchRequest {
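
The refactor above folds the old single-scenario test into a table-driven one, where each case supplies its own batch construction and response validation and the shared setup runs once per t.Run subtest. A minimal, generic sketch of that pattern (with invented names, not the kvcoord test helpers) looks like this:

```go
// Generic table-driven subtest sketch: each case carries its own input and
// validation hook; the loop body does the shared per-case setup and t.Run
// isolates failures by case name.
package demo

import "testing"

func TestTableDriven(t *testing.T) {
	type testCase struct {
		name     string
		input    int
		validate func(t *testing.T, got int)
	}
	double := func(x int) int { return x * 2 }

	testCases := []testCase{
		{
			name:  "positive",
			input: 2,
			validate: func(t *testing.T, got int) {
				if got != 4 {
					t.Fatalf("expected 4, got %d", got)
				}
			},
		},
		{
			name:  "zero",
			input: 0,
			validate: func(t *testing.T, got int) {
				if got != 0 {
					t.Fatalf("expected 0, got %d", got)
				}
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Shared per-case setup would go here, mirroring how the refactored
			// test re-creates the txnWriteBuffer and mock sender per subtest.
			tc.validate(t, double(tc.input))
		})
	}
}
```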
