Skip to content

Commit 2cf9e9c

Browse files
authored
Merge pull request #151149 from celiala/backportrelease-25.3-150991
release-25.3: pgwire: limit concurrent updates to last_login_time
2 parents 3bf4d17 + 2e2e196 commit 2cf9e9c

File tree

12 files changed

+260
-49
lines changed

12 files changed

+260
-49
lines changed

pkg/base/test_server_args.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,6 +635,10 @@ type TestSharedProcessTenantArgs struct {
635635
// TenantID is the ID of the tenant to be created. If not set, an ID is
636636
// assigned automatically.
637637
TenantID roachpb.TenantID
638+
// TenantReadOnly indicates if this tenant should be created as read-only
639+
// (for testing PCR reader tenants). This field is used for testing purposes
640+
// and overrides the tenant record check.
641+
TenantReadOnly bool
638642

639643
Knobs TestingKnobs
640644

pkg/server/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,6 +487,9 @@ type SQLConfig struct {
487487
TenantID roachpb.TenantID
488488
TenantName roachpb.TenantName
489489

490+
// TenantReadOnly indicates if this tenant is read-only (PCR reader tenant).
491+
TenantReadOnly bool
492+
490493
// If set, will to be called at server startup to obtain the tenant id and
491494
// locality.
492495
DelayedSetTenantID func(context.Context) (roachpb.TenantID, roachpb.Locality, error)
@@ -554,8 +557,9 @@ func MakeSQLConfig(
554557
tenID roachpb.TenantID, tenName roachpb.TenantName, tempStorageCfg base.TempStorageConfig,
555558
) SQLConfig {
556559
sqlCfg := SQLConfig{
557-
TenantID: tenID,
558-
TenantName: tenName,
560+
TenantID: tenID,
561+
TenantName: tenName,
562+
TenantReadOnly: false, // Default to false, will be set during tenant initialization
559563
}
560564
sqlCfg.SetDefaults(tempStorageCfg)
561565
return sqlCfg

pkg/server/server_controller_new_server.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,13 +64,18 @@ func (s *topLevelServer) newTenantServer(
6464
portStartHint int,
6565
testArgs base.TestSharedProcessTenantArgs,
6666
) (onDemandServer, error) {
67-
tenantID, err := s.getTenantID(ctx, tenantNameContainer.Get())
67+
tenantID, tenantReadOnly, err := s.getTenantID(ctx, tenantNameContainer.Get())
6868
if err != nil {
6969
return nil, err
7070
}
7171

72+
// Use test override for tenant read-only status if provided.
73+
if testArgs.TenantReadOnly {
74+
tenantReadOnly = true
75+
}
76+
7277
baseCfg, sqlCfg, err := s.makeSharedProcessTenantConfig(ctx, tenantID, tenantNameContainer.Get(), portStartHint,
73-
tenantStopper, testArgs.Settings)
78+
tenantStopper, testArgs.Settings, tenantReadOnly)
7479
if err != nil {
7580
return nil, err
7681
}
@@ -100,23 +105,27 @@ var ErrInvalidTenant error = errInvalidTenantMarker{}
100105

101106
func (s *topLevelServer) getTenantID(
102107
ctx context.Context, tenantName roachpb.TenantName,
103-
) (roachpb.TenantID, error) {
108+
) (roachpb.TenantID, bool, error) {
104109
var rec *mtinfopb.TenantInfo
105110
if err := s.sqlServer.internalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
106111
var err error
107112
rec, err = sql.GetTenantRecordByName(ctx, s.cfg.Settings, txn, tenantName)
108113
return err
109114
}); err != nil {
110-
return roachpb.TenantID{}, errors.Mark(err, ErrInvalidTenant)
115+
return roachpb.TenantID{}, false, errors.Mark(err, ErrInvalidTenant)
111116
}
112117

113118
tenantID, err := roachpb.MakeTenantID(rec.ID)
114119
if err != nil {
115-
return roachpb.TenantID{}, errors.Mark(
120+
return roachpb.TenantID{}, false, errors.Mark(
116121
errors.NewAssertionErrorWithWrappedErrf(err, "stored tenant ID %d does not convert to TenantID", rec.ID),
117122
ErrInvalidTenant)
118123
}
119-
return tenantID, nil
124+
125+
// Check if tenant is read-only (PCR reader tenant).
126+
readOnlyTenant := rec.ReadFromTenant != nil
127+
128+
return tenantID, readOnlyTenant, nil
120129
}
121130

122131
// newTenantServerInternal instantiates a server for the given target
@@ -151,6 +160,7 @@ func (s *topLevelServer) makeSharedProcessTenantConfig(
151160
portStartHint int,
152161
stopper *stop.Stopper,
153162
testSettings *cluster.Settings,
163+
tenantReadOnly bool,
154164
) (BaseConfig, SQLConfig, error) {
155165
// Create a configuration for the new tenant.
156166
parentCfg := s.cfg
@@ -167,7 +177,7 @@ func (s *topLevelServer) makeSharedProcessTenantConfig(
167177
}
168178

169179
baseCfg, sqlCfg, err := makeSharedProcessTenantServerConfig(ctx, tenantID, tenantName, portStartHint, parentCfg,
170-
localServerInfo, st, stopper, s.recorder)
180+
localServerInfo, st, stopper, s.recorder, tenantReadOnly)
171181
if err != nil {
172182
return BaseConfig{}, SQLConfig{}, err
173183
}
@@ -186,6 +196,7 @@ func makeSharedProcessTenantServerConfig(
186196
st *cluster.Settings,
187197
stopper *stop.Stopper,
188198
nodeMetricsRecorder *status.MetricsRecorder,
199+
tenantReadOnly bool,
189200
) (baseCfg BaseConfig, sqlCfg SQLConfig, err error) {
190201
tr := tracing.NewTracerWithOpt(ctx, tracing.WithClusterSettings(&st.SV))
191202

@@ -333,6 +344,7 @@ func makeSharedProcessTenantServerConfig(
333344
}
334345

335346
sqlCfg = MakeSQLConfig(tenantID, tenantName, tempStorageCfg)
347+
sqlCfg.TenantReadOnly = tenantReadOnly
336348
baseCfg.ExternalIODirConfig = kvServerCfg.BaseConfig.ExternalIODirConfig
337349

338350
baseCfg.ExternalIODir = kvServerCfg.BaseConfig.ExternalIODir

pkg/server/server_sql.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
10521052
RangeStatsFetcher: rangeStatsFetcher,
10531053
NodeDescs: cfg.nodeDescs,
10541054
TenantCapabilitiesReader: cfg.tenantCapabilitiesReader,
1055+
TenantReadOnly: cfg.SQLConfig.TenantReadOnly,
10551056
CidrLookup: cfg.BaseConfig.CidrLookup,
10561057
LicenseEnforcer: cfg.SQLConfig.LicenseEnforcer,
10571058
}

pkg/sql/exec_util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1677,6 +1677,9 @@ type ExecutorConfig struct {
16771677

16781678
TenantCapabilitiesReader SystemTenantOnly[tenantcapabilities.Reader]
16791679

1680+
// TenantReadOnly indicates if this tenant is read-only (PCR reader tenant).
1681+
TenantReadOnly bool
1682+
16801683
// VirtualClusterName contains the name of the virtual cluster
16811684
// (tenant).
16821685
VirtualClusterName roachpb.TenantName

pkg/sql/logictest/testdata/logic_test/user

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -240,9 +240,9 @@ onlyif config local-mixed-25.2
240240
statement ok
241241
DROP user testuser
242242

243-
onlyif config local-mixed-25.2
244243
user testuser nodeidx=0 newsession
245244

245+
onlyif config local-mixed-25.2
246246
statement error pq: password authentication failed for user testuser
247247
SHOW session_user
248248

@@ -259,7 +259,6 @@ skipif config local-mixed-25.2
259259
statement ok
260260
DROP user testuser
261261

262-
skipif config local-mixed-25.2
263262
user testuser nodeidx=0 newsession
264263

265264
# The logictest suite currently doesn't work for external authentication methods
@@ -271,39 +270,39 @@ skipif config local-mixed-25.2
271270
statement error pq: user identity unknown for identity provider
272271
SHOW session_user
273272

274-
subtest end
275-
276-
subtest validate_estimated_last_login_time
277-
278273
user root
279274

280275
statement ok
281276
CREATE user IF NOT EXISTS testuser
282277

283-
user testuser
278+
subtest end
279+
280+
subtest validate_estimated_last_login_time
281+
282+
user testuser2
284283

285284
# Since the logictest framework does not connect with the user until a command
286285
# is executed, we need to run a command before checking estimated_last_login_time.
287286
query T
288287
SHOW session_user
289288
----
290-
testuser
289+
testuser2
291290

292291
user root
293292

294293
# The estimated_last_login_time is not guaranteed to be populated synchronously,
295294
# so we poll until testuser's entry was updated with the last login time.
296295
skipif config local-mixed-25.2
297296
query I retry
298-
SELECT count(*) FROM system.users WHERE estimated_last_login_time IS NOT NULL AND username = 'testuser'
297+
SELECT count(*) FROM system.users WHERE estimated_last_login_time IS NOT NULL AND username = 'testuser2'
299298
----
300299
1
301300

302301
user root
303302

304303
onlyif config local-mixed-25.2
305304
statement error pq: column "estimated_last_login_time" does not exist
306-
select estimated_last_login_time from [SHOW USERS] where username = 'testuser';
305+
select estimated_last_login_time from [SHOW USERS] where username = 'testuser2';
307306

308307
subtest end
309308

pkg/sql/pgwire/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ go_library(
1212
"conn.go",
1313
"hba_conf.go",
1414
"ident_map_conf.go",
15+
"last_login_updater.go",
1516
"pre_serve.go",
1617
"pre_serve_options.go",
1718
"role_mapper.go",
@@ -63,6 +64,7 @@ go_library(
6364
"//pkg/sql/sqltelemetry",
6465
"//pkg/sql/types",
6566
"//pkg/util",
67+
"//pkg/util/cache",
6668
"//pkg/util/ctxlog",
6769
"//pkg/util/duration",
6870
"//pkg/util/envutil",
@@ -81,6 +83,7 @@ go_library(
8183
"//pkg/util/ring",
8284
"//pkg/util/stop",
8385
"//pkg/util/syncutil",
86+
"//pkg/util/syncutil/singleflight",
8487
"//pkg/util/system",
8588
"//pkg/util/timeofday",
8689
"//pkg/util/timeutil",

pkg/sql/pgwire/auth.go

Lines changed: 5 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,17 @@ type authOptions struct {
9090
// authentication and update c.sessionArgs with the authenticated user's name,
9191
// if different from the one given initially.
9292
func (c *conn) handleAuthentication(
93-
ctx context.Context, ac AuthConn, authOpt authOptions, execCfg *sql.ExecutorConfig,
93+
ctx context.Context, ac AuthConn, authOpt authOptions, server *Server,
9494
) (connClose func(), _ error) {
9595
if authOpt.testingSkipAuth {
9696
return nil, nil
9797
}
9898
if authOpt.testingAuthHook != nil {
9999
return nil, authOpt.testingAuthHook(ctx)
100100
}
101+
// Get execCfg from the server.
102+
execCfg := server.execCfg
103+
101104
// To book-keep the authentication start time.
102105
authStartTime := timeutil.Now()
103106

@@ -278,28 +281,11 @@ func (c *conn) handleAuthentication(
278281
duration := timeutil.Since(authStartTime).Nanoseconds()
279282
c.publishConnLatencyMetric(duration, hbaEntry.Method.String())
280283

281-
c.populateLastLoginTime(ctx, execCfg, dbUser)
284+
server.lastLoginUpdater.updateLastLoginTime(ctx, dbUser)
282285

283286
return connClose, nil
284287
}
285288

286-
// populateLastLoginTime updates the last login time for the sql user
287-
// asynchronously. This not guaranteed to succeed and we log any errors obtained
288-
// from the update transaction to the DEV channel.
289-
func (c *conn) populateLastLoginTime(
290-
ctx context.Context, execCfg *sql.ExecutorConfig, dbUser username.SQLUsername,
291-
) {
292-
// Update last login time in async. This is done asynchronously to avoid
293-
// blocking the connection.
294-
if err := execCfg.Stopper.RunAsyncTask(ctx, "write_last_login_time", func(ctx context.Context) {
295-
if err := sql.UpdateLastLoginTime(ctx, execCfg, dbUser.SQLIdentifier()); err != nil {
296-
log.Warningf(ctx, "failed to update last login time for user %s: %v", dbUser, err)
297-
}
298-
}); err != nil {
299-
log.Warningf(ctx, "failed to create async task to update last login time for user %s: %v", dbUser, err)
300-
}
301-
}
302-
303289
// publishConnLatencyMetric publishes the latency of the connection
304290
// based on the authentication method.
305291
func (c *conn) publishConnLatencyMetric(duration int64, authMethod string) {

pkg/sql/pgwire/conn.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func (c *conn) processCommands(
151151
ctx context.Context,
152152
authOpt authOptions,
153153
ac AuthConn,
154-
sqlServer *sql.Server,
154+
server *Server,
155155
reserved *mon.BoundAccount,
156156
onDefaultIntSizeChange func(newSize int32),
157157
sessionID clusterunique.ID,
@@ -175,7 +175,7 @@ func (c *conn) processCommands(
175175
// network read on the connection's goroutine.
176176
c.cancelConn()
177177

178-
pgwireKnobs := sqlServer.GetExecutorConfig().PGWireTestingKnobs
178+
pgwireKnobs := server.SQLServer.GetExecutorConfig().PGWireTestingKnobs
179179
if pgwireKnobs != nil && pgwireKnobs.CatchPanics {
180180
if r := recover(); r != nil {
181181
// Catch the panic and return it to the client as an error.
@@ -203,14 +203,14 @@ func (c *conn) processCommands(
203203

204204
// Authenticate the connection.
205205
if connCloseAuthHandler, retErr = c.handleAuthentication(
206-
ctx, ac, authOpt, sqlServer.GetExecutorConfig(),
206+
ctx, ac, authOpt, server,
207207
); retErr != nil {
208208
// Auth failed or some other error.
209209
return
210210
}
211211

212212
var decrementConnectionCount func()
213-
if decrementConnectionCount, retErr = sqlServer.IncrementConnectionCount(c.sessionArgs); retErr != nil {
213+
if decrementConnectionCount, retErr = server.SQLServer.IncrementConnectionCount(c.sessionArgs); retErr != nil {
214214
// This will return pgcode.TooManyConnections which is used by the sql proxy
215215
// to skip failed auth throttle (as in this case the auth was fine but the
216216
// error occurred before sending back auth ok msg)
@@ -224,7 +224,7 @@ func (c *conn) processCommands(
224224
}
225225

226226
// Inform the client of the default session settings.
227-
connHandler, retErr = c.sendInitialConnData(ctx, sqlServer, onDefaultIntSizeChange, sessionID)
227+
connHandler, retErr = c.sendInitialConnData(ctx, server.SQLServer, onDefaultIntSizeChange, sessionID)
228228
if retErr != nil {
229229
return
230230
}
@@ -250,7 +250,7 @@ func (c *conn) processCommands(
250250

251251
// Now actually process commands.
252252
reservedOwned = false // We're about to pass ownership away.
253-
retErr = sqlServer.ServeConn(
253+
retErr = server.SQLServer.ServeConn(
254254
ctx,
255255
connHandler,
256256
reserved,

0 commit comments

Comments
 (0)