@@ -28,9 +28,10 @@ import (
28
28
29
29
var pollingInterval = settings .RegisterDurationSetting (
30
30
settings .SystemVisible ,
31
- "sql.stmt_diagnostics .poll_interval" ,
32
- "rate at which the stmtdiagnostics.Registry polls for requests, set to zero to disable" ,
31
+ "sql.diagnostics .poll_interval" ,
32
+ "rate at which the stmtdiagnostics registries polls for requests, set to zero to disable" ,
33
33
10 * time .Second ,
34
+ settings .WithRetiredName ("sql.stmt_diagnostics.poll_interval" ),
34
35
)
35
36
36
37
var bundleChunkSize = settings .RegisterByteSizeSetting (
@@ -148,68 +149,6 @@ func NewRegistry(db isql.DB, st *cluster.Settings) *Registry {
148
149
return r
149
150
}
150
151
151
- // Start will start the polling loop for the Registry.
152
- func (r * Registry ) Start (ctx context.Context , stopper * stop.Stopper ) {
153
- // The registry has the same lifetime as the server, so the cancellation
154
- // function can be ignored and it'll be called by the stopper.
155
- ctx , _ = stopper .WithCancelOnQuiesce (ctx ) // nolint:quiesce
156
-
157
- // Since background statement diagnostics collection is not under user
158
- // control, exclude it from cost accounting and control.
159
- ctx = multitenant .WithTenantCostControlExemption (ctx )
160
-
161
- // NB: The only error that should occur here would be if the server were
162
- // shutting down so let's swallow it.
163
- _ = stopper .RunAsyncTask (ctx , "stmt-diag-poll" , r .poll )
164
- }
165
-
166
- func (r * Registry ) poll (ctx context.Context ) {
167
- var (
168
- timer timeutil.Timer
169
- lastPoll time.Time
170
- deadline time.Time
171
- pollIntervalChanged = make (chan struct {}, 1 )
172
- maybeResetTimer = func () {
173
- if interval := pollingInterval .Get (& r .st .SV ); interval == 0 {
174
- // Setting the interval to zero stops the polling.
175
- timer .Stop ()
176
- } else {
177
- newDeadline := lastPoll .Add (interval )
178
- if deadline .IsZero () || ! deadline .Equal (newDeadline ) {
179
- deadline = newDeadline
180
- timer .Reset (timeutil .Until (deadline ))
181
- }
182
- }
183
- }
184
- poll = func () {
185
- if err := r .pollRequests (ctx ); err != nil {
186
- if ctx .Err () != nil {
187
- return
188
- }
189
- log .Dev .Warningf (ctx , "error polling for statement diagnostics requests: %s" , err )
190
- }
191
- lastPoll = timeutil .Now ()
192
- }
193
- )
194
- pollingInterval .SetOnChange (& r .st .SV , func (ctx context.Context ) {
195
- select {
196
- case pollIntervalChanged <- struct {}{}:
197
- default :
198
- }
199
- })
200
- for {
201
- maybeResetTimer ()
202
- select {
203
- case <- pollIntervalChanged :
204
- continue // go back around and maybe reset the timer
205
- case <- timer .C :
206
- case <- ctx .Done ():
207
- return
208
- }
209
- poll ()
210
- }
211
- }
212
-
213
152
type StmtDiagnostic struct {
214
153
requestID RequestID
215
154
req Request
@@ -733,9 +672,9 @@ func (r *Registry) innerInsertStatementDiagnostics(
733
672
return diagID , nil
734
673
}
735
674
736
- // pollRequests reads the pending rows from system.statement_diagnostics_requests and
675
+ // pollStmtRequests reads the pending rows from system.statement_diagnostics_requests and
737
676
// updates r.mu.requests accordingly.
738
- func (r * Registry ) pollRequests (ctx context.Context ) error {
677
+ func (r * Registry ) pollStmtRequests (ctx context.Context ) error {
739
678
var rows []tree.Datums
740
679
741
680
// Loop until we run the query without straddling an epoch increment.
@@ -823,3 +762,71 @@ func (r *Registry) pollRequests(ctx context.Context) error {
823
762
}
824
763
return nil
825
764
}
765
+
766
+ // StartPolling starts a background task that polls for new statement and
767
+ // transaction requests and updates the corresponding registries.
768
+ func StartPolling (ctx context.Context , tr * TxnRegistry , sr * Registry , stopper * stop.Stopper ) {
769
+ // The registry has the same lifetime as the server, so the cancellation
770
+ // function can be ignored and it'll be called by the stopper.
771
+ ctx , _ = stopper .WithCancelOnQuiesce (ctx ) // nolint:quiesce
772
+
773
+ // Since background diagnostics collection is not under user
774
+ // control, exclude it from cost accounting and control.
775
+ ctx = multitenant .WithTenantCostControlExemption (ctx )
776
+
777
+ // NB: The only error that should occur here would be if the server were
778
+ // shutting down so let's swallow it.
779
+ _ = stopper .RunAsyncTask (ctx , "stmt-txn-diag-poll" , func (ctx context.Context ) {
780
+ var (
781
+ timer timeutil.Timer
782
+ lastPoll time.Time
783
+ deadline time.Time
784
+ pollIntervalChanged = make (chan struct {}, 1 )
785
+ maybeResetTimer = func () {
786
+ if interval := pollingInterval .Get (& sr .st .SV ); interval == 0 {
787
+ // Setting the interval to zero stops the polling.
788
+ timer .Stop ()
789
+ } else {
790
+ newDeadline := lastPoll .Add (interval )
791
+ if deadline .IsZero () || ! deadline .Equal (newDeadline ) {
792
+ deadline = newDeadline
793
+ timer .Reset (timeutil .Until (deadline ))
794
+ }
795
+ }
796
+ }
797
+ poll = func () {
798
+ if err := tr .pollTxnRequests (ctx ); err != nil {
799
+ if ctx .Err () != nil {
800
+ return
801
+ }
802
+ log .Ops .Warningf (ctx , "error polling for transaction diagnostics requests: %s" , err )
803
+ }
804
+ if err := sr .pollStmtRequests (ctx ); err != nil {
805
+ if ctx .Err () != nil {
806
+ return
807
+ }
808
+ log .Ops .Warningf (ctx , "error polling for statement diagnostics requests: %s" , err )
809
+ }
810
+ lastPoll = timeutil .Now ()
811
+ }
812
+ )
813
+
814
+ pollingInterval .SetOnChange (& sr .st .SV , func (ctx context.Context ) {
815
+ select {
816
+ case pollIntervalChanged <- struct {}{}:
817
+ default :
818
+ }
819
+ })
820
+ for {
821
+ maybeResetTimer ()
822
+ select {
823
+ case <- pollIntervalChanged :
824
+ continue // go back around and maybe reset the timer
825
+ case <- timer .C :
826
+ case <- ctx .Done ():
827
+ return
828
+ }
829
+ poll ()
830
+ }
831
+ })
832
+ }
0 commit comments