
Commit 56a6d37

jasonlmfong authored and craig[bot] committed

Merge #157226
157226: ts: introduce a new 1 minute poller to write into tsdb r=jasonlmfong a=jasonlmfong

This change adds a new 1 minute resolution and the machinery around rollup and storage. It also adds a new poller at server start-up time; this poller currently performs a no-op every 1 minute.

Epic: CRDB-55079
Release: None

Co-authored-by: Jason Fong <[email protected]>
2 parents a8320b5 + 69a9130 commit 56a6d37

File tree

13 files changed: +256 −28 lines


docs/generated/settings/settings-for-tenants.txt

Lines changed: 1 addition & 0 deletions
@@ -418,6 +418,7 @@ storage.sstable.compression_algorithm enumeration fastest determines the compres
 storage.sstable.compression_algorithm_backup_storage enumeration fastest determines the compression algorithm to use when compressing sstable data blocks for backup row data storage (fast,balanced,good are experimental); [snappy = 1, zstd = 2, none = 3, minlz = 4, fastest = 5, fast = 6, balanced = 7, good = 8] system-visible
 storage.sstable.compression_algorithm_backup_transport enumeration fastest determines the compression algorithm to use when compressing sstable data blocks for backup transport (fast,balanced,good are experimental); [snappy = 1, zstd = 2, none = 3, minlz = 4, fastest = 5, fast = 6, balanced = 7, good = 8] system-visible
 timeseries.storage.resolution_10s.ttl duration 240h0m0s the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion. system-visible
+timeseries.storage.resolution_1m.ttl duration 240h0m0s the maximum age of time series data stored at the 1 minute resolution. Data older than this is subject to rollup and deletion. system-visible
 timeseries.storage.resolution_30m.ttl duration 2160h0m0s the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion. system-visible
 trace.debug_http_endpoint.enabled (alias: trace.debug.enable) boolean false if set, traces for recent requests can be seen at https://<ui>/debug/requests application
 trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used. application

docs/generated/settings/settings.html

Lines changed: 1 addition & 0 deletions
@@ -377,6 +377,7 @@
 <tr><td><div id="setting-storage-wal-failover-unhealthy-op-threshold" class="anchored"><code>storage.wal_failover.unhealthy_op_threshold</code></div></td><td>duration</td><td><code>100ms</code></td><td>the latency of a WAL write considered unhealthy and triggers a failover to a secondary WAL location</td><td>Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-enabled" class="anchored"><code>timeseries.storage.enabled</code></div></td><td>boolean</td><td><code>true</code></td><td>if set, periodic timeseries data is stored within the cluster; disabling is not recommended unless you are storing the data elsewhere</td><td>Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-timeseries-storage-resolution-10s-ttl" class="anchored"><code>timeseries.storage.resolution_10s.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 10 second resolution. Data older than this is subject to rollup and deletion.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
+<tr><td><div id="setting-timeseries-storage-resolution-1m-ttl" class="anchored"><code>timeseries.storage.resolution_1m.ttl</code></div></td><td>duration</td><td><code>240h0m0s</code></td><td>the maximum age of time series data stored at the 1 minute resolution. Data older than this is subject to rollup and deletion.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-timeseries-storage-resolution-30m-ttl" class="anchored"><code>timeseries.storage.resolution_30m.ttl</code></div></td><td>duration</td><td><code>2160h0m0s</code></td><td>the maximum age of time series data stored at the 30 minute resolution. Data older than this is subject to deletion.</td><td>Advanced/Self-hosted (read-write); Basic/Standard (read-only)</td></tr>
 <tr><td><div id="setting-trace-debug-enable" class="anchored"><code>trace.debug_http_endpoint.enabled<br />(alias: trace.debug.enable)</code></div></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen at https://&lt;ui&gt;/debug/requests</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>
 <tr><td><div id="setting-trace-opentelemetry-collector" class="anchored"><code>trace.opentelemetry.collector</code></div></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as &lt;host&gt;:&lt;port&gt;. If no port is specified, 4317 will be used.</td><td>Basic/Standard/Advanced/Self-Hosted</td></tr>

pkg/base/config.go

Lines changed: 4 additions & 0 deletions
@@ -88,6 +88,10 @@ const (
 	// contributes meaningfully to the average, while earlier measurements have
 	// diminishing impact.
 	DefaultCPUUsageMovingAverageAge = 20
+
+	// DefaultHighCardinalityMetricsSampleInterval is the default interval for
+	// sampling low-frequency high-cardinality metrics.
+	DefaultHighCardinalityMetricsSampleInterval = time.Minute
 )
 
 // DefaultCertsDirectory is the default value for the cert directory flag.

pkg/server/server.go

Lines changed: 6 additions & 1 deletion
@@ -2013,7 +2013,12 @@ func (s *topLevelServer) PreStart(ctx context.Context) error {
 	// The writes will be async; we'll wait for the first one to go through
 	// later in this method, using the returned channel.
 	firstTSDBPollDone := s.tsDB.PollSource(
-		s.cfg.AmbientCtx, s.recorder, base.DefaultMetricsSampleInterval, ts.Resolution10s, s.stopper,
+		s.cfg.AmbientCtx, s.recorder, base.DefaultMetricsSampleInterval, ts.Resolution10s, s.stopper, false,
+	)
+
+	// high cardinality child metrics collector, we don't need to wait for the first one
+	s.tsDB.PollSource(
+		s.cfg.AmbientCtx, s.recorder, base.DefaultHighCardinalityMetricsSampleInterval, ts.Resolution1m, s.stopper, true,
 	)
 
 	// Export statistics to graphite, if enabled by configuration.

pkg/server/status/recorder.go

Lines changed: 19 additions & 1 deletion
@@ -108,6 +108,17 @@ var bugfix149481Enabled = settings.RegisterBoolSetting(
 	true,
 	settings.WithVisibility(settings.Reserved))
 
+// ChildMetricsStorageEnabled controls whether to record high-cardinality child metrics
+// into the time series database. This is separate from ChildMetricsEnabled which controls
+// Prometheus exports, allowing independent control of child metrics recording vs export.
+// This setting enables debugging of changefeeds and should not be considered functionality
+// to expand support for.
+var ChildMetricsStorageEnabled = settings.RegisterBoolSetting(
+	settings.ApplicationLevel, "timeseries.child_metrics.enabled",
+	"enables the collection of high-cardinality child metrics into the time series database",
+	false,
+	settings.WithVisibility(settings.Reserved))
+
 // MetricsRecorder is used to periodically record the information in a number of
 // metric registries.
 //
@@ -428,7 +439,7 @@ func (mr *MetricsRecorder) ExportToGraphite(
 // GetTimeSeriesData serializes registered metrics for consumption by
 // CockroachDB's time series system. GetTimeSeriesData implements the DataSource
 // interface of the ts package.
-func (mr *MetricsRecorder) GetTimeSeriesData() []tspb.TimeSeriesData {
+func (mr *MetricsRecorder) GetTimeSeriesData(childMetrics bool) []tspb.TimeSeriesData {
 	mr.mu.RLock()
 	defer mr.mu.RUnlock()
 
@@ -440,6 +451,13 @@ func (mr *MetricsRecorder) GetTimeSeriesData() []tspb.TimeSeriesData {
 		return nil
 	}
 
+	if childMetrics {
+		if !ChildMetricsStorageEnabled.Get(&mr.settings.SV) {
+			return nil
+		}
+		return nil // TODO(jasonlmfong): to be implemented
+	}
+
 	lastDataCount := atomic.LoadInt64(&mr.lastDataCount)
 	data := make([]tspb.TimeSeriesData, 0, lastDataCount)
 
pkg/server/status/recorder_test.go

Lines changed: 3 additions & 3 deletions
@@ -250,7 +250,7 @@ func TestMetricsRecorderLabels(t *testing.T) {
 		},
 	}
 
-	actualData := recorder.GetTimeSeriesData()
+	actualData := recorder.GetTimeSeriesData(false)
 
 	// compare actual vs expected values
 	sort.Sort(byTimeAndName(actualData))
@@ -650,7 +650,7 @@ func TestMetricsRecorder(t *testing.T) {
 	// ========================================
 	// Verify time series data
 	// ========================================
-	actual := recorder.GetTimeSeriesData()
+	actual := recorder.GetTimeSeriesData(false)
 
 	// Actual comparison is simple: sort the resulting arrays by time and name,
 	// and use reflect.DeepEqual.
@@ -725,7 +725,7 @@ func TestMetricsRecorder(t *testing.T) {
 			t.Error(err)
 		}
 		_ = recorder.PrintAsText(io.Discard, expfmt.FmtText, false)
-		_ = recorder.GetTimeSeriesData()
+		_ = recorder.GetTimeSeriesData(false)
 		wg.Done()
 	}()
 }

pkg/ts/db.go

Lines changed: 20 additions & 2 deletions
@@ -29,6 +29,7 @@ var (
 	// version upgrade, the rollup threshold is used instead.
 	deprecatedResolution10sDefaultPruneThreshold = 30 * 24 * time.Hour
 	resolution10sDefaultRollupThreshold = 10 * 24 * time.Hour
+	resolution1mDefaultRollupThreshold = 10 * 24 * time.Hour
 	resolution30mDefaultPruneThreshold = 90 * 24 * time.Hour
 	resolution50nsDefaultPruneThreshold = 1 * time.Millisecond
 	storeDataTimeout = 1 * time.Minute
@@ -54,6 +55,17 @@ var Resolution10sStorageTTL = settings.RegisterDurationSetting(
 	resolution10sDefaultRollupThreshold,
 	settings.WithPublic)
 
+// Resolution1mStorageTTL defines the maximum age of data that will be retained
+// at the 1 minute resolution. Data older than this is subject to being "rolled
+// up" into the 30 minute resolution and then deleted.
+var Resolution1mStorageTTL = settings.RegisterDurationSetting(
+	settings.SystemVisible, // currently used in DB Console.
+	"timeseries.storage.resolution_1m.ttl",
+	"the maximum age of time series data stored at the 1 minute resolution. Data older than this "+
+		"is subject to rollup and deletion.",
+	resolution1mDefaultRollupThreshold,
+	settings.WithPublic)
+
 // Resolution30mStorageTTL defines the maximum age of data that will be
 // retained at the 30 minute resolution. Data older than this is subject to
 // deletion.
@@ -89,6 +101,7 @@ func NewDB(db *kv.DB, settings *cluster.Settings) *DB {
 			return Resolution10sStorageTTL.Get(&settings.SV).Nanoseconds()
 		},
 		Resolution30m: func() int64 { return Resolution30mStorageTTL.Get(&settings.SV).Nanoseconds() },
+		Resolution1m:  func() int64 { return Resolution1mStorageTTL.Get(&settings.SV).Nanoseconds() },
 		resolution1ns: func() int64 { return resolution1nsDefaultRollupThreshold.Nanoseconds() },
 		resolution50ns: func() int64 { return resolution50nsDefaultPruneThreshold.Nanoseconds() },
 	}
@@ -102,7 +115,7 @@ func NewDB(db *kv.DB, settings *cluster.Settings) *DB {
 
 // A DataSource can be queried for a slice of time series data.
 type DataSource interface {
-	GetTimeSeriesData() []tspb.TimeSeriesData
+	GetTimeSeriesData(childMetrics bool) []tspb.TimeSeriesData
 }
 
 // poller maintains information for a polling process started by PollSource().
@@ -113,6 +126,9 @@ type poller struct {
 	frequency time.Duration
 	r         Resolution
 	stopper   *stop.Stopper
+	// When childMetrics is set to true, the polling process calls alternate behaviour in metrics recorder,
+	// returning child metrics to be stored.
+	childMetrics bool
 }
 
 // PollSource begins a Goroutine which periodically queries the supplied
@@ -125,6 +141,7 @@ func (db *DB) PollSource(
 	frequency time.Duration,
 	r Resolution,
 	stopper *stop.Stopper,
+	childMetrics bool,
 ) (firstDone <-chan struct{}) {
 	ambient.AddLogTag("ts-poll", nil)
 	p := &poller{
@@ -134,6 +151,7 @@ func (db *DB) PollSource(
 		frequency: frequency,
 		r:         r,
 		stopper:   stopper,
+		childMetrics: childMetrics,
 	}
 	return p.start()
 }
@@ -179,7 +197,7 @@ func (p *poller) poll(ctx context.Context) {
 	}
 
 	if err := p.stopper.RunTask(ctx, "ts.poller: poll", func(ctx context.Context) {
-		data := p.source.GetTimeSeriesData()
+		data := p.source.GetTimeSeriesData(p.childMetrics)
 		if len(data) == 0 {
 			return
 		}

pkg/ts/db_test.go

Lines changed: 3 additions & 3 deletions
@@ -667,7 +667,7 @@ type modelDataSource struct {
 // set of TimeSeriesData to subsequent calls. It stores each TimeSeriesData
 // object in the test model before returning it. If all TimeSeriesData objects
 // have been returned, this method will stop the provided Stopper.
-func (mds *modelDataSource) GetTimeSeriesData() []tspb.TimeSeriesData {
+func (mds *modelDataSource) GetTimeSeriesData(childMetrics bool) []tspb.TimeSeriesData {
 	if len(mds.datasets) == 0 {
 		// Stop on goroutine to prevent deadlock.
 		go mds.once.Do(func() { mds.stopper.Stop(context.Background()) })
@@ -762,7 +762,7 @@ func TestPollSource(t *testing.T) {
 	}
 
 	ambient := log.MakeTestingAmbientContext(tr)
-	tm.DB.PollSource(ambient, &testSource, time.Millisecond, Resolution10s, testSource.stopper)
+	tm.DB.PollSource(ambient, &testSource, time.Millisecond, Resolution10s, testSource.stopper, false)
 	<-testSource.stopper.IsStopped()
 	if a, e := testSource.calledCount, 2; a != e {
 		t.Errorf("testSource was called %d times, expected %d", a, e)
@@ -815,7 +815,7 @@ func TestDisableStorage(t *testing.T) {
 	}
 
 	ambient := log.MakeTestingAmbientCtxWithNewTracer()
-	tm.DB.PollSource(ambient, &testSource, time.Millisecond, Resolution10s, testSource.stopper)
+	tm.DB.PollSource(ambient, &testSource, time.Millisecond, Resolution10s, testSource.stopper, false)
 	select {
 	case <-testSource.stopper.IsStopped():
 		t.Error("testSource data exhausted when polling should have been enabled")

pkg/ts/keys_test.go

Lines changed: 24 additions & 0 deletions
@@ -48,6 +48,30 @@ func TestDataKeys(t *testing.T) {
 			12,
 			"/System/tsd//10s/1924-09-18T08:00:00Z/",
 		},
+		{
+			"test.metric",
+			"testsource",
+			0,
+			Resolution1m,
+			30,
+			"/System/tsd/test.metric/1m/1970-01-01T00:00:00Z/testsource",
+		},
+		{
+			"test.no.source",
+			"",
+			1429114700000000000,
+			Resolution1m,
+			26,
+			"/System/tsd/test.no.source/1m/2015-04-15T16:00:00Z/",
+		},
+		{
+			"",
+			"",
+			-1429114700000000000,
+			Resolution1m,
+			12,
+			"/System/tsd//1m/1924-09-18T08:00:00Z/",
+		},
 	}
 
 	for i, tc := range testCases {

pkg/ts/memory_test.go

Lines changed: 88 additions & 0 deletions
@@ -146,6 +146,94 @@ func TestGetMaxTimespan(t *testing.T) {
 			math.MaxInt64,
 			"",
 		},
+		// Resolution1m - room for exactly one hour of query
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             2 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        1,
+				InterpolationLimitNanos: 0,
+			},
+			(1 * time.Hour).Nanoseconds(),
+			"",
+		},
+		// Resolution1m - insufficient memory
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             sizeOfTimeSeriesData + sizeOfSample*60,
+				EstimatedSources:        1,
+				InterpolationLimitNanos: 0,
+			},
+			0,
+			"insufficient",
+		},
+		// Resolution1m - 3 sources, room for 1 hour.
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             6 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: 0,
+			},
+			(1 * time.Hour).Nanoseconds(),
+			"",
+		},
+		// Resolution1m - 3 sources, room for 2 hours.
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             9 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: 0,
+			},
+			(2 * time.Hour).Nanoseconds(),
+			"",
+		},
+		// Not enough room due to interpolation buffer.
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             6 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: 1,
+			},
+			0,
+			"insufficient",
+		},
+		// Sufficient room even with interpolation buffer.
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             9 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: 1,
+			},
+			(1 * time.Hour).Nanoseconds(),
+			"",
+		},
+		// Insufficient room for interpolation buffer (due to straddling)
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             9 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: int64(float64(Resolution1m.SlabDuration()) * 0.75),
+			},
+			0,
+			"insufficient",
+		},
+		// Sufficient room even with interpolation buffer.
+		{
+			Resolution1m,
+			QueryMemoryOptions{
+				BudgetBytes:             12 * (sizeOfTimeSeriesData + sizeOfSample*60),
+				EstimatedSources:        3,
+				InterpolationLimitNanos: int64(float64(Resolution1m.SlabDuration()) * 0.75),
+			},
+			(1 * time.Hour).Nanoseconds(),
+			"",
+		},
 	} {
 		t.Run("", func(t *testing.T) {
 			mem := QueryMemoryContext{
