Skip to content

Commit 76bf8cc

Browse files
committed
pgwire: limit concurrent updates to last_login_time
This patch uses singleflight to ensure that each node only has one request in flight for updating the last_login_time. If there already is a request in flight, then that user is added to a set of pending users to update, and their time gets populated in the next singleflight. This will reduce write traffic to the system.users table. This includes a minor refactor to separate the login_time updating logic into a separate component. Release note: None
1 parent 902a44c commit 76bf8cc

File tree

5 files changed

+125
-26
lines changed

5 files changed

+125
-26
lines changed

pkg/sql/pgwire/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ go_library(
1212
"conn.go",
1313
"hba_conf.go",
1414
"ident_map_conf.go",
15+
"last_login_updater.go",
1516
"pre_serve.go",
1617
"pre_serve_options.go",
1718
"role_mapper.go",
@@ -81,6 +82,7 @@ go_library(
8182
"//pkg/util/ring",
8283
"//pkg/util/stop",
8384
"//pkg/util/syncutil",
85+
"//pkg/util/syncutil/singleflight",
8486
"//pkg/util/system",
8587
"//pkg/util/timeofday",
8688
"//pkg/util/timeutil",

pkg/sql/pgwire/auth.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,17 @@ type authOptions struct {
9090
// authentication and update c.sessionArgs with the authenticated user's name,
9191
// if different from the one given initially.
9292
func (c *conn) handleAuthentication(
93-
ctx context.Context, ac AuthConn, authOpt authOptions, execCfg *sql.ExecutorConfig,
93+
ctx context.Context, ac AuthConn, authOpt authOptions, server *Server,
9494
) (connClose func(), _ error) {
9595
if authOpt.testingSkipAuth {
9696
return nil, nil
9797
}
9898
if authOpt.testingAuthHook != nil {
9999
return nil, authOpt.testingAuthHook(ctx)
100100
}
101+
// Get execCfg from the server.
102+
execCfg := server.execCfg
103+
101104
// To book-keep the authentication start time.
102105
authStartTime := timeutil.Now()
103106

@@ -278,28 +281,11 @@ func (c *conn) handleAuthentication(
278281
duration := timeutil.Since(authStartTime).Nanoseconds()
279282
c.publishConnLatencyMetric(duration, hbaEntry.Method.String())
280283

281-
c.populateLastLoginTime(ctx, execCfg, dbUser)
284+
server.lastLoginUpdater.updateLastLoginTime(ctx, dbUser)
282285

283286
return connClose, nil
284287
}
285288

286-
// populateLastLoginTime updates the last login time for the sql user
287-
// asynchronously. This not guaranteed to succeed and we log any errors obtained
288-
// from the update transaction to the DEV channel.
289-
func (c *conn) populateLastLoginTime(
290-
ctx context.Context, execCfg *sql.ExecutorConfig, dbUser username.SQLUsername,
291-
) {
292-
// Update last login time in async. This is done asynchronously to avoid
293-
// blocking the connection.
294-
if err := execCfg.Stopper.RunAsyncTask(ctx, "write_last_login_time", func(ctx context.Context) {
295-
if err := sql.UpdateLastLoginTime(ctx, execCfg, dbUser.SQLIdentifier()); err != nil {
296-
log.Warningf(ctx, "failed to update last login time for user %s: %v", dbUser, err)
297-
}
298-
}); err != nil {
299-
log.Warningf(ctx, "failed to create async task to update last login time for user %s: %v", dbUser, err)
300-
}
301-
}
302-
303289
// publishConnLatencyMetric publishes the latency of the connection
304290
// based on the authentication method.
305291
func (c *conn) publishConnLatencyMetric(duration int64, authMethod string) {

pkg/sql/pgwire/conn.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (c *conn) processCommands(
151151
ctx context.Context,
152152
authOpt authOptions,
153153
ac AuthConn,
154-
sqlServer *sql.Server,
154+
server *Server,
155155
reserved *mon.BoundAccount,
156156
onDefaultIntSizeChange func(newSize int32),
157157
sessionID clusterunique.ID,
@@ -175,7 +175,7 @@ func (c *conn) processCommands(
175175
// network read on the connection's goroutine.
176176
c.cancelConn()
177177

178-
pgwireKnobs := sqlServer.GetExecutorConfig().PGWireTestingKnobs
178+
pgwireKnobs := server.SQLServer.GetExecutorConfig().PGWireTestingKnobs
179179
if pgwireKnobs != nil && pgwireKnobs.CatchPanics {
180180
if r := recover(); r != nil {
181181
// Catch the panic and return it to the client as an error.
@@ -203,14 +203,14 @@ func (c *conn) processCommands(
203203

204204
// Authenticate the connection.
205205
if connCloseAuthHandler, retErr = c.handleAuthentication(
206-
ctx, ac, authOpt, sqlServer.GetExecutorConfig(),
206+
ctx, ac, authOpt, server,
207207
); retErr != nil {
208208
// Auth failed or some other error.
209209
return
210210
}
211211

212212
var decrementConnectionCount func()
213-
if decrementConnectionCount, retErr = sqlServer.IncrementConnectionCount(c.sessionArgs); retErr != nil {
213+
if decrementConnectionCount, retErr = server.SQLServer.IncrementConnectionCount(c.sessionArgs); retErr != nil {
214214
// This will return pgcode.TooManyConnections which is used by the sql proxy
215215
// to skip failed auth throttle (as in this case the auth was fine but the
216216
// error occurred before sending back auth ok msg)
@@ -224,7 +224,7 @@ func (c *conn) processCommands(
224224
}
225225

226226
// Inform the client of the default session settings.
227-
connHandler, retErr = c.sendInitialConnData(ctx, sqlServer, onDefaultIntSizeChange, sessionID)
227+
connHandler, retErr = c.sendInitialConnData(ctx, server.SQLServer, onDefaultIntSizeChange, sessionID)
228228
if retErr != nil {
229229
return
230230
}
@@ -250,7 +250,7 @@ func (c *conn) processCommands(
250250

251251
// Now actually process commands.
252252
reservedOwned = false // We're about to pass ownership away.
253-
retErr = sqlServer.ServeConn(
253+
retErr = server.SQLServer.ServeConn(
254254
ctx,
255255
connHandler,
256256
reserved,
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package pgwire
7+
8+
import (
9+
"context"
10+
11+
"github.com/cockroachdb/cockroach/pkg/security/username"
12+
"github.com/cockroachdb/cockroach/pkg/sql"
13+
"github.com/cockroachdb/cockroach/pkg/util/log"
14+
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
15+
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
16+
)
17+
18+
// lastLoginUpdater handles updating the last login time for SQL users with
19+
// deduplication via singleflight to reduce concurrent updates.
20+
type lastLoginUpdater struct {
21+
// group ensures that there is at most one last login time update
22+
// in-flight at any given time.
23+
group *singleflight.Group
24+
// execCfg is the executor configuration used for running update operations.
25+
execCfg *sql.ExecutorConfig
26+
27+
mu struct {
28+
syncutil.Mutex
29+
// pendingUsers tracks users that need their last login time updated
30+
// in the next singleflight operation.
31+
pendingUsers map[username.SQLUsername]struct{}
32+
}
33+
}
34+
35+
// newLastLoginUpdater creates a new lastLoginUpdater instance.
36+
func newLastLoginUpdater(execCfg *sql.ExecutorConfig) *lastLoginUpdater {
37+
u := &lastLoginUpdater{
38+
group: singleflight.NewGroup("update last login time", ""),
39+
execCfg: execCfg,
40+
}
41+
u.mu.pendingUsers = make(map[username.SQLUsername]struct{})
42+
return u
43+
}
44+
45+
// updateLastLoginTime updates the last login time for the SQL user
46+
// asynchronously. This is not guaranteed to succeed and we log any errors
47+
// obtained from the update transaction to the DEV channel.
48+
func (u *lastLoginUpdater) updateLastLoginTime(ctx context.Context, dbUser username.SQLUsername) {
49+
// Avoid updating if we're in a read-only tenant.
50+
if u.execCfg.TenantReadOnly {
51+
return
52+
}
53+
54+
// Use singleflight to ensure at most one last login time update batch
55+
// is in-flight at any given time.
56+
future, leader := u.group.DoChan(ctx, "UpdateLastLoginTime",
57+
singleflight.DoOpts{
58+
Stop: u.execCfg.Stopper,
59+
InheritCancelation: false,
60+
},
61+
func(ctx context.Context) (interface{}, error) {
62+
// The leader adds itself to pending users, and processes all
63+
// pending updates.
64+
u.mu.Lock()
65+
u.mu.pendingUsers[dbUser] = struct{}{}
66+
u.mu.Unlock()
67+
return nil, u.processPendingUpdates(ctx)
68+
})
69+
70+
// Add this user to the pending set if not the leader,
71+
if !leader {
72+
u.mu.Lock()
73+
u.mu.pendingUsers[dbUser] = struct{}{}
74+
u.mu.Unlock()
75+
} else {
76+
// Leader waits for the result in an async task to avoid blocking authentication
77+
if err := u.execCfg.Stopper.RunAsyncTask(ctx, "wait_last_login_update", func(ctx context.Context) {
78+
result := future.WaitForResult(ctx)
79+
if result.Err != nil {
80+
log.Warningf(ctx, "could not update last login times: %v", result.Err)
81+
}
82+
}); err != nil {
83+
log.Warningf(ctx, "could not create async task to wait for last login update: %v", err)
84+
}
85+
}
86+
}
87+
88+
// processPendingUpdates processes all users in the pending set and clears it.
89+
func (u *lastLoginUpdater) processPendingUpdates(ctx context.Context) error {
90+
u.mu.Lock()
91+
users := make([]username.SQLUsername, 0, len(u.mu.pendingUsers))
92+
for user := range u.mu.pendingUsers {
93+
users = append(users, user)
94+
}
95+
// Clear the pending users set.
96+
u.mu.pendingUsers = make(map[username.SQLUsername]struct{})
97+
u.mu.Unlock()
98+
99+
// Update last login time for all pending users.
100+
for _, user := range users {
101+
if err := sql.UpdateLastLoginTime(ctx, u.execCfg, user.SQLIdentifier()); err != nil {
102+
return err
103+
}
104+
}
105+
return nil
106+
}

pkg/sql/pgwire/server.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,10 @@ type Server struct {
276276

277277
destinationMetrics destinationAggMetrics
278278

279+
// lastLoginUpdater handles updating the last login time for SQL users
280+
// with deduplication to reduce concurrent updates for the same user.
281+
lastLoginUpdater *lastLoginUpdater
282+
279283
mu struct {
280284
syncutil.Mutex
281285
// connCancelMap entries represent connections started when the server
@@ -444,6 +448,7 @@ func MakeServer(
444448
BytesInCount: aggmetric.NewCounter(MetaBytesIn, "remote"),
445449
BytesOutCount: aggmetric.NewCounter(MetaBytesOut, "remote"),
446450
},
451+
lastLoginUpdater: newLastLoginUpdater(executorConfig),
447452
}
448453
server.sqlMemoryPool = mon.NewMonitor(mon.Options{
449454
Name: mon.MakeName("sql"),
@@ -1210,7 +1215,7 @@ func (s *Server) serveImpl(
12101215
ctx,
12111216
authOpt,
12121217
authPipe,
1213-
sqlServer,
1218+
s,
12141219
reserved,
12151220
onDefaultIntSizeChange,
12161221
sessionID,

0 commit comments

Comments
 (0)