Skip to content

Commit 00a438c

Browse files
authored
Integration test for native histograms (#6037)
* integration test for native histograms Signed-off-by: Ben Ye <[email protected]> * lint Signed-off-by: Ben Ye <[email protected]> * add nil check for histogram sample Signed-off-by: Ben Ye <[email protected]> * add test coverage for range query API Signed-off-by: Ben Ye <[email protected]> * check each histogram Signed-off-by: Ben Ye <[email protected]> --------- Signed-off-by: Ben Ye <[email protected]>
1 parent adcc4d5 commit 00a438c

File tree

2 files changed

+185
-0
lines changed

2 files changed

+185
-0
lines changed

integration/e2e/util.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,17 @@ import (
1515
"github.com/oklog/ulid"
1616
"github.com/pkg/errors"
1717
"github.com/prometheus/common/model"
18+
"github.com/prometheus/prometheus/model/histogram"
1819
"github.com/prometheus/prometheus/model/labels"
1920
"github.com/prometheus/prometheus/prompb"
2021
"github.com/prometheus/prometheus/storage"
22+
"github.com/prometheus/prometheus/storage/remote"
2123
"github.com/prometheus/prometheus/tsdb"
2224
"github.com/thanos-io/thanos/pkg/block/metadata"
2325
"github.com/thanos-io/thanos/pkg/runutil"
2426

2527
cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb"
28+
histogram_util "github.com/cortexproject/cortex/pkg/util/histogram"
2629
)
2730

2831
func RunCommandAndGetOutput(name string, args ...string) ([]byte, error) {
@@ -147,6 +150,46 @@ func GenerateSeries(name string, ts time.Time, additionalLabels ...prompb.Label)
147150
return
148151
}
149152

153+
func GenerateHistogramSeries(name string, ts time.Time, floatHistogram bool, additionalLabels ...prompb.Label) (series []prompb.TimeSeries) {
154+
tsMillis := TimeToMilliseconds(ts)
155+
i := rand.Uint32()
156+
157+
lbls := append(
158+
[]prompb.Label{
159+
{Name: labels.MetricName, Value: name},
160+
},
161+
additionalLabels...,
162+
)
163+
164+
// Generate the expected vector when querying it
165+
metric := model.Metric{}
166+
metric[labels.MetricName] = model.LabelValue(name)
167+
for _, lbl := range additionalLabels {
168+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
169+
}
170+
171+
var (
172+
h *histogram.Histogram
173+
fh *histogram.FloatHistogram
174+
ph prompb.Histogram
175+
)
176+
if floatHistogram {
177+
fh = histogram_util.GenerateTestFloatHistogram(int(i))
178+
ph = remote.FloatHistogramToHistogramProto(tsMillis, fh)
179+
} else {
180+
h = histogram_util.GenerateTestHistogram(int(i))
181+
ph = remote.HistogramToHistogramProto(tsMillis, h)
182+
}
183+
184+
// Generate the series
185+
series = append(series, prompb.TimeSeries{
186+
Labels: lbls,
187+
Histograms: []prompb.Histogram{ph},
188+
})
189+
190+
return
191+
}
192+
150193
func GenerateSeriesWithSamples(
151194
name string,
152195
startTime time.Time,

integration/native_histogram_test.go

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//go:build requires_docker
2+
// +build requires_docker
3+
4+
package integration
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/prometheus/common/model"
11+
"github.com/prometheus/prometheus/prompb"
12+
"github.com/stretchr/testify/require"
13+
14+
"github.com/cortexproject/cortex/integration/e2e"
15+
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
16+
"github.com/cortexproject/cortex/integration/e2ecortex"
17+
)
18+
19+
func TestNativeHistogramIngestionAndQuery(t *testing.T) {
20+
const blockRangePeriod = 5 * time.Second
21+
22+
s, err := e2e.NewScenario(networkName)
23+
require.NoError(t, err)
24+
defer s.Close()
25+
26+
// Configure the blocks storage to frequently compact TSDB head
27+
// and ship blocks to the storage.
28+
flags := mergeFlags(BlocksStorageFlags(), map[string]string{
29+
"-blocks-storage.tsdb.block-ranges-period": blockRangePeriod.String(),
30+
"-blocks-storage.tsdb.ship-interval": "1s",
31+
"-blocks-storage.tsdb.retention-period": ((blockRangePeriod * 2) - 1).String(),
32+
"-blocks-storage.tsdb.enable-native-histograms": "true",
33+
})
34+
35+
// Start dependencies.
36+
consul := e2edb.NewConsul()
37+
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
38+
require.NoError(t, s.StartAndWaitReady(consul, minio))
39+
40+
// Start Cortex components for the write path.
41+
distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
42+
ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
43+
require.NoError(t, s.StartAndWaitReady(distributor, ingester))
44+
45+
// Wait until the distributor has updated the ring.
46+
require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
47+
48+
// Push some series to Cortex.
49+
c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), "", "", "", "user-1")
50+
require.NoError(t, err)
51+
52+
seriesTimestamp := time.Now()
53+
series2Timestamp := seriesTimestamp.Add(blockRangePeriod * 2)
54+
series1 := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
55+
series1Float := e2e.GenerateHistogramSeries("series_1", seriesTimestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
56+
res, err := c.Push(append(series1, series1Float...))
57+
require.NoError(t, err)
58+
require.Equal(t, 200, res.StatusCode)
59+
60+
series2 := e2e.GenerateHistogramSeries("series_2", series2Timestamp, false, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "false"})
61+
series2Float := e2e.GenerateHistogramSeries("series_2", series2Timestamp, true, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "float", Value: "true"})
62+
res, err = c.Push(append(series2, series2Float...))
63+
require.NoError(t, err)
64+
require.Equal(t, 200, res.StatusCode)
65+
66+
// Wait until the TSDB head is compacted and shipped to the storage.
67+
// The shipped block contains the 2 series from `series_1` and `series_2` will be in head.
68+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total"))
69+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(4), "cortex_ingester_memory_series_created_total"))
70+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
71+
require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series"))
72+
73+
queryFrontend := e2ecortex.NewQueryFrontendWithConfigFile("query-frontend", "", flags, "")
74+
require.NoError(t, s.Start(queryFrontend))
75+
76+
// Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
77+
storeGateway := e2ecortex.NewStoreGateway("store-gateway", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
78+
"-blocks-storage.bucket-store.sync-interval": "5s",
79+
}), "")
80+
querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), mergeFlags(flags, map[string]string{
81+
"-blocks-storage.bucket-store.sync-interval": "1s",
82+
"-querier.frontend-address": queryFrontend.NetworkGRPCEndpoint(),
83+
}), "")
84+
require.NoError(t, s.StartAndWaitReady(querier, storeGateway))
85+
86+
// Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
87+
require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512*2), "cortex_ring_tokens_total"))
88+
require.NoError(t, storeGateway.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total"))
89+
require.NoError(t, querier.WaitSumMetricsWithOptions(e2e.GreaterOrEqual(4), []string{"cortex_querier_blocks_scan_duration_seconds"}, e2e.WithMetricCount))
90+
91+
// Sleep 3 * bucket sync interval to make sure consistency checker
92+
// doesn't consider block is uploaded recently.
93+
time.Sleep(3 * time.Second)
94+
95+
// Query back the series.
96+
c, err = e2ecortex.NewClient("", queryFrontend.HTTPEndpoint(), "", "", "user-1")
97+
require.NoError(t, err)
98+
99+
result, err := c.QueryRange(`series_1`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
100+
require.NoError(t, err)
101+
require.Equal(t, model.ValMatrix, result.Type())
102+
m := result.(model.Matrix)
103+
require.Equal(t, 2, m.Len())
104+
for _, ss := range m {
105+
require.Empty(t, ss.Values)
106+
require.NotEmpty(t, ss.Histograms)
107+
for _, h := range ss.Histograms {
108+
require.NotEmpty(t, h)
109+
}
110+
}
111+
112+
result, err = c.QueryRange(`series_2`, series2Timestamp.Add(-time.Minute*10), series2Timestamp, time.Second)
113+
require.NoError(t, err)
114+
require.Equal(t, model.ValMatrix, result.Type())
115+
m = result.(model.Matrix)
116+
require.Equal(t, 2, m.Len())
117+
for _, ss := range m {
118+
require.Empty(t, ss.Values)
119+
require.NotEmpty(t, ss.Histograms)
120+
for _, h := range ss.Histograms {
121+
require.NotEmpty(t, h)
122+
}
123+
}
124+
125+
result, err = c.Query(`series_1`, series2Timestamp)
126+
require.NoError(t, err)
127+
require.Equal(t, model.ValVector, result.Type())
128+
v := result.(model.Vector)
129+
require.Equal(t, 2, v.Len())
130+
for _, s := range v {
131+
require.NotNil(t, s.Histogram)
132+
}
133+
134+
result, err = c.Query(`series_2`, series2Timestamp)
135+
require.NoError(t, err)
136+
require.Equal(t, model.ValVector, result.Type())
137+
v = result.(model.Vector)
138+
require.Equal(t, 2, v.Len())
139+
for _, s := range v {
140+
require.NotNil(t, s.Histogram)
141+
}
142+
}

0 commit comments

Comments
 (0)