@@ -12,6 +12,7 @@ import (
1212 "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1313 "github.com/cockroachdb/cockroach/pkg/roachpb"
1414 "github.com/cockroachdb/cockroach/pkg/util/hlc"
15+ "github.com/cockroachdb/cockroach/pkg/util/log"
1516 "github.com/cockroachdb/cockroach/pkg/util/retry"
1617 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
1718 "github.com/cockroachdb/crlib/crtime"
@@ -291,11 +292,53 @@ func (ubr *unbufferedRegistration) drainAllocations(ctx context.Context) {
 // drainAllocations should never be called concurrently with this function.
 // Caller is responsible for draining it again if error is returned.
 func (ubr *unbufferedRegistration) publishCatchUpBuffer(ctx context.Context) error {
-	publish := func() error {
+
+	// We use the unbuffered sender until we have sent a non-empty checkpoint. The
+	// goal is to inform the stream of its successful catch-up scan promptly.
+	//
+	// When under lock, we always use the buffered sender to avoid blocking for
+	// too long.
+	//
+	// Once we've set shouldSendBuffered to true, we should never send an
+	// unbuffered event.
+	shouldSendBuffered := false
+
+	// Because the checkpoint is typically the first event in the buffer, we also
+	// only send a limited number of events before moving to the buffered sender.
+	//
+	// TODO(ssd): We are considering using the buffered sender during the catch-up
+	// scan. If we do that, we must remove the use of the unbuffered sender here.
+	maxUnbufferedSends := min(len(ubr.mu.catchUpBuf), 1024)
+	unbufferedSendCount := 0
+
+	publish := func(underLock bool) error {
+		shouldSendBuffered = shouldSendBuffered || underLock
 		for {
+			// Prioritize checking ctx.Done() so that we respond to context
+			// cancellation quickly even in the presence of a long queue.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
 			select {
 			case e := <-ubr.mu.catchUpBuf:
-				err := ubr.stream.SendBuffered(e.event, e.alloc)
+				var err error
+				if shouldSendBuffered {
+					err = ubr.stream.SendBuffered(e.event, e.alloc)
+				} else {
+					unbufferedSendCount++
+					foundCheckpoint := e.event.Checkpoint != nil && !e.event.Checkpoint.ResolvedTS.IsEmpty()
+					if foundCheckpoint || unbufferedSendCount >= maxUnbufferedSends {
+						shouldSendBuffered = true
+						if !foundCheckpoint {
+							log.KvExec.Warningf(ctx, "no non-empty checkpoint found before transitioning to buffered sender")
+						}
+					}
+					err = ubr.stream.SendUnbuffered(e.event)
+				}
+
 				e.alloc.Release(ctx)
 				putPooledSharedEvent(e)
 				if err != nil {
@@ -311,7 +354,12 @@ func (ubr *unbufferedRegistration) publishCatchUpBuffer(ctx context.Context) err
 	}
 
 	// Drain without holding locks first to avoid unnecessary blocking on publish().
-	if err := publish(); err != nil {
+
+	// TODO(ssd): An unfortunate side-effect of this is that we are not
+	// coordinated with the disconnected bool. This represents what I believe is
+	// the only place where we could end up sending events to the buffered sender
+	// _after_ an error has already been delivered.
+	if err := publish(false); err != nil {
 		return err
 	}
 
@@ -320,7 +368,7 @@ func (ubr *unbufferedRegistration) publishCatchUpBuffer(ctx context.Context) err
 
 	// Drain again with lock held to ensure that events added to the buffer while
 	// draining took place are also published.
-	if err := publish(); err != nil {
+	if err := publish(true); err != nil {
 		return err
 	}
 
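For readers skimming the diff, here is a minimal, self-contained sketch of the drain pattern the new publish closure implements: unbuffered sends until the first non-empty checkpoint (or a send cap), buffered sends afterward, with context cancellation checked before every receive. The `event`, `sender`, and `drainCatchUpBuffer` names below are simplified stand-ins for illustration only, not the real rangefeed types or API.

```go
package main

import (
	"context"
	"fmt"
)

// event is a simplified stand-in for the pooled shared event: it carries an
// optional checkpoint timestamp, where 0 means "no checkpoint or an empty one".
type event struct {
	checkpointTS int64
	payload      string
}

// sender is a simplified stand-in for the stream held by the registration.
type sender struct{}

func (sender) SendUnbuffered(e event) error {
	fmt.Println("unbuffered:", e.payload)
	return nil
}

func (sender) SendBuffered(e event) error {
	fmt.Println("buffered:  ", e.payload)
	return nil
}

// drainCatchUpBuffer mirrors the shape of the publish loop above: events are
// sent unbuffered until a non-empty checkpoint is seen (or maxUnbuffered sends
// have happened), and buffered afterward. ctx.Done() is checked before each
// receive so a long queue cannot delay cancellation.
func drainCatchUpBuffer(ctx context.Context, s sender, buf chan event, maxUnbuffered int) error {
	sendBuffered := false
	sent := 0
	for {
		// Prioritize cancellation over draining more events.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		select {
		case e := <-buf:
			var err error
			if sendBuffered {
				err = s.SendBuffered(e)
			} else {
				sent++
				foundCheckpoint := e.checkpointTS != 0
				if foundCheckpoint || sent >= maxUnbuffered {
					// Everything after this event goes through the buffered sender.
					sendBuffered = true
				}
				// The triggering event itself still goes out unbuffered, so the
				// consumer learns about the checkpoint as promptly as possible.
				err = s.SendUnbuffered(e)
			}
			if err != nil {
				return err
			}
		default:
			// Buffer drained.
			return nil
		}
	}
}

func main() {
	buf := make(chan event, 4)
	buf <- event{payload: "catch-up row 1"}
	buf <- event{checkpointTS: 42, payload: "checkpoint"}
	buf <- event{payload: "post-checkpoint row"}
	_ = drainCatchUpBuffer(context.Background(), sender{}, buf, 1024)
}
```

The send cap plays the same role as maxUnbufferedSends in the diff: if the checkpoint is unexpectedly not near the front of the buffer, the loop still falls back to the buffered sender after a bounded number of unbuffered sends.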