Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/avalanche/avalanche.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"log"
"log/slog"
"net/http"
"os"
"syscall"
Expand Down Expand Up @@ -86,7 +87,7 @@ func main() {
if writeCfg.URL != nil {
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
if err := metrics.SendRemoteWrite(ctx, writeCfg, reg); err != nil {
if err := metrics.SendRemoteWrite(ctx, slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})), writeCfg, reg); err != nil {
return err
}
return nil // One-off.
Expand Down
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
module github.com/prometheus-community/avalanche

go 1.21
go 1.22.0

toolchain go1.23.1

require (
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.6.0
github.com/nelkinda/health-go v0.0.1
github.com/oklog/run v1.1.0
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498
github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.62.0
github.com/prometheus/prometheus v0.53.1
Expand All @@ -18,16 +19,17 @@ require (

require (
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 // indirect
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nelkinda/http-go v0.0.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
golang.org/x/sys v0.28.0 // indirect
google.golang.org/protobuf v1.36.1 // indirect
google.golang.org/protobuf v1.36.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
28 changes: 18 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9 h1:ez/4by2iGztzR4L0zgAOR8lTQK9VlyBVVd7G4omaOQs=
github.com/alecthomas/units v0.0.0-20231202071711-9a357b53e9c9/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30 h1:t3eaIm0rUkzbrIewtiFmMK5RXHej2XnoXNhxVsAYUfg=
github.com/alecthomas/units v0.0.0-20240626203959-61d1e3462e30/go.mod h1:fvzegU4vN3H1qMT+8wDmzjAcDONcgo2/SZ/TyfdUOFs=
github.com/antchfx/xmlquery v1.2.4/go.mod h1:KQQuESaxSlqugE2ZBcM/qn+ebIpt+d+4Xx7YcSGAIrM=
github.com/antchfx/xpath v1.1.6/go.mod h1:Yee4kTMuNiPYJ7nSNorELQMr1J33uOpXDMByNYhvtNk=
github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M=
Expand Down Expand Up @@ -52,8 +52,6 @@ github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
Expand All @@ -67,8 +65,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -100,8 +98,10 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.5 h1:cxppBPuYhUnsO6yo/aoRol4L7q7UFfdm+bR9r+8l63Y=
github.com/prometheus/client_golang v1.20.5/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498 h1:pvxJNIN8Tr8FleTHltMExyfFF2GOFBcYbgggNDKvWDw=
github.com/prometheus/client_golang v1.20.6-0.20250117112434-e0800f53b498/go.mod h1:SJYWANZZtDbfu6CSYeDvRDkzhGX71yepDKePKDTya3M=
github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b h1:hKmTbsCQrXLTwcv27uQfkkxvFO82hBf92RkucD8VEDY=
github.com/prometheus/client_golang/exp v0.0.0-20250225163354-248c3f7f612b/go.mod h1:QdwnzTHLXXx636iZ1pfTiCI1Bn/b/20AgMqkPQr4xfA=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.62.0 h1:xasJaQlnWAeyHdUBeGjXmutelfJHWMRr+Fg4QszZ2Io=
Expand All @@ -122,10 +122,17 @@ github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
Expand Down Expand Up @@ -181,8 +188,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.36.1 h1:yBPeRvTftaleIgM3PZ/WBIZ7XM/eEYAaEyCwvyjq/gk=
google.golang.org/protobuf v1.36.1/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand All @@ -194,5 +201,6 @@ gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EV
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
120 changes: 55 additions & 65 deletions metrics/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,33 +14,26 @@
package metrics

import (
"bufio"
"bytes"
"context"
"crypto/tls"
"fmt"
"io"
"log"
"log/slog"
"net/http"
"net/url"
"sort"
"sync"
"time"

"github.com/prometheus-community/avalanche/pkg/errors"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/prompb"
"gopkg.in/alecthomas/kingpin.v2"
)

const maxErrMsgLen = 256

var userAgent = "avalanche"
"github.com/prometheus-community/avalanche/pkg/errors"
)

// ConfigWrite for the remote write requests.
type ConfigWrite struct {
Expand All @@ -55,11 +48,12 @@ type ConfigWrite struct {
TenantHeader string
OutOfOrder bool
Concurrency int
WriteV2 bool
}

func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause) *ConfigWrite {
cfg := &ConfigWrite{}
flagReg("remote-url", "URL to send samples via remote_write API.").
flagReg("remote-url", "URL to send samples via remote_write API. By default, path is set to api/v1/write").
URLVar(&cfg.URL)
flagReg("remote-concurrency-limit", "how many concurrent writes can happen at any given time").Default("20").
IntVar(&cfg.Concurrency)
Expand All @@ -78,6 +72,8 @@ func NewWriteConfigFromFlags(flagReg func(name, help string) *kingpin.FlagClause
// TODO(bwplotka): Make this a non-bool flag (e.g. out-of-order-min-time).
flagReg("remote-out-of-order", "Enable out-of-order timestamps in remote write requests").Default("true").
BoolVar(&cfg.OutOfOrder)
flagReg("remote-write-v2", "Send remote write v2 format requests.").Default("false").
BoolVar(&cfg.WriteV2)

return cfg
}
Expand All @@ -101,27 +97,44 @@ func (c *ConfigWrite) Validate() error {

// Client for the remote write requests.
type Client struct {
client *http.Client
timeout time.Duration
config *ConfigWrite
gatherer prometheus.Gatherer
logger *slog.Logger
timeout time.Duration
config *ConfigWrite
gatherer prometheus.Gatherer
remoteAPI *remote.API
}

// SendRemoteWrite initializes a http client and
// sends metrics to a prometheus compatible remote endpoint.
func SendRemoteWrite(ctx context.Context, cfg *ConfigWrite, gatherer prometheus.Gatherer) error {
func SendRemoteWrite(ctx context.Context, logger *slog.Logger, cfg *ConfigWrite, gatherer prometheus.Gatherer) error {
var rt http.RoundTripper = &http.Transport{
TLSClientConfig: &cfg.TLSClientConfig,
}
rt = &tenantRoundTripper{tenant: cfg.Tenant, tenantHeader: cfg.TenantHeader, rt: rt}
rt = &userAgentRoundTripper{userAgent: "avalanche", rt: rt}
httpClient := &http.Client{Transport: rt}

remoteAPI, err := remote.NewAPI(
cfg.URL.String(),
remote.WithAPIHTTPClient(httpClient),
remote.WithAPILogger(logger.With("component", "remote_write_api")),
)
if err != nil {
return err
}

client := Client{
client: httpClient,
timeout: time.Minute,
config: cfg,
gatherer: gatherer,
logger: logger,
timeout: time.Minute,
config: cfg,
gatherer: gatherer,
remoteAPI: remoteAPI,
}

if cfg.WriteV2 {
return client.writeV2(ctx)
}

return client.write(ctx)
}

Expand All @@ -138,6 +151,18 @@ type tenantRoundTripper struct {
rt http.RoundTripper
}

// User agent round tripper
type userAgentRoundTripper struct {
userAgent string
rt http.RoundTripper
}

func (rt *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req = cloneRequest(req)
req.Header.Set("User-Agent", rt.userAgent)
return rt.rt.RoundTrip(req)
}

// cloneRequest returns a clone of the provided *http.Request.
// The clone is a shallow copy of the struct and its Header map.
func cloneRequest(r *http.Request) *http.Request {
Expand Down Expand Up @@ -228,10 +253,13 @@ func (c *Client) write(ctx context.Context) error {
req := &prompb.WriteRequest{
Timeseries: tss[i:end],
}
if err := c.Store(context.TODO(), req); err != nil {

if _, err := c.remoteAPI.Write(ctx, remote.WriteV1MessageType, req); err != nil {
merr.Add(err)
c.logger.Error("error writing metrics", "error", err)
return
}

mtx.Lock()
totalSamplesAct += len(tss[i:end])
mtx.Unlock()
Expand All @@ -247,7 +275,11 @@ func (c *Client) write(ctx context.Context) error {
if c.config.RequestCount*len(tss) != totalSamplesAct {
merr.Add(fmt.Errorf("total samples mismatch, exp:%v , act:%v", totalSamplesExp, totalSamplesAct))
}
log.Printf("Total request time: %v ; Total samples: %v; Samples/sec: %v\n", totalTime.Round(time.Second), totalSamplesAct, int(float64(totalSamplesAct)/totalTime.Seconds()))
c.logger.Info("metrics summary",
"total_time", totalTime.Round(time.Second),
"total_samples", totalSamplesAct,
"samples_per_sec", int(float64(totalSamplesAct)/totalTime.Seconds()),
"errors", merr.Count())
return merr.Err()
}

Expand Down Expand Up @@ -335,45 +367,3 @@ func prompbLabels(name string, label []*dto.LabelPair) []prompb.Label {
})
return ret
}

// Store sends a batch of samples to the HTTP endpoint.
func (c *Client) Store(ctx context.Context, req *prompb.WriteRequest) error {
data, err := proto.Marshal(req)
if err != nil {
return err
}

compressed := snappy.Encode(nil, data)
httpReq, err := http.NewRequest("POST", c.config.URL.String(), bytes.NewReader(compressed))
if err != nil {
// Errors from NewRequest are from unparseable URLs, so are not
// recoverable.
return err
}
httpReq.Header.Add("Content-Encoding", "snappy")
httpReq.Header.Set("Content-Type", "application/x-protobuf")
httpReq.Header.Set("User-Agent", userAgent)
httpReq.Header.Set("X-Prometheus-Remote-Write-Version", "0.1.0")
httpReq = httpReq.WithContext(ctx)

ctx, cancel := context.WithTimeout(context.Background(), c.timeout)
defer cancel()

httpResp, err := c.client.Do(httpReq.WithContext(ctx))
if err != nil {
return err
}
defer httpResp.Body.Close()

if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
line := ""
if scanner.Scan() {
line = scanner.Text()
}
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line)
log.Println(err)
}

return err
}
Loading
Loading