Skip to content

Commit 2e2e196

Browse files
committed
pgwire: protect lastUpdateCache with a mutex
The cache is not safe for concurrent access so it needs to be protected with the mutex. Release note: None
1 parent e0ee23b commit 2e2e196

File tree

1 file changed

+26
-14
lines changed

1 file changed

+26
-14
lines changed

pkg/sql/pgwire/last_login_updater.go

Lines changed: 26 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ type lastLoginUpdater struct {
2727
group *singleflight.Group
2828
// execCfg is the executor configuration used for running update operations.
2929
execCfg *sql.ExecutorConfig
30-
// lastUpdateCache tracks the last time each user's login time was updated
31-
// to avoid redundant database updates.
32-
lastUpdateCache *cache.UnorderedCache
3330

3431
mu struct {
3532
syncutil.Mutex
3633
// pendingUsers tracks users that need their last login time updated
3734
// in the next singleflight operation.
3835
pendingUsers map[username.SQLUsername]struct{}
36+
// lastUpdateCache tracks the last time each user's login time was updated
37+
// to avoid redundant database updates.
38+
lastUpdateCache *cache.UnorderedCache
3939
}
4040
}
4141

@@ -69,11 +69,11 @@ func newLastLoginUpdater(execCfg *sql.ExecutorConfig) *lastLoginUpdater {
6969
}
7070

7171
u := &lastLoginUpdater{
72-
group: singleflight.NewGroup("update last login time", ""),
73-
execCfg: execCfg,
74-
lastUpdateCache: cache.NewUnorderedCache(cacheConfig),
72+
group: singleflight.NewGroup("update last login time", singleflight.NoTags),
73+
execCfg: execCfg,
7574
}
7675
u.mu.pendingUsers = make(map[username.SQLUsername]struct{})
76+
u.mu.lastUpdateCache = cache.NewUnorderedCache(cacheConfig)
7777
return u
7878
}
7979

@@ -87,13 +87,21 @@ func (u *lastLoginUpdater) updateLastLoginTime(ctx context.Context, dbUser usern
8787
}
8888

8989
// Check if we recently updated this user's login time.
90-
if lastUpdate, ok := u.lastUpdateCache.Get(dbUser); ok {
91-
if lastUpdateTime, ok := lastUpdate.(time.Time); ok {
92-
// Only update if it's been more than lastLoginEvictionTime since the last update.
93-
if timeutil.Since(lastUpdateTime) < lastLoginEvictionTime {
94-
return
90+
var recentlyUpdated bool
91+
func() {
92+
u.mu.Lock()
93+
defer u.mu.Unlock()
94+
if lastUpdate, ok := u.mu.lastUpdateCache.Get(dbUser); ok {
95+
if lastUpdateTime, ok := lastUpdate.(time.Time); ok {
96+
// Only update if it's been more than lastLoginEvictionTime since the last update.
97+
if timeutil.Since(lastUpdateTime) < lastLoginEvictionTime {
98+
recentlyUpdated = true
99+
}
95100
}
96101
}
102+
}()
103+
if recentlyUpdated {
104+
return
97105
}
98106

99107
// Use singleflight to ensure at most one last login time update batch
@@ -162,9 +170,13 @@ func (u *lastLoginUpdater) processPendingUpdates(ctx context.Context) error {
162170

163171
// Update the cache with the current time for all successfully updated users.
164172
now := timeutil.Now()
165-
for _, user := range users {
166-
u.lastUpdateCache.Add(user, now)
167-
}
173+
func() {
174+
u.mu.Lock()
175+
defer u.mu.Unlock()
176+
for _, user := range users {
177+
u.mu.lastUpdateCache.Add(user, now)
178+
}
179+
}()
168180

169181
return nil
170182
}

0 commit comments

Comments
 (0)