@@ -7,6 +7,7 @@ package changefeedccl
 
 import (
 	"bytes"
+	"cmp"
 	"context"
 	"crypto/rand"
 	"encoding/hex"
@@ -18,7 +19,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/RaduBerinde/btree" // TODO(#144504): switch to the newer btree
+	"github.com/RaduBerinde/btreemap"
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
 	"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
@@ -320,7 +321,7 @@ type cloudStorageSink struct {
 	// These are fields to track information needed to output files based on the naming
 	// convention described above. See comment on cloudStorageSink above for more details.
 	fileID int64
-	files  *btree.BTree // of *cloudStorageSinkFile
+	files  *btreemap.BTreeMap[cloudStorageSinkKey, *cloudStorageSinkFile]
 
 	timestampOracle timestampLowerBoundOracle
 	jobSessionID    string
@@ -416,7 +417,7 @@ func makeCloudStorageSink(
 		sinkID:            sinkID,
 		settings:          settings,
 		targetMaxFileSize: targetMaxFileSize,
-		files:             btree.New(8),
+		files:             btreemap.New[cloudStorageSinkKey, *cloudStorageSinkFile](8, keyCmp),
 		partitionFormat:   defaultPartitionFormat,
 		timestampOracle:   timestampOracle,
 		// TODO(dan,ajwerner): Use the jobs framework's session ID once that's available.
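For readers new to the dependency swap: the old `btree` stored opaque items that had to implement `Less`, while `btreemap` is a generic key/value B-tree ordered by a comparator function. Here is a minimal standalone sketch of that pattern, assuming the `btreemap` calls behave as their use in this diff suggests (`New` with a degree and comparator, `ReplaceOrInsert` taking an explicit key, and a three-value `Get`); the `key` type and string values are illustrative stand-ins, not code from this PR.

```go
package main

import (
	"cmp"
	"fmt"

	"github.com/RaduBerinde/btreemap"
)

// key mirrors cloudStorageSinkKey: ordered by topic, then schemaID.
type key struct {
	topic    string
	schemaID int64
}

func cmpKey(a, b key) int {
	if a.topic != b.topic {
		return cmp.Compare(a.topic, b.topic)
	}
	return cmp.Compare(a.schemaID, b.schemaID)
}

func main() {
	// Degree 8 matches the value the sink passed to the old btree.New(8).
	m := btreemap.New[key, string](8, cmpKey)
	m.ReplaceOrInsert(key{"t1", 1}, "file-a")
	m.ReplaceOrInsert(key{"t1", 2}, "file-b")

	// Three-value Get, as used in getOrCreateFile below; the middle result
	// is the stored value (its zero value when the key is absent).
	if _, v, ok := m.Get(key{"t1", 1}); ok {
		fmt.Println(v) // file-a
	}
}
```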
@@ -516,8 +517,7 @@ func (s *cloudStorageSink) getOrCreateFile(
 ) (*cloudStorageSinkFile, error) {
 	name, _ := s.topicNamer.Name(topic)
 	key := cloudStorageSinkKey{name, int64(topic.GetVersion())}
-	if item := s.files.Get(key); item != nil {
-		f := item.(*cloudStorageSinkFile)
+	if _, f, _ := s.files.Get(key); f != nil {
 		if eventMVCC.Less(f.oldestMVCC) {
 			f.oldestMVCC = eventMVCC
 		}
@@ -537,7 +537,7 @@ func (s *cloudStorageSink) getOrCreateFile(
 		}
 		f.codec = codec
 	}
-	s.files.ReplaceOrInsert(f)
+	s.files.ReplaceOrInsert(f.cloudStorageSinkKey, f)
 	return f, nil
 }
 
@@ -646,20 +646,17 @@ func (s *cloudStorageSink) EmitResolvedTimestamp(
 // on cloudStorageSink)
 func (s *cloudStorageSink) flushTopicVersions(
 	ctx context.Context, topic string, maxVersionToFlush int64,
-) (err error) {
+) error {
 	var toRemoveAlloc [2]int64    // generally avoid allocating
 	toRemove := toRemoveAlloc[:0] // schemaIDs of flushed files
 	gte := cloudStorageSinkKey{topic: topic}
 	lt := cloudStorageSinkKey{topic: topic, schemaID: maxVersionToFlush + 1}
-	s.files.AscendRange(gte, lt, func(i btree.Item) (wantMore bool) {
-		f := i.(*cloudStorageSinkFile)
-		if err = s.flushFile(ctx, f); err == nil {
-			toRemove = append(toRemove, f.schemaID)
+
+	for _, f := range s.files.Ascend(btreemap.GE(gte), btreemap.LT(lt)) {
+		if err := s.flushFile(ctx, f); err != nil {
+			return err
 		}
-		return err == nil
-	})
-	if err != nil {
-		return err
+		toRemove = append(toRemove, f.schemaID)
 	}
 
 	// Allow synchronization with the async flusher to happen.
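The callback-based `AscendRange` becomes a range-over-func loop (Go 1.23 iterators) with explicit bounds, which lets `flushTopicVersions` return an error directly instead of smuggling it out of the closure through the captured `err` and the `wantMore` result. A fragment continuing the sketch above, assuming `btreemap.GE` and `btreemap.LT` mark inclusive and exclusive endpoints, as their names and the `maxVersionToFlush + 1` computation imply:

```go
// Visit only topic "t1" entries with schemaID <= maxVersion. The exclusive
// upper bound uses maxVersion+1, exactly as flushTopicVersions computes lt.
maxVersion := int64(2)
gte := key{topic: "t1"}
lt := key{topic: "t1", schemaID: maxVersion + 1}
for k, v := range m.Ascend(btreemap.GE(gte), btreemap.LT(lt)) {
	fmt.Println(k.schemaID, v) // 1 file-a, then 2 file-b
}
```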
@@ -672,8 +669,7 @@ func (s *cloudStorageSink) flushTopicVersions(
 	// flushed files may not be removed from s.files. This is ok, since
 	// the error will trigger the sink to be closed, and we will only use
 	// s.files to ensure that the codecs are closed before deallocating it.
-	err = s.waitAsyncFlush(ctx)
-	if err != nil {
+	if err := s.waitAsyncFlush(ctx); err != nil {
 		return err
 	}
 
@@ -682,7 +678,7 @@ func (s *cloudStorageSink) flushTopicVersions(
 	for _, v := range toRemove {
 		s.files.Delete(cloudStorageSinkKey{topic: topic, schemaID: v})
 	}
-	return err
+	return nil
 }
 
 // Flush implements the Sink interface.
@@ -693,13 +689,10 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 
 	s.metrics.recordFlushRequestCallback()()
 
-	var err error
-	s.files.Ascend(func(i btree.Item) (wantMore bool) {
-		err = s.flushFile(ctx, i.(*cloudStorageSinkFile))
-		return err == nil
-	})
-	if err != nil {
-		return err
+	for _, f := range s.files.Ascend(btreemap.Min[cloudStorageSinkKey](), btreemap.Max[cloudStorageSinkKey]()) {
+		if err := s.flushFile(ctx, f); err != nil {
+			return err
+		}
 	}
 	// Allow synchronization with the async flusher to happen.
 	if s.testingKnobs != nil && s.testingKnobs.AsyncFlushSync != nil {
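`Flush` has no natural key bounds since it walks every file; `btreemap.Min` and `btreemap.Max` appear to act as open endpoints for a full in-order scan. Continuing the same sketch:

```go
// Full in-order traversal: Min/Max behave like -inf/+inf bounds.
for k, v := range m.Ascend(btreemap.Min[key](), btreemap.Max[key]()) {
	fmt.Println(k.topic, k.schemaID, v)
}
```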
@@ -711,8 +704,7 @@ func (s *cloudStorageSink) Flush(ctx context.Context) error {
 	// flushed files may not be removed from s.files. This is ok, since
 	// the error will trigger the sink to be closed, and we will only use
 	// s.files to ensure that the codecs are closed before deallocating it.
-	err = s.waitAsyncFlush(ctx)
-	if err != nil {
+	if err := s.waitAsyncFlush(ctx); err != nil {
 		return err
 	}
 	// Files need to be cleared after the flush completes, otherwise file resources
@@ -909,17 +901,15 @@ func (s *cloudStorageSink) closeAllCodecs() (err error) {
 	// Codecs need to be closed because of the klauspost compression library implementation
 	// details where it spins up go routines to perform compression in parallel.
 	// Those go routines are cleaned up when the compression codec is closed.
-	s.files.Ascend(func(i btree.Item) (wantMore bool) {
-		f := i.(*cloudStorageSinkFile)
+	for _, f := range s.files.Ascend(btreemap.Min[cloudStorageSinkKey](), btreemap.Max[cloudStorageSinkKey]()) {
 		if f.codec != nil {
 			cErr := f.codec.Close()
 			f.codec = nil
 			if err == nil {
 				err = cErr
 			}
 		}
-		return true
-	})
+	}
 	return err
 }
 
@@ -943,22 +933,11 @@ type cloudStorageSinkKey struct {
 	schemaID int64
 }
 
-func (k cloudStorageSinkKey) Less(other btree.Item) bool {
-	switch other := other.(type) {
-	case *cloudStorageSinkFile:
-		return keyLess(k, other.cloudStorageSinkKey)
-	case cloudStorageSinkKey:
-		return keyLess(k, other)
-	default:
-		panic(errors.Errorf("unexpected item type %T", other))
-	}
-}
-
-func keyLess(a, b cloudStorageSinkKey) bool {
-	if a.topic == b.topic {
-		return a.schemaID < b.schemaID
+func keyCmp(a, b cloudStorageSinkKey) int {
+	if a.topic != b.topic {
+		return cmp.Compare(a.topic, b.topic)
 	}
-	return a.topic < b.topic
+	return cmp.Compare(a.schemaID, b.schemaID)
 }
 
 // generateChangefeedSessionID generates a unique string that is used to
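Dropping the `Less` method also eliminates the type switch, which existed only because the old tree stored `*cloudStorageSinkFile` items yet was probed with bare `cloudStorageSinkKey` values; with separate key and value type parameters, the comparator only ever sees keys. If one wanted to pin down the refactor in a test (hypothetical, not part of this PR), the new comparator should induce exactly the order the removed `keyLess` did:

```go
// Hypothetical regression check: keyCmp orders keys the same way keyLess did,
// i.e. keyCmp(a, b) < 0 exactly when keyLess(a, b) is true.
func TestKeyCmpMatchesKeyLess(t *testing.T) {
	keyLess := func(a, b cloudStorageSinkKey) bool { // the removed function
		if a.topic == b.topic {
			return a.schemaID < b.schemaID
		}
		return a.topic < b.topic
	}
	keys := []cloudStorageSinkKey{
		{topic: "a", schemaID: 1}, {topic: "a", schemaID: 2}, {topic: "b", schemaID: 1},
	}
	for _, x := range keys {
		for _, y := range keys {
			if (keyCmp(x, y) < 0) != keyLess(x, y) {
				t.Errorf("order mismatch: %v vs %v", x, y)
			}
		}
	}
}
```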