@@ -41,10 +41,12 @@ import (
41
41
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
42
42
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
43
43
kvstorage "github.com/cockroachdb/cockroach/pkg/storage"
44
+ "github.com/cockroachdb/cockroach/pkg/util/buildutil"
44
45
"github.com/cockroachdb/cockroach/pkg/util/hlc"
45
46
"github.com/cockroachdb/cockroach/pkg/util/log"
46
47
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
47
48
"github.com/cockroachdb/cockroach/pkg/util/metric"
49
+ "github.com/cockroachdb/cockroach/pkg/util/mon"
48
50
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
49
51
"github.com/cockroachdb/cockroach/pkg/util/retry"
50
52
"github.com/cockroachdb/cockroach/pkg/util/startup"
@@ -777,6 +779,12 @@ func (m *Manager) readOlderVersionForTimestamp(
777
779
return descs , nil
778
780
}
779
781
782
+ // wrapMemoryError adds a hint on memory errors to indicate
783
+ // which setting should be bumped.
784
+ func wrapMemoryError (err error ) error {
785
+ return errors .WithHint (err , "Consider increasing --max-sql-memory startup parameter." )
786
+ }
787
+
780
788
// Insert descriptor versions. The versions provided are not in
781
789
// any particular order.
782
790
func (m * Manager ) insertDescriptorVersions (
@@ -789,16 +797,25 @@ func (m *Manager) insertDescriptorVersions(
789
797
}
790
798
t .mu .Lock ()
791
799
defer t .mu .Unlock ()
800
+ newVersionsToInsert := make ([]* descriptorVersionState , 0 , len (versions ))
792
801
for i := range versions {
793
802
// Since we gave up the lock while reading the versions from
794
803
// the store we have to ensure that no one else inserted the
795
804
// same version.
796
805
existingVersion := t .mu .active .findVersion (versions [i ].desc .GetVersion ())
797
806
if existingVersion == nil {
798
- t .mu .active .insert (
799
- newDescriptorVersionState (t , versions [i ].desc , versions [i ].expiration , session , nil , false ))
807
+ descState := newDescriptorVersionState (t , versions [i ].desc , versions [i ].expiration , session , nil , false )
808
+ if err := t .m .boundAccount .Grow (ctx , descState .getByteSize ()); err != nil {
809
+ return wrapMemoryError (err )
810
+ }
811
+ newVersionsToInsert = append (newVersionsToInsert , descState )
800
812
}
801
813
}
814
+ // Only insert if all versions were allocated.
815
+ for _ , descState := range newVersionsToInsert {
816
+ t .mu .active .insert (descState )
817
+ }
818
+
802
819
return nil
803
820
}
804
821
@@ -978,7 +995,7 @@ func purgeOldVersions(
978
995
t .mu .Lock ()
979
996
defer t .mu .Unlock ()
980
997
t .mu .takenOffline = dropped
981
- return t .removeInactiveVersions (), t .mu .active .findPreviousToExpire (dropped )
998
+ return t .removeInactiveVersions (ctx ), t .mu .active .findPreviousToExpire (dropped )
982
999
}()
983
1000
for _ , l := range leases {
984
1001
releaseLease (ctx , l , m )
@@ -1122,6 +1139,11 @@ type Manager struct {
1122
1139
// initComplete is a fast check to confirm that initialization is complete, since
1123
1140
// performance testing showed select on the waitForInit channel can be expensive.
1124
1141
initComplete atomic.Bool
1142
+
1143
+ // bytesMonitor tracks the memory usage from leased descriptors.
1144
+ bytesMonitor * mon.BytesMonitor
1145
+ // boundAccount is the account opened on bytesMonitor against which the memory of leased descriptors is reserved.
1146
+ boundAccount * mon.ConcurrentBoundAccount
1125
1147
}
1126
1148
1127
1149
const leaseConcurrencyLimit = 5
@@ -1133,6 +1155,7 @@ const leaseConcurrencyLimit = 5
1133
1155
//
1134
1156
// stopper is used to run async tasks. Can be nil in tests.
1135
1157
func NewLeaseManager (
1158
+ ctx context.Context ,
1136
1159
ambientCtx log.AmbientContext ,
1137
1160
nodeIDContainer * base.SQLIDContainer ,
1138
1161
db isql.DB ,
@@ -1144,7 +1167,11 @@ func NewLeaseManager(
1144
1167
testingKnobs ManagerTestingKnobs ,
1145
1168
stopper * stop.Stopper ,
1146
1169
rangeFeedFactory * rangefeed.Factory ,
1170
+ rootBytesMonitor * mon.BytesMonitor ,
1147
1171
) * Manager {
1172
+ // See pkg/sql/mem_metrics.go
1173
+ // log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat
1174
+ const log10int64times1000 = 19 * 1000
1148
1175
lm := & Manager {
1149
1176
storage : storage {
1150
1177
nodeIDContainer : nodeIDContainer ,
@@ -1199,6 +1226,24 @@ func NewLeaseManager(
1199
1226
Measurement : "Number of wait for initial version routines executing" ,
1200
1227
Unit : metric .Unit_COUNT ,
1201
1228
}),
1229
+ leaseCurBytesCount : metric .NewGauge (metric.Metadata {
1230
+ Name : "sql.leases.lease_cur_bytes_count" ,
1231
+ Help : "The current number of bytes used by the lease manager." ,
1232
+ Measurement : "Number of bytes used by the lease manager." ,
1233
+ Unit : metric .Unit_BYTES ,
1234
+ }),
1235
+ leaseMaxBytesHist : metric .NewHistogram (metric.HistogramOptions {
1236
+ Metadata : metric.Metadata {
1237
+ Name : "sql.leases.lease_max_bytes_hist" ,
1238
+ Help : "Memory used by the lease manager." ,
1239
+ Measurement : "Number of bytes used by the lease manager." ,
1240
+ Unit : metric .Unit_BYTES ,
1241
+ },
1242
+ Duration : base .DefaultHistogramWindowInterval (),
1243
+ MaxVal : log10int64times1000 ,
1244
+ SigFigs : 3 ,
1245
+ BucketConfig : metric .MemoryUsage64MBBuckets ,
1246
+ }),
1202
1247
},
1203
1248
},
1204
1249
settings : settings ,
@@ -1226,6 +1271,22 @@ func NewLeaseManager(
1226
1271
lm .descUpdateCh = make (chan catalog.Descriptor )
1227
1272
lm .descDelCh = make (chan descpb.ID )
1228
1273
lm .rangefeedErrCh = make (chan error )
1274
+ lm .bytesMonitor = mon .NewMonitor (mon.Options {
1275
+ Name : mon .MakeName ("leased-descriptors" ),
1276
+ CurCount : lm .storage .leasingMetrics .leaseCurBytesCount ,
1277
+ MaxHist : lm .storage .leasingMetrics .leaseMaxBytesHist ,
1278
+ Res : mon .MemoryResource ,
1279
+ Settings : settings ,
1280
+ LongLiving : true ,
1281
+ })
1282
+ lm .bytesMonitor .StartNoReserved (context .Background (), rootBytesMonitor )
1283
+ lm .boundAccount = lm .bytesMonitor .MakeConcurrentBoundAccount ()
1284
+ // Register a closer on the stopper that closes the bound account and
1285
+ // stops the bytes monitor that we are using to track memory usage.
1286
+ lm .stopper .AddCloser (stop .CloserFn (func () {
1287
+ lm .boundAccount .Close (ctx )
1288
+ lm .bytesMonitor .Stop (ctx )
1289
+ }))
1229
1290
return lm
1230
1291
}
1231
1292
@@ -1517,7 +1578,10 @@ func (m *Manager) IsDraining() bool {
1517
1578
// been done by the time this call returns. See the explanation in
1518
1579
// pkg/server/drain.go for details.
1519
1580
func (m * Manager ) SetDraining (
1520
- ctx context.Context , drain bool , reporter func (int , redact.SafeString ),
1581
+ ctx context.Context ,
1582
+ drain bool ,
1583
+ reporter func (int , redact.SafeString ),
1584
+ assertOnLeakedDescriptor bool ,
1521
1585
) {
1522
1586
m .draining .Store (drain )
1523
1587
if ! drain {
@@ -1530,7 +1594,13 @@ func (m *Manager) SetDraining(
1530
1594
leases := func () []* storedLease {
1531
1595
t .mu .Lock ()
1532
1596
defer t .mu .Unlock ()
1533
- return t .removeInactiveVersions ()
1597
+ leasesToRelease := t .removeInactiveVersions (ctx )
1598
+ // Ensure that all leases are released at this time.
1599
+ if buildutil .CrdbTestBuild && assertOnLeakedDescriptor && len (t .mu .active .data ) > 0 {
1600
+ // Versions are still active after removing the inactive ones, so panic to flag a possible descriptor leak.
1601
+ panic (errors .AssertionFailedf ("descriptor leak was detected for: %d (%s)" , t .id , t .mu .active ))
1602
+ }
1603
+ return leasesToRelease
1534
1604
}()
1535
1605
for _ , l := range leases {
1536
1606
releaseLease (ctx , l , m )
@@ -2478,3 +2548,8 @@ func (m *Manager) deleteOrphanedLeasesWithSameInstanceID(
2478
2548
}
2479
2549
}
2480
2550
}
2551
+
2552
+ // TestingGetBoundAccount returns the bound account used by the lease manager.
2553
+ func (m * Manager ) TestingGetBoundAccount () * mon.ConcurrentBoundAccount {
2554
+ return m .boundAccount
2555
+ }
0 commit comments