@@ -1515,10 +1515,10 @@ func (rr requestRecord) toResp(
1515
1515
// a lock, we add it to the buffer since we may need to flush it as
1516
1516
// replicated lock.
1517
1517
if rr .transformed {
1518
-
1519
1518
transformedGetResponse := br .GetInner ().(* kvpb.GetResponse )
1520
1519
valueWasPresent := transformedGetResponse .Value .IsPresent ()
1521
- lockShouldHaveBeenAcquired := valueWasPresent || req .LockNonExisting
1520
+ lockShouldHaveBeenAcquired := (valueWasPresent || req .LockNonExisting ) &&
1521
+ transformedGetResponse .ResumeSpan == nil
1522
1522
1523
1523
if lockShouldHaveBeenAcquired {
1524
1524
dla := & bufferedDurableLockAcquisition {
@@ -1771,10 +1771,7 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
1771
1771
}
1772
1772
1773
1773
midTxnFlush := ! hasEndTxn
1774
-
1775
- // SkipLocked reads cannot be in a batch with basically anything else. If we
1776
- // encounter one, we need to flush our buffer in its own batch.
1777
- splitBatchRequired := ba .WaitPolicy == lock .WaitPolicy_SkipLocked
1774
+ splitBatchRequired := separateBatchIsNeeded (ba )
1778
1775
1779
1776
// Flush all buffered writes by pre-pending them to the requests being sent
1780
1777
// in the batch.
@@ -1816,17 +1813,21 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
1816
1813
})
1817
1814
1818
1815
if splitBatchRequired {
1819
- log .VEventf (ctx , 2 , "flushing buffer via separate batch" )
1820
1816
flushBatch := ba .ShallowCopy ()
1821
- flushBatch . WaitPolicy = 0
1817
+ clearBatchRequestOptions ( flushBatch )
1822
1818
flushBatch .Requests = reqs
1819
+ log .VEventf (ctx , 2 , "flushing %d buffered requests via separate batch" , len (reqs ))
1820
+
1823
1821
br , pErr := twb .wrapped .SendLocked (ctx , flushBatch )
1824
1822
if pErr != nil {
1825
1823
pErr .Index = nil
1826
1824
return nil , pErr
1827
1825
}
1828
-
1826
+ if err := requireAllFlushedRequestsProcessed (br .Responses ); err != nil {
1827
+ return nil , kvpb .NewError (err )
1828
+ }
1829
1829
ba .UpdateTxn (br .Txn )
1830
+
1830
1831
return twb .wrapped .SendLocked (ctx , ba )
1831
1832
} else {
1832
1833
ba = ba .ShallowCopy ()
@@ -1835,13 +1836,56 @@ func (twb *txnWriteBuffer) flushBufferAndSendBatch(
1835
1836
if pErr != nil {
1836
1837
return nil , twb .adjustErrorUponFlush (ctx , numRevisionsBuffered , pErr )
1837
1838
}
1838
-
1839
+ if err := requireAllFlushedRequestsProcessed (br .Responses [0 :numRevisionsBuffered ]); err != nil {
1840
+ return nil , kvpb .NewError (err )
1841
+ }
1839
1842
// Strip out responses for all the flushed buffered writes.
1840
1843
br .Responses = br .Responses [numRevisionsBuffered :]
1841
1844
return br , nil
1842
1845
}
1843
1846
}
1844
1847
1848
+ func requireAllFlushedRequestsProcessed (responses []kvpb.ResponseUnion ) error {
1849
+ for _ , resp := range responses {
1850
+ if resp .GetInner ().Header ().ResumeSpan != nil {
1851
+ return errors .AssertionFailedf ("response from buffered request has non-nil resume span" )
1852
+ }
1853
+ }
1854
+ return nil
1855
+ }
1856
+
1857
+ // separateBatchIsNeeded returns true if BatchRequest contains any options that
1858
+ // require us to flush buffered requests using a separate batch.
1859
+ //
1860
+ // NB: If you are updating this function, you need to update
1861
+ // clearBatchRequestOptions as well.
1862
+ func separateBatchIsNeeded (ba * kvpb.BatchRequest ) bool {
1863
+ return ba .MightStopEarly () ||
1864
+ ba .ReadConsistency != 0 ||
1865
+ ba .WaitPolicy != 0 ||
1866
+ ba .WriteOptions != nil && (* ba .WriteOptions != kvpb.WriteOptions {}) ||
1867
+ ba .IsReverse
1868
+ }
1869
+
1870
+ // clearBatchRequestOptions clears any options that should not be present on a
1871
+ // batch used to send previously buffered requests.
1872
+ //
1873
+ // NB: If you are updating this function, you need to update
1874
+ // separateBatchIsNeeded as well.
1875
+ func clearBatchRequestOptions (ba * kvpb.BatchRequest ) {
1876
+ // If read consistency is set to anything but CONSISTENT, our flush will fail
1877
+ // because we only allow inconsistent reads for read only requests.
1878
+ ba .ReadConsistency = 0
1879
+ // If WaitPolicy is set to SkipLocked, our request may fail validation.
1880
+ ba .WaitPolicy = 0
1881
+ // Reset options that could result in an early batch return.
1882
+ ba .MaxSpanRequestKeys = 0
1883
+ ba .TargetBytes = 0
1884
+ ba .ReturnElasticCPUResumeSpans = false
1885
+ ba .IsReverse = false
1886
+ ba .WriteOptions = nil
1887
+ }
1888
+
1845
1889
// hasBufferedWrites returns whether the interceptor has buffered any writes
1846
1890
// locally.
1847
1891
func (twb * txnWriteBuffer ) hasBufferedWrites () bool {
0 commit comments