@@ -194,7 +194,7 @@ func (se *Engine) Open() error {
194
194
}
195
195
se .notifiers = make (map [string ]notifier )
196
196
197
- if err := se .reload (ctx ); err != nil {
197
+ if err := se .reload (ctx , true ); err != nil {
198
198
return err
199
199
}
200
200
if ! se .SkipMetaCheck {
@@ -285,6 +285,8 @@ func (se *Engine) EnableHistorian(enabled bool) error {
285
285
286
286
// Reload reloads the schema info from the db.
287
287
// Any tables that have changed since the last load are updated.
288
+ // The includeStats argument controls whether table size statistics should be
289
+ // emitted, as they can be expensive to calculate for a large number of tables
288
290
func (se * Engine ) Reload (ctx context.Context ) error {
289
291
return se .ReloadAt (ctx , mysql.Position {})
290
292
}
@@ -294,25 +296,35 @@ func (se *Engine) Reload(ctx context.Context) error {
294
296
// It maintains the position at which the schema was reloaded and if the same position is provided
295
297
// (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
296
298
func (se * Engine ) ReloadAt (ctx context.Context , pos mysql.Position ) error {
299
+ return se .ReloadAtEx (ctx , pos , true )
300
+ }
301
+
302
+ // ReloadAtEx reloads the schema info from the db.
303
+ // Any tables that have changed since the last load are updated.
304
+ // It maintains the position at which the schema was reloaded and if the same position is provided
305
+ // (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema
306
+ // The includeStats argument controls whether table size statistics should be
307
+ // emitted, as they can be expensive to calculate for a large number of tables
308
+ func (se * Engine ) ReloadAtEx (ctx context.Context , pos mysql.Position , includeStats bool ) error {
297
309
se .mu .Lock ()
298
310
defer se .mu .Unlock ()
299
311
if ! se .isOpen {
300
312
log .Warning ("Schema reload called for an engine that is not yet open" )
301
313
return nil
302
314
}
303
315
if ! pos .IsZero () && se .reloadAtPos .AtLeast (pos ) {
304
- log .V (2 ).Infof ("ReloadAt : found cached schema at %s" , mysql .EncodePosition (pos ))
316
+ log .V (2 ).Infof ("ReloadAtEx : found cached schema at %s" , mysql .EncodePosition (pos ))
305
317
return nil
306
318
}
307
- if err := se .reload (ctx ); err != nil {
319
+ if err := se .reload (ctx , includeStats ); err != nil {
308
320
return err
309
321
}
310
322
se .reloadAtPos = pos
311
323
return nil
312
324
}
313
325
314
326
// reload reloads the schema. It can also be used to initialize it.
315
- func (se * Engine ) reload (ctx context.Context ) error {
327
+ func (se * Engine ) reload (ctx context.Context , includeStats bool ) error {
316
328
defer func () {
317
329
se .env .LogError ()
318
330
}()
@@ -332,7 +344,14 @@ func (se *Engine) reload(ctx context.Context) error {
332
344
if se .SkipMetaCheck {
333
345
return nil
334
346
}
335
- tableData , err := conn .Exec (ctx , conn .BaseShowTables (), maxTableCount , false )
347
+
348
+ var showTablesQuery string
349
+ if includeStats {
350
+ showTablesQuery = conn .BaseShowTablesWithSizes ()
351
+ } else {
352
+ showTablesQuery = conn .BaseShowTables ()
353
+ }
354
+ tableData , err := conn .Exec (ctx , showTablesQuery , maxTableCount , false )
336
355
if err != nil {
337
356
return err
338
357
}
@@ -353,12 +372,15 @@ func (se *Engine) reload(ctx context.Context) error {
353
372
tableName := row [0 ].ToString ()
354
373
curTables [tableName ] = true
355
374
createTime , _ := evalengine .ToInt64 (row [2 ])
356
- fileSize , _ := evalengine .ToUint64 (row [4 ])
357
- allocatedSize , _ := evalengine .ToUint64 (row [5 ])
358
-
359
- // publish the size metrics
360
- se .tableFileSizeGauge .Set (tableName , int64 (fileSize ))
361
- se .tableAllocatedSizeGauge .Set (tableName , int64 (allocatedSize ))
375
+ var fileSize , allocatedSize uint64
376
+
377
+ if includeStats {
378
+ fileSize , _ = evalengine .ToUint64 (row [4 ])
379
+ allocatedSize , _ = evalengine .ToUint64 (row [5 ])
380
+ // publish the size metrics
381
+ se .tableFileSizeGauge .Set (tableName , int64 (fileSize ))
382
+ se .tableAllocatedSizeGauge .Set (tableName , int64 (allocatedSize ))
383
+ }
362
384
363
385
// Table schemas are cached by tabletserver. For each table we cache `information_schema.tables.create_time` (`tbl.CreateTime`).
364
386
// We also record the last time the schema was loaded (`se.lastChange`). Both are in seconds. We reload a table only when:
@@ -372,8 +394,10 @@ func (se *Engine) reload(ctx context.Context) error {
372
394
// #1 will not identify the renamed table as a changed one.
373
395
tbl , isInTablesMap := se .tables [tableName ]
374
396
if isInTablesMap && createTime == tbl .CreateTime && createTime < se .lastChange {
375
- tbl .FileSize = fileSize
376
- tbl .AllocatedSize = allocatedSize
397
+ if includeStats {
398
+ tbl .FileSize = fileSize
399
+ tbl .AllocatedSize = allocatedSize
400
+ }
377
401
continue
378
402
}
379
403
@@ -389,8 +413,10 @@ func (se *Engine) reload(ctx context.Context) error {
389
413
rec .RecordError (vterrors .Wrapf (err , "in Engine.reload(), reading table %s" , tableName ))
390
414
continue
391
415
}
392
- table .FileSize = fileSize
393
- table .AllocatedSize = allocatedSize
416
+ if includeStats {
417
+ table .FileSize = fileSize
418
+ table .AllocatedSize = allocatedSize
419
+ }
394
420
table .CreateTime = createTime
395
421
changedTables [tableName ] = table
396
422
if isInTablesMap {
0 commit comments