44package integration
55
66import (
7+ "fmt"
78 "math/rand"
89 "testing"
910 "time"
@@ -21,136 +22,149 @@ import (
2122func TestNativeHistogramIngestionAndQuery (t * testing.T ) {
2223 const blockRangePeriod = 5 * time .Second
2324
24- s , err := e2e .NewScenario (networkName )
25- require .NoError (t , err )
26- defer s .Close ()
27-
28- // Configure the blocks storage to frequently compact TSDB head
29- // and ship blocks to the storage.
30- flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
31- "-blocks-storage.tsdb.block-ranges-period" : blockRangePeriod .String (),
32- "-blocks-storage.tsdb.ship-interval" : "1s" ,
33- "-blocks-storage.tsdb.retention-period" : ((blockRangePeriod * 2 ) - 1 ).String (),
34- "-blocks-storage.tsdb.enable-native-histograms" : "true" ,
35- })
36-
37- // Start dependencies.
38- consul := e2edb .NewConsul ()
39- minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
40- require .NoError (t , s .StartAndWaitReady (consul , minio ))
41-
42- // Start Cortex components for the write path.
43- distributor := e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
44- ingester := e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
45- require .NoError (t , s .StartAndWaitReady (distributor , ingester ))
46-
47- // Wait until the distributor has updated the ring.
48- require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
49-
50- // Push some series to Cortex.
51- c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), "" , "" , "" , "user-1" )
52- require .NoError (t , err )
53-
54- seriesTimestamp := time .Now ()
55- series2Timestamp := seriesTimestamp .Add (blockRangePeriod * 2 )
56- histogramIdx1 := rand .Uint32 ()
57- series1 := e2e .GenerateHistogramSeries ("series_1" , seriesTimestamp , histogramIdx1 , false , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "false" })
58- series1Float := e2e .GenerateHistogramSeries ("series_1" , seriesTimestamp , histogramIdx1 , true , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "true" })
59- res , err := c .Push (append (series1 , series1Float ... ))
60- require .NoError (t , err )
61- require .Equal (t , 200 , res .StatusCode )
62-
63- histogramIdx2 := rand .Uint32 ()
64- series2 := e2e .GenerateHistogramSeries ("series_2" , series2Timestamp , histogramIdx2 , false , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "false" })
65- series2Float := e2e .GenerateHistogramSeries ("series_2" , series2Timestamp , histogramIdx2 , true , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "true" })
66- res , err = c .Push (append (series2 , series2Float ... ))
67- require .NoError (t , err )
68- require .Equal (t , 200 , res .StatusCode )
69-
70- // Wait until the TSDB head is compacted and shipped to the storage.
71- // The shipped block contains the 2 series from `series_1` and `series_2` will be in head.
72- require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (1 ), "cortex_ingester_shipper_uploads_total" ))
73- require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (4 ), "cortex_ingester_memory_series_created_total" ))
74- require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (2 ), "cortex_ingester_memory_series_removed_total" ))
75- require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (2 ), "cortex_ingester_memory_series" ))
76-
77- queryFrontend := e2ecortex .NewQueryFrontendWithConfigFile ("query-frontend" , "" , flags , "" )
78- require .NoError (t , s .Start (queryFrontend ))
79-
80- // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
81- storeGateway := e2ecortex .NewStoreGateway ("store-gateway" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
82- "-blocks-storage.bucket-store.sync-interval" : "5s" ,
83- }), "" )
84- querier := e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
85- "-blocks-storage.bucket-store.sync-interval" : "1s" ,
86- "-querier.frontend-address" : queryFrontend .NetworkGRPCEndpoint (),
87- }), "" )
88- require .NoError (t , s .StartAndWaitReady (querier , storeGateway ))
89-
90- // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
91- require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 * 2 ), "cortex_ring_tokens_total" ))
92- require .NoError (t , storeGateway .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
93- require .NoError (t , querier .WaitSumMetricsWithOptions (e2e .GreaterOrEqual (4 ), []string {"cortex_querier_blocks_scan_duration_seconds" }, e2e .WithMetricCount ))
94-
95- // Sleep 3 * bucket sync interval to make sure consistency checker
96- // doesn't consider block is uploaded recently.
97- time .Sleep (3 * time .Second )
98-
99- // Query back the series.
100- c , err = e2ecortex .NewClient ("" , queryFrontend .HTTPEndpoint (), "" , "" , "user-1" )
101- require .NoError (t , err )
102-
103- expectedHistogram1 := tsdbutil .GenerateTestHistogram (int (histogramIdx1 ))
104- expectedHistogram2 := tsdbutil .GenerateTestHistogram (int (histogramIdx2 ))
105- result , err := c .QueryRange (`series_1` , series2Timestamp .Add (- time .Minute * 10 ), series2Timestamp , time .Second )
106- require .NoError (t , err )
107- require .Equal (t , model .ValMatrix , result .Type ())
108- m := result .(model.Matrix )
109- require .Equal (t , 2 , m .Len ())
110- for _ , ss := range m {
111- require .Empty (t , ss .Values )
112- require .NotEmpty (t , ss .Histograms )
113- for _ , h := range ss .Histograms {
114- require .NotEmpty (t , h )
115- require .Equal (t , float64 (expectedHistogram1 .Count ), float64 (h .Histogram .Count ))
116- require .Equal (t , float64 (expectedHistogram1 .Sum ), float64 (h .Histogram .Sum ))
117- }
25+ configs := []map [string ]string {
26+ {
27+ "-api.querier-default-codec" : "json" ,
28+ },
29+ {
30+ "-api.querier-default-codec" : "protobuf" ,
31+ },
11832 }
11933
120- result , err = c .QueryRange (`series_2` , series2Timestamp .Add (- time .Minute * 10 ), series2Timestamp , time .Second )
121- require .NoError (t , err )
122- require .Equal (t , model .ValMatrix , result .Type ())
123- m = result .(model.Matrix )
124- require .Equal (t , 2 , m .Len ())
125- for _ , ss := range m {
126- require .Empty (t , ss .Values )
127- require .NotEmpty (t , ss .Histograms )
128- for _ , h := range ss .Histograms {
129- require .NotEmpty (t , h )
130- require .Equal (t , float64 (expectedHistogram2 .Count ), float64 (h .Histogram .Count ))
131- require .Equal (t , float64 (expectedHistogram2 .Sum ), float64 (h .Histogram .Sum ))
132- }
133- }
134-
135- result , err = c .Query (`series_1` , series2Timestamp )
136- require .NoError (t , err )
137- require .Equal (t , model .ValVector , result .Type ())
138- v := result .(model.Vector )
139- require .Equal (t , 2 , v .Len ())
140- for _ , s := range v {
141- require .NotNil (t , s .Histogram )
142- require .Equal (t , float64 (expectedHistogram1 .Count ), float64 (s .Histogram .Count ))
143- require .Equal (t , float64 (expectedHistogram1 .Sum ), float64 (s .Histogram .Sum ))
144- }
145-
146- result , err = c .Query (`series_2` , series2Timestamp )
147- require .NoError (t , err )
148- require .Equal (t , model .ValVector , result .Type ())
149- v = result .(model.Vector )
150- require .Equal (t , 2 , v .Len ())
151- for _ , s := range v {
152- require .NotNil (t , s .Histogram )
153- require .Equal (t , float64 (expectedHistogram2 .Count ), float64 (s .Histogram .Count ))
154- require .Equal (t , float64 (expectedHistogram2 .Sum ), float64 (s .Histogram .Sum ))
34+ for _ , config := range configs {
35+ t .Run (fmt .Sprintf ("native histograms with %s codec" , config ["-api.querier-default-codec" ]), func (t * testing.T ) {
36+ s , err := e2e .NewScenario (networkName )
37+ require .NoError (t , err )
38+ defer s .Close ()
39+
40+ // Configure the blocks storage to frequently compact TSDB head
41+ // and ship blocks to the storage.
42+ flags := mergeFlags (BlocksStorageFlags (), map [string ]string {
43+ "-blocks-storage.tsdb.block-ranges-period" : blockRangePeriod .String (),
44+ "-blocks-storage.tsdb.ship-interval" : "1s" ,
45+ "-blocks-storage.tsdb.retention-period" : ((blockRangePeriod * 2 ) - 1 ).String (),
46+ "-blocks-storage.tsdb.enable-native-histograms" : "true" ,
47+ })
48+
49+ // Start dependencies.
50+ consul := e2edb .NewConsul ()
51+ minio := e2edb .NewMinio (9000 , flags ["-blocks-storage.s3.bucket-name" ])
52+ require .NoError (t , s .StartAndWaitReady (consul , minio ))
53+
54+ // Start Cortex components for the write path.
55+ distributor := e2ecortex .NewDistributor ("distributor" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
56+ ingester := e2ecortex .NewIngester ("ingester" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), flags , "" )
57+ require .NoError (t , s .StartAndWaitReady (distributor , ingester ))
58+
59+ // Wait until the distributor has updated the ring.
60+ require .NoError (t , distributor .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
61+
62+ // Push some series to Cortex.
63+ c , err := e2ecortex .NewClient (distributor .HTTPEndpoint (), "" , "" , "" , "user-1" )
64+ require .NoError (t , err )
65+
66+ seriesTimestamp := time .Now ()
67+ series2Timestamp := seriesTimestamp .Add (blockRangePeriod * 2 )
68+ histogramIdx1 := rand .Uint32 ()
69+ series1 := e2e .GenerateHistogramSeries ("series_1" , seriesTimestamp , histogramIdx1 , false , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "false" })
70+ series1Float := e2e .GenerateHistogramSeries ("series_1" , seriesTimestamp , histogramIdx1 , true , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "true" })
71+ res , err := c .Push (append (series1 , series1Float ... ))
72+ require .NoError (t , err )
73+ require .Equal (t , 200 , res .StatusCode )
74+
75+ histogramIdx2 := rand .Uint32 ()
76+ series2 := e2e .GenerateHistogramSeries ("series_2" , series2Timestamp , histogramIdx2 , false , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "false" })
77+ series2Float := e2e .GenerateHistogramSeries ("series_2" , series2Timestamp , histogramIdx2 , true , prompb.Label {Name : "job" , Value : "test" }, prompb.Label {Name : "float" , Value : "true" })
78+ res , err = c .Push (append (series2 , series2Float ... ))
79+ require .NoError (t , err )
80+ require .Equal (t , 200 , res .StatusCode )
81+
82+ // Wait until the TSDB head is compacted and shipped to the storage.
83+ // The shipped block contains the 2 series from `series_1` and `series_2` will be in head.
84+ require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (1 ), "cortex_ingester_shipper_uploads_total" ))
85+ require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (4 ), "cortex_ingester_memory_series_created_total" ))
86+ require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (2 ), "cortex_ingester_memory_series_removed_total" ))
87+ require .NoError (t , ingester .WaitSumMetrics (e2e .Equals (2 ), "cortex_ingester_memory_series" ))
88+
89+ queryFrontend := e2ecortex .NewQueryFrontendWithConfigFile ("query-frontend" , "" , mergeFlags (flags , config ), "" )
90+ require .NoError (t , s .Start (queryFrontend ))
91+
92+ // Start the querier and store-gateway, and configure them to frequently sync blocks fast enough to trigger consistency check.
93+ storeGateway := e2ecortex .NewStoreGateway ("store-gateway" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
94+ "-blocks-storage.bucket-store.sync-interval" : "5s" ,
95+ }), "" )
96+ querier := e2ecortex .NewQuerier ("querier" , e2ecortex .RingStoreConsul , consul .NetworkHTTPEndpoint (), mergeFlags (flags , map [string ]string {
97+ "-blocks-storage.bucket-store.sync-interval" : "1s" ,
98+ "-querier.frontend-address" : queryFrontend .NetworkGRPCEndpoint (),
99+ }), "" )
100+ require .NoError (t , s .StartAndWaitReady (querier , storeGateway ))
101+
102+ // Wait until the querier and store-gateway have updated the ring, and wait until the blocks are old enough for consistency check
103+ require .NoError (t , querier .WaitSumMetrics (e2e .Equals (512 * 2 ), "cortex_ring_tokens_total" ))
104+ require .NoError (t , storeGateway .WaitSumMetrics (e2e .Equals (512 ), "cortex_ring_tokens_total" ))
105+ require .NoError (t , querier .WaitSumMetricsWithOptions (e2e .GreaterOrEqual (4 ), []string {"cortex_querier_blocks_scan_duration_seconds" }, e2e .WithMetricCount ))
106+
107+ // Sleep 3 * bucket sync interval to make sure consistency checker
108+ // doesn't consider block is uploaded recently.
109+ time .Sleep (3 * time .Second )
110+
111+ // Query back the series.
112+ c , err = e2ecortex .NewClient ("" , queryFrontend .HTTPEndpoint (), "" , "" , "user-1" )
113+ require .NoError (t , err )
114+
115+ expectedHistogram1 := tsdbutil .GenerateTestHistogram (int (histogramIdx1 ))
116+ expectedHistogram2 := tsdbutil .GenerateTestHistogram (int (histogramIdx2 ))
117+ result , err := c .QueryRange (`series_1` , series2Timestamp .Add (- time .Minute * 10 ), series2Timestamp , time .Second )
118+ require .NoError (t , err )
119+ require .Equal (t , model .ValMatrix , result .Type ())
120+ m := result .(model.Matrix )
121+ require .Equal (t , 2 , m .Len ())
122+ for _ , ss := range m {
123+ require .Empty (t , ss .Values )
124+ require .NotEmpty (t , ss .Histograms )
125+ for _ , h := range ss .Histograms {
126+ require .NotEmpty (t , h )
127+ require .Equal (t , float64 (expectedHistogram1 .Count ), float64 (h .Histogram .Count ))
128+ require .Equal (t , float64 (expectedHistogram1 .Sum ), float64 (h .Histogram .Sum ))
129+ }
130+ }
131+
132+ result , err = c .QueryRange (`series_2` , series2Timestamp .Add (- time .Minute * 10 ), series2Timestamp , time .Second )
133+ require .NoError (t , err )
134+ require .Equal (t , model .ValMatrix , result .Type ())
135+ m = result .(model.Matrix )
136+ require .Equal (t , 2 , m .Len ())
137+ for _ , ss := range m {
138+ require .Empty (t , ss .Values )
139+ require .NotEmpty (t , ss .Histograms )
140+ for _ , h := range ss .Histograms {
141+ require .NotEmpty (t , h )
142+ require .Equal (t , float64 (expectedHistogram2 .Count ), float64 (h .Histogram .Count ))
143+ require .Equal (t , float64 (expectedHistogram2 .Sum ), float64 (h .Histogram .Sum ))
144+ }
145+ }
146+
147+ result , err = c .Query (`series_1` , series2Timestamp )
148+ require .NoError (t , err )
149+ require .Equal (t , model .ValVector , result .Type ())
150+ v := result .(model.Vector )
151+ require .Equal (t , 2 , v .Len ())
152+ for _ , s := range v {
153+ require .NotNil (t , s .Histogram )
154+ require .Equal (t , float64 (expectedHistogram1 .Count ), float64 (s .Histogram .Count ))
155+ require .Equal (t , float64 (expectedHistogram1 .Sum ), float64 (s .Histogram .Sum ))
156+ }
157+
158+ result , err = c .Query (`series_2` , series2Timestamp )
159+ require .NoError (t , err )
160+ require .Equal (t , model .ValVector , result .Type ())
161+ v = result .(model.Vector )
162+ require .Equal (t , 2 , v .Len ())
163+ for _ , s := range v {
164+ require .NotNil (t , s .Histogram )
165+ require .Equal (t , float64 (expectedHistogram2 .Count ), float64 (s .Histogram .Count ))
166+ require .Equal (t , float64 (expectedHistogram2 .Sum ), float64 (s .Histogram .Sum ))
167+ }
168+ })
155169 }
156170}
0 commit comments