Skip to content

Commit ac3a77b

Browse files
committed
tapdb: ensure the stats cache is always populated
In this commit, we ensure the stats cache is always populated by using a background goroutine on a timer to hot swap the fresh values in the cache based on the interval.
1 parent dc598d8 commit ac3a77b

File tree

2 files changed

+100
-33
lines changed

2 files changed

+100
-33
lines changed

tapdb/universe_stats.go

Lines changed: 77 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -449,29 +449,10 @@ func (u *UniverseStats) LogNewProofEvents(ctx context.Context,
449449
})
450450
}
451451

452-
// AggregateSyncStats returns stats aggregated over all assets within the
453-
// Universe.
454-
func (u *UniverseStats) AggregateSyncStats(
455-
ctx context.Context) (universe.AggregateStats, error) {
456-
457-
stats := u.statsSnapshot.Load()
458-
if stats != nil {
459-
u.statsCacheLogger.Hit()
460-
return *stats, nil
461-
}
462-
463-
u.statsMtx.Lock()
464-
defer u.statsMtx.Unlock()
465-
466-
// Check to see if the stats were loaded in while we were waiting for
467-
// the mutex.
468-
stats = u.statsSnapshot.Load()
469-
if stats != nil {
470-
u.statsCacheLogger.Hit()
471-
return *stats, nil
472-
}
473-
474-
u.statsCacheLogger.Miss()
452+
// querySyncStats is a helper function that's used to query the sync stats for
453+
// the Universe db.
454+
func (u *UniverseStats) querySyncStats(ctx context.Context,
455+
) (universe.AggregateStats, error) {
475456

476457
var dbStats universe.AggregateStats
477458

@@ -516,16 +497,83 @@ func (u *UniverseStats) AggregateSyncStats(
516497
return universe.AggregateStats{}, err
517498
}
518499

500+
return dbStats, nil
501+
}
502+
503+
// populateSyncStatsCache is used to populate the sync stats cache
504+
// periodically.
505+
//
506+
// NOTE: This MUST be run as the call back of a time.AfterFunc.
507+
func (u *UniverseStats) populateSyncStatsCache() {
508+
log.Infof("Refreshing stats cache, duration=%v", u.opts.cacheDuration)
509+
510+
// If this is a test, then we'll just purge the items.
511+
if u.opts.cacheDuration == 0 {
512+
u.statsSnapshot.Store(nil)
513+
return
514+
}
515+
516+
now := time.Now()
517+
518+
// To ensure the stats endpoint is always available, we'll repopulate
519+
// it async ourselves here. This ensures after the first miss, the
520+
// stats are always populated.
521+
ctx := context.Background()
522+
dbStats, err := u.querySyncStats(ctx)
523+
if err != nil {
524+
log.Warnf("Unable to refresh stats cache: %v", err)
525+
return
526+
}
527+
528+
log.Debugf("Refreshed stats cache, interval=%v, took=%v",
529+
u.opts.cacheDuration, time.Since(now))
530+
531+
u.statsSnapshot.Store(&dbStats)
532+
533+
// Reset the timer so we'll refresh again after the cache duration.
534+
if !u.statsRefresh.Stop() {
535+
<-u.statsRefresh.C
536+
}
537+
538+
u.statsRefresh.Reset(u.opts.cacheDuration)
539+
}
540+
541+
// AggregateSyncStats returns stats aggregated over all assets within the
542+
// Universe.
543+
func (u *UniverseStats) AggregateSyncStats(
544+
ctx context.Context) (universe.AggregateStats, error) {
545+
546+
stats := u.statsSnapshot.Load()
547+
if stats != nil {
548+
u.statsCacheLogger.Hit()
549+
return *stats, nil
550+
}
551+
552+
u.statsMtx.Lock()
553+
defer u.statsMtx.Unlock()
554+
555+
// Check to see if the stats were loaded in while we were waiting for
556+
// the mutex.
557+
stats = u.statsSnapshot.Load()
558+
if stats != nil {
559+
u.statsCacheLogger.Hit()
560+
return *stats, nil
561+
}
562+
563+
u.statsCacheLogger.Miss()
564+
565+
dbStats, err := u.querySyncStats(ctx)
566+
if err != nil {
567+
return dbStats, err
568+
}
569+
519570
// We'll store the DB stats then start our time after function to wipe
520571
// the stats pointer so we'll refresh it after a period of time.
521572
u.statsSnapshot.Store(&dbStats)
522573

523-
u.statsRefresh = time.AfterFunc(u.opts.cacheDuration, func() {
524-
log.Infof("Purging stats cache, duration=%v",
525-
u.opts.cacheDuration)
526-
527-
u.statsSnapshot.Store(nil)
528-
})
574+
u.statsRefresh = time.AfterFunc(
575+
u.opts.cacheDuration, u.populateSyncStatsCache,
576+
)
529577

530578
return dbStats, nil
531579
}

tapdb/universe_stats_test.go

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,16 +4,19 @@ import (
44
"bytes"
55
"context"
66
"database/sql"
7+
"fmt"
78
"math/rand"
89
"sort"
910
"testing"
1011
"time"
1112

13+
"github.com/davecgh/go-spew/spew"
1214
"github.com/lightninglabs/taproot-assets/asset"
1315
"github.com/lightninglabs/taproot-assets/fn"
1416
"github.com/lightninglabs/taproot-assets/tapdb/sqlc"
1517
"github.com/lightninglabs/taproot-assets/universe"
1618
"github.com/lightningnetwork/lnd/clock"
19+
"github.com/lightningnetwork/lnd/lntest/wait"
1720
"github.com/stretchr/testify/require"
1821
)
1922

@@ -27,7 +30,7 @@ func newUniverseStatsWithDB(db *BaseDB, clock clock.Clock) (*UniverseStats,
2730
)
2831

2932
stats := NewUniverseStats(
30-
dbTxer, clock, WithStatsCacheDuration(time.Microsecond),
33+
dbTxer, clock, WithStatsCacheDuration(0),
3134
)
3235

3336
return stats, db
@@ -100,10 +103,26 @@ func (u *uniStatsHarness) logSyncEventByIndex(i int) {
100103
func (u *uniStatsHarness) assertUniverseStatsEqual(t *testing.T,
101104
stats universe.AggregateStats) {
102105

103-
uniStats, err := u.db.AggregateSyncStats(context.Background())
104-
require.NoError(t, err)
106+
var (
107+
uniStats universe.AggregateStats
108+
err error
109+
)
110+
111+
err = wait.NoError(func() error {
112+
uniStats, err = u.db.AggregateSyncStats(context.Background())
113+
if err != nil {
114+
return err
115+
}
105116

106-
require.Equal(t, uniStats, stats)
117+
if uniStats != stats {
118+
return fmt.Errorf("expected %v, got %v",
119+
spew.Sdump(stats),
120+
spew.Sdump(uniStats))
121+
}
122+
123+
return nil
124+
}, time.Second*2)
125+
require.NoError(t, err)
107126
}
108127

109128
// TestUniverseStatsEvents tests that we're able to properly insert, and also

0 commit comments

Comments
 (0)