Skip to content

Commit a8d1fbf

Browse files
authored
Support remote write v2 by converting request (#6330)
* Support remote write v2 by converting request Signed-off-by: SungJin1212 <[email protected]> * Change to not break exist behavior Signed-off-by: SungJin1212 <[email protected]> * Add benchmarks Signed-off-by: SungJin1212 <[email protected]> * rebase from stream connection Signed-off-by: SungJin1212 <[email protected]> * Change to expose header at Distributor side Signed-off-by: SungJin1212 <[email protected]> * get rebase Signed-off-by: SungJin1212 <[email protected]> * Change distributor.remote-write2-enabled naming Signed-off-by: SungJin1212 <[email protected]> --------- Signed-off-by: SungJin1212 <[email protected]>
1 parent cc96eac commit a8d1fbf

File tree

14 files changed

+1563
-118
lines changed

14 files changed

+1563
-118
lines changed

.github/workflows/test-build-deploy.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ jobs:
162162
- integration_querier
163163
- integration_ruler
164164
- integration_query_fuzz
165+
- integration_remote_write_v2
165166
steps:
166167
- name: Upgrade golang
167168
uses: actions/setup-go@d35c59abb061a4a6fb18e82ac0862c26744d6ab5 # v5.5.0

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
* [FEATURE] Querier/Ruler: Add `query_partial_data` and `rules_partial_data` limits to allow queries/rules to be evaluated with data from a single zone, if other zones are not available. #6526
1010
* [FEATURE] Update prometheus alertmanager version to v0.28.0 and add new integration msteamsv2, jira, and rocketchat. #6590
1111
* [FEATURE] Ingester/StoreGateway: Add `ResourceMonitor` module in Cortex, and add `ResourceBasedLimiter` in Ingesters and StoreGateways. #6674
12+
* [FEATURE] Support Prometheus remote write 2.0. #6330
1213
* [FEATURE] Ingester: Support out-of-order native histogram ingestion. It automatically enabled when `-ingester.out-of-order-time-window > 0` and `-blocks-storage.tsdb.enable-native-histograms=true`. #6626 #6663
1314
* [FEATURE] Ruler: Add support for percentage based sharding for rulers. #6680
1415
* [FEATURE] Ruler: Add support for group labels. #6665

docs/configuration/config-file-reference.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3140,6 +3140,11 @@ ha_tracker:
31403140
# CLI flag: -distributor.use-stream-push
31413141
[use_stream_push: <boolean> | default = false]
31423142
3143+
# EXPERIMENTAL: If true, accept prometheus remote write v2 protocol push
3144+
# request.
3145+
# CLI flag: -distributor.remote-writev2-enabled
3146+
[remote_write2_enabled: <boolean> | default = false]
3147+
31433148
ring:
31443149
kvstore:
31453150
# Backend storage to use for the ring. Supported values are: consul, etcd,

docs/configuration/v1-guarantees.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ Currently experimental features are:
5959
- Distributor:
6060
- Do not extend writes on unhealthy ingesters (`-distributor.extend-writes=false`)
6161
- Accept multiple HA pairs in the same request (enabled via `-experimental.distributor.ha-tracker.mixed-ha-samples=true`)
62+
- Accept Prometheus remote write 2.0 request (`-distributor.remote-writev2-enabled=true`)
6263
- Tenant Deletion in Purger, for blocks storage.
6364
- Query-frontend: query stats tracking (`-frontend.query-stats-enabled`)
6465
- Blocks storage bucket index

integration/e2e/util.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"github.com/prometheus/prometheus/model/histogram"
2020
"github.com/prometheus/prometheus/model/labels"
2121
"github.com/prometheus/prometheus/prompb"
22+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2223
"github.com/prometheus/prometheus/storage"
2324
"github.com/prometheus/prometheus/tsdb"
2425
"github.com/prometheus/prometheus/tsdb/tsdbutil"
@@ -423,3 +424,117 @@ func CreateBlock(
423424

424425
return id, nil
425426
}
427+
428+
func GenerateHistogramSeriesV2(name string, ts time.Time, i uint32, floatHistogram bool, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries) {
429+
tsMillis := TimeToMilliseconds(ts)
430+
431+
st := writev2.NewSymbolTable()
432+
lb := labels.NewScratchBuilder(0)
433+
lb.Add("__name__", name)
434+
for _, lbl := range additionalLabels {
435+
lb.Add(lbl.Name, lbl.Value)
436+
}
437+
438+
var (
439+
h *histogram.Histogram
440+
fh *histogram.FloatHistogram
441+
ph writev2.Histogram
442+
)
443+
if floatHistogram {
444+
fh = tsdbutil.GenerateTestFloatHistogram(int64(i))
445+
ph = writev2.FromFloatHistogram(tsMillis, fh)
446+
} else {
447+
h = tsdbutil.GenerateTestHistogram(int64(i))
448+
ph = writev2.FromIntHistogram(tsMillis, h)
449+
}
450+
451+
// Generate the series
452+
series = append(series, writev2.TimeSeries{
453+
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
454+
Histograms: []writev2.Histogram{ph},
455+
})
456+
457+
symbols = st.Symbols()
458+
459+
return
460+
}
461+
462+
func GenerateSeriesV2(name string, ts time.Time, additionalLabels ...prompb.Label) (symbols []string, series []writev2.TimeSeries, vector model.Vector) {
463+
tsMillis := TimeToMilliseconds(ts)
464+
value := rand.Float64()
465+
466+
st := writev2.NewSymbolTable()
467+
lb := labels.NewScratchBuilder(0)
468+
lb.Add("__name__", name)
469+
470+
for _, label := range additionalLabels {
471+
lb.Add(label.Name, label.Value)
472+
}
473+
series = append(series, writev2.TimeSeries{
474+
// Generate the series
475+
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
476+
Samples: []writev2.Sample{
477+
{Value: value, Timestamp: tsMillis},
478+
},
479+
Metadata: writev2.Metadata{
480+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
481+
},
482+
})
483+
symbols = st.Symbols()
484+
485+
// Generate the expected vector when querying it
486+
metric := model.Metric{}
487+
metric[labels.MetricName] = model.LabelValue(name)
488+
for _, lbl := range additionalLabels {
489+
metric[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
490+
}
491+
492+
vector = append(vector, &model.Sample{
493+
Metric: metric,
494+
Value: model.SampleValue(value),
495+
Timestamp: model.Time(tsMillis),
496+
})
497+
498+
return
499+
}
500+
501+
func GenerateV2SeriesWithSamples(
502+
name string,
503+
startTime time.Time,
504+
scrapeInterval time.Duration,
505+
startValue int,
506+
numSamples int,
507+
additionalLabels ...prompb.Label,
508+
) (symbols []string, series writev2.TimeSeries) {
509+
tsMillis := TimeToMilliseconds(startTime)
510+
durMillis := scrapeInterval.Milliseconds()
511+
512+
st := writev2.NewSymbolTable()
513+
lb := labels.NewScratchBuilder(0)
514+
lb.Add("__name__", name)
515+
516+
for _, label := range additionalLabels {
517+
lb.Add(label.Name, label.Value)
518+
}
519+
520+
startTMillis := tsMillis
521+
samples := make([]writev2.Sample, numSamples)
522+
for i := 0; i < numSamples; i++ {
523+
scrapeJitter := rand.Int63n(10) + 1 // add a jitter to simulate real-world scenarios, refer to: https://github.com/prometheus/prometheus/issues/13213
524+
samples[i] = writev2.Sample{
525+
Timestamp: startTMillis + scrapeJitter,
526+
Value: float64(i + startValue),
527+
}
528+
startTMillis += durMillis
529+
}
530+
531+
series = writev2.TimeSeries{
532+
LabelsRefs: st.SymbolizeLabels(lb.Labels(), nil),
533+
Samples: samples,
534+
Metadata: writev2.Metadata{
535+
Type: writev2.Metadata_METRIC_TYPE_GAUGE,
536+
},
537+
}
538+
539+
return st.Symbols(), series
540+
}

integration/e2ecortex/client.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"github.com/prometheus/prometheus/model/labels"
2525
"github.com/prometheus/prometheus/model/rulefmt"
2626
"github.com/prometheus/prometheus/prompb"
27+
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
2728
"github.com/prometheus/prometheus/storage"
2829
"github.com/prometheus/prometheus/storage/remote"
2930
yaml "gopkg.in/yaml.v3"
@@ -147,6 +148,39 @@ func (c *Client) Push(timeseries []prompb.TimeSeries, metadata ...prompb.MetricM
147148
return res, nil
148149
}
149150

151+
// PushV2 the input timeseries to the remote endpoint
152+
func (c *Client) PushV2(symbols []string, timeseries []writev2.TimeSeries) (*http.Response, error) {
153+
// Create write request
154+
data, err := proto.Marshal(&writev2.Request{Symbols: symbols, Timeseries: timeseries})
155+
if err != nil {
156+
return nil, err
157+
}
158+
159+
// Create HTTP request
160+
compressed := snappy.Encode(nil, data)
161+
req, err := http.NewRequest("POST", fmt.Sprintf("http://%s/api/prom/push", c.distributorAddress), bytes.NewReader(compressed))
162+
if err != nil {
163+
return nil, err
164+
}
165+
166+
req.Header.Add("Content-Encoding", "snappy")
167+
req.Header.Set("Content-Type", "application/x-protobuf;proto=io.prometheus.write.v2.Request")
168+
req.Header.Set("X-Prometheus-Remote-Write-Version", "2.0.0")
169+
req.Header.Set("X-Scope-OrgID", c.orgID)
170+
171+
ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
172+
defer cancel()
173+
174+
// Execute HTTP request
175+
res, err := c.httpClient.Do(req.WithContext(ctx))
176+
if err != nil {
177+
return nil, err
178+
}
179+
180+
defer res.Body.Close()
181+
return res, nil
182+
}
183+
150184
func getNameAndAttributes(ts prompb.TimeSeries) (string, map[string]any) {
151185
var metricName string
152186
attributes := make(map[string]any)

0 commit comments

Comments
 (0)