Skip to content

Commit 5f5c6a6

Browse files
craig[bot]kyle-a-wong
andcommitted
Merge #153295
153295: sql,server: add transaction diagnostic polling logic r=kyle-a-wong a=kyle-a-wong Adds logic to the TxnRegistry to poll for new transaction diagnostic requests. This is necessary to ensure that all nodes in a cluster are watching for transactions to capture diagnostic bundles for. The diagnostic polling is controlled by a new cluster setting, `sql.txn_diagnostics.poll_interval`, which can be set to 0 to disable polling. Note that this wont disable transaction diagnostics request handling all together, the gateway node handling a transaction diagnostics request will still have the request in its local registry. The poll requests implementation is the same as the implementation in statement_diagnostics.go for the statement diagnostics regsitry Resolves: [CRDB-54320](https://cockroachlabs.atlassian.net/browse/CRDB-54320) Epic: [CRDB-53541](https://cockroachlabs.atlassian.net/browse/CRDB-53541) Release note: None ---- Note: This is a stacked PR. Only the last commit should be reviewed Co-authored-by: Kyle Wong <[email protected]>
2 parents 8bb7919 + 137ac6f commit 5f5c6a6

File tree

7 files changed

+270
-72
lines changed

7 files changed

+270
-72
lines changed

pkg/server/server_sql.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ type SQLServer struct {
172172
statsRefresher *stats.Refresher
173173
temporaryObjectCleaner *sql.TemporaryObjectCleaner
174174
stmtDiagnosticsRegistry *stmtdiagnostics.Registry
175+
txnDiagnosticsRegistry *stmtdiagnostics.TxnRegistry
175176
sqlLivenessSessionID sqlliveness.SessionID
176177
sqlLivenessProvider sqlliveness.Provider
177178
sqlInstanceReader *instancestorage.Reader
@@ -1257,6 +1258,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
12571258
)
12581259
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry
12591260

1261+
txnDiagnosticsRegistry := stmtdiagnostics.NewTxnRegistry(cfg.internalDB,
1262+
cfg.Settings, stmtDiagnosticsRegistry, timeutil.DefaultTimeSource{})
1263+
execCfg.TxnDiagnosticsRecorder = txnDiagnosticsRegistry
1264+
12601265
var upgradeMgr *upgrademanager.Manager
12611266
{
12621267
var c upgrade.Cluster
@@ -1434,6 +1439,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
14341439
statsRefresher: statsRefresher,
14351440
temporaryObjectCleaner: temporaryObjectCleaner,
14361441
stmtDiagnosticsRegistry: stmtDiagnosticsRegistry,
1442+
txnDiagnosticsRegistry: txnDiagnosticsRegistry,
14371443
sqlLivenessProvider: cfg.sqlLivenessProvider,
14381444
sqlInstanceStorage: cfg.sqlInstanceStorage,
14391445
sqlInstanceReader: cfg.sqlInstanceReader,
@@ -1764,7 +1770,9 @@ func (s *SQLServer) preStart(
17641770
if err := s.statsRefresher.Start(ctx, stopper, stats.DefaultRefreshInterval); err != nil {
17651771
return err
17661772
}
1767-
s.stmtDiagnosticsRegistry.Start(ctx, stopper)
1773+
1774+
stmtdiagnostics.StartPolling(ctx, s.txnDiagnosticsRegistry, s.stmtDiagnosticsRegistry, stopper)
1775+
17681776
if err := s.execCfg.TableStatsCache.Start(ctx, s.execCfg.Codec, s.execCfg.RangeFeedFactory); err != nil {
17691777
return err
17701778
}

pkg/sql/exec_util.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1671,6 +1671,9 @@ type ExecutorConfig struct {
16711671
// StmtDiagnosticsRecorder deals with recording statement diagnostics.
16721672
StmtDiagnosticsRecorder *stmtdiagnostics.Registry
16731673

1674+
// TxnDiagnosticsRecorder deals with recording transaction diagnostics.
1675+
TxnDiagnosticsRecorder *stmtdiagnostics.TxnRegistry
1676+
16741677
ExternalIODirConfig base.ExternalIODirConfig
16751678

16761679
GCJobNotifier *gcjobnotifier.Notifier

pkg/sql/stmtdiagnostics/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ go_test(
6666
"//pkg/util/uuid",
6767
"@com_github_cockroachdb_errors//:errors",
6868
"@com_github_lib_pq//:pq",
69+
"@com_github_stretchr_testify//assert",
6970
"@com_github_stretchr_testify//require",
7071
],
7172
)

pkg/sql/stmtdiagnostics/statement_diagnostics.go

Lines changed: 73 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@ import (
2828

2929
var pollingInterval = settings.RegisterDurationSetting(
3030
settings.SystemVisible,
31-
"sql.stmt_diagnostics.poll_interval",
32-
"rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable",
31+
"sql.diagnostics.poll_interval",
32+
"rate at which the stmtdiagnostics registries polls for requests, set to zero to disable",
3333
10*time.Second,
34+
settings.WithRetiredName("sql.stmt_diagnostics.poll_interval"),
3435
)
3536

3637
var bundleChunkSize = settings.RegisterByteSizeSetting(
@@ -148,68 +149,6 @@ func NewRegistry(db isql.DB, st *cluster.Settings) *Registry {
148149
return r
149150
}
150151

151-
// Start will start the polling loop for the Registry.
152-
func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) {
153-
// The registry has the same lifetime as the server, so the cancellation
154-
// function can be ignored and it'll be called by the stopper.
155-
ctx, _ = stopper.WithCancelOnQuiesce(ctx) // nolint:quiesce
156-
157-
// Since background statement diagnostics collection is not under user
158-
// control, exclude it from cost accounting and control.
159-
ctx = multitenant.WithTenantCostControlExemption(ctx)
160-
161-
// NB: The only error that should occur here would be if the server were
162-
// shutting down so let's swallow it.
163-
_ = stopper.RunAsyncTask(ctx, "stmt-diag-poll", r.poll)
164-
}
165-
166-
func (r *Registry) poll(ctx context.Context) {
167-
var (
168-
timer timeutil.Timer
169-
lastPoll time.Time
170-
deadline time.Time
171-
pollIntervalChanged = make(chan struct{}, 1)
172-
maybeResetTimer = func() {
173-
if interval := pollingInterval.Get(&r.st.SV); interval == 0 {
174-
// Setting the interval to zero stops the polling.
175-
timer.Stop()
176-
} else {
177-
newDeadline := lastPoll.Add(interval)
178-
if deadline.IsZero() || !deadline.Equal(newDeadline) {
179-
deadline = newDeadline
180-
timer.Reset(timeutil.Until(deadline))
181-
}
182-
}
183-
}
184-
poll = func() {
185-
if err := r.pollRequests(ctx); err != nil {
186-
if ctx.Err() != nil {
187-
return
188-
}
189-
log.Dev.Warningf(ctx, "error polling for statement diagnostics requests: %s", err)
190-
}
191-
lastPoll = timeutil.Now()
192-
}
193-
)
194-
pollingInterval.SetOnChange(&r.st.SV, func(ctx context.Context) {
195-
select {
196-
case pollIntervalChanged <- struct{}{}:
197-
default:
198-
}
199-
})
200-
for {
201-
maybeResetTimer()
202-
select {
203-
case <-pollIntervalChanged:
204-
continue // go back around and maybe reset the timer
205-
case <-timer.C:
206-
case <-ctx.Done():
207-
return
208-
}
209-
poll()
210-
}
211-
}
212-
213152
type StmtDiagnostic struct {
214153
requestID RequestID
215154
req Request
@@ -733,9 +672,9 @@ func (r *Registry) innerInsertStatementDiagnostics(
733672
return diagID, nil
734673
}
735674

736-
// pollRequests reads the pending rows from system.statement_diagnostics_requests and
675+
// pollStmtRequests reads the pending rows from system.statement_diagnostics_requests and
737676
// updates r.mu.requests accordingly.
738-
func (r *Registry) pollRequests(ctx context.Context) error {
677+
func (r *Registry) pollStmtRequests(ctx context.Context) error {
739678
var rows []tree.Datums
740679

741680
// Loop until we run the query without straddling an epoch increment.
@@ -823,3 +762,71 @@ func (r *Registry) pollRequests(ctx context.Context) error {
823762
}
824763
return nil
825764
}
765+
766+
// StartPolling starts a background task that polls for new statement and
767+
// transaction requests and updates the corresponding registries.
768+
func StartPolling(ctx context.Context, tr *TxnRegistry, sr *Registry, stopper *stop.Stopper) {
769+
// The registry has the same lifetime as the server, so the cancellation
770+
// function can be ignored and it'll be called by the stopper.
771+
ctx, _ = stopper.WithCancelOnQuiesce(ctx) // nolint:quiesce
772+
773+
// Since background diagnostics collection is not under user
774+
// control, exclude it from cost accounting and control.
775+
ctx = multitenant.WithTenantCostControlExemption(ctx)
776+
777+
// NB: The only error that should occur here would be if the server were
778+
// shutting down so let's swallow it.
779+
_ = stopper.RunAsyncTask(ctx, "stmt-txn-diag-poll", func(ctx context.Context) {
780+
var (
781+
timer timeutil.Timer
782+
lastPoll time.Time
783+
deadline time.Time
784+
pollIntervalChanged = make(chan struct{}, 1)
785+
maybeResetTimer = func() {
786+
if interval := pollingInterval.Get(&sr.st.SV); interval == 0 {
787+
// Setting the interval to zero stops the polling.
788+
timer.Stop()
789+
} else {
790+
newDeadline := lastPoll.Add(interval)
791+
if deadline.IsZero() || !deadline.Equal(newDeadline) {
792+
deadline = newDeadline
793+
timer.Reset(timeutil.Until(deadline))
794+
}
795+
}
796+
}
797+
poll = func() {
798+
if err := tr.pollTxnRequests(ctx); err != nil {
799+
if ctx.Err() != nil {
800+
return
801+
}
802+
log.Ops.Warningf(ctx, "error polling for transaction diagnostics requests: %s", err)
803+
}
804+
if err := sr.pollStmtRequests(ctx); err != nil {
805+
if ctx.Err() != nil {
806+
return
807+
}
808+
log.Ops.Warningf(ctx, "error polling for statement diagnostics requests: %s", err)
809+
}
810+
lastPoll = timeutil.Now()
811+
}
812+
)
813+
814+
pollingInterval.SetOnChange(&sr.st.SV, func(ctx context.Context) {
815+
select {
816+
case pollIntervalChanged <- struct{}{}:
817+
default:
818+
}
819+
})
820+
for {
821+
maybeResetTimer()
822+
select {
823+
case <-pollIntervalChanged:
824+
continue // go back around and maybe reset the timer
825+
case <-timer.C:
826+
case <-ctx.Done():
827+
return
828+
}
829+
poll()
830+
}
831+
})
832+
}

pkg/sql/stmtdiagnostics/statement_diagnostics_test.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
3434
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3535
"github.com/cockroachdb/errors"
36+
"github.com/stretchr/testify/assert"
3637
"github.com/stretchr/testify/require"
3738
)
3839

@@ -692,3 +693,81 @@ func TestChangePollInterval(t *testing.T) {
692693
stmtdiagnostics.PollingInterval.Override(ctx, &settings.SV, 200*time.Microsecond)
693694
waitForScans(10) // ensure several scans occur
694695
}
696+
697+
func TestTxnRegistry_InsertTxnRequest_Polling(t *testing.T) {
698+
defer leaktest.AfterTest(t)()
699+
defer log.Scope(t).Close(t)
700+
701+
ctx := context.Background()
702+
settings := cluster.MakeTestingClusterSettings()
703+
// Set to 1s so that we can quickly pick up the request.
704+
stmtdiagnostics.PollingInterval.Override(ctx, &settings.SV, time.Second)
705+
706+
tc := serverutils.StartCluster(t, 3, base.TestClusterArgs{
707+
ServerArgs: base.TestServerArgs{
708+
Settings: settings,
709+
},
710+
})
711+
defer tc.Stopper().Stop(ctx)
712+
713+
s0 := tc.Server(0).ApplicationLayer()
714+
registry := s0.ExecutorConfig().(sql.ExecutorConfig).TxnDiagnosticsRecorder
715+
registry2 := tc.Server(1).ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).TxnDiagnosticsRecorder
716+
registry3 := tc.Server(2).ApplicationLayer().ExecutorConfig().(sql.ExecutorConfig).TxnDiagnosticsRecorder
717+
718+
id, err := registry.InsertTxnRequestInternal(
719+
ctx,
720+
1111,
721+
[]uint64{1111, 2222, 3333},
722+
"testuser",
723+
0.5,
724+
time.Millisecond*100,
725+
0,
726+
false,
727+
)
728+
require.NoError(t, err)
729+
require.NotEqual(t, stmtdiagnostics.RequestID(0), id)
730+
731+
var expectedRequest, req stmtdiagnostics.TxnRequest
732+
var ok bool
733+
expectedRequest, ok = registry.GetRequest(id)
734+
require.True(t, ok)
735+
testutils.SucceedsSoon(t, func() error {
736+
737+
req, ok = registry2.GetRequest(id)
738+
if !ok {
739+
return errors.New("request not found on server 2")
740+
}
741+
if !assert.Equal(t, expectedRequest, req) {
742+
return errors.Newf("request on server2 doesnt match expected request. Expected: %+v, got: %+v", expectedRequest, req)
743+
}
744+
745+
req, ok = registry3.GetRequest(id)
746+
if !ok {
747+
return errors.New("request not found on server 3")
748+
}
749+
require.Equal(t, expectedRequest, req)
750+
if !assert.Equal(t, expectedRequest, req) {
751+
return errors.Newf("request on server3 doesnt match expected request. Expected: %+v, got: %+v", expectedRequest, req)
752+
}
753+
754+
return nil
755+
})
756+
757+
// mark the request as complete and ensure that it is removed from all 3 nodes
758+
runner := sqlutils.MakeSQLRunner(tc.ServerConn(0))
759+
runner.Exec(t, "UPDATE system.transaction_diagnostics_requests "+
760+
"SET completed = true, transaction_diagnostics_id = 12345 WHERE id = $1", id)
761+
testutils.SucceedsSoon(t, func() error {
762+
if _, ok = registry.GetRequest(id); ok {
763+
return errors.New("request still found on server 1")
764+
}
765+
if _, ok = registry2.GetRequest(id); ok {
766+
return errors.New("request still found on server 2")
767+
}
768+
if _, ok = registry3.GetRequest(id); ok {
769+
return errors.New("request still found on server 3")
770+
}
771+
return nil
772+
})
773+
}

0 commit comments

Comments
 (0)