Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions cmd/profilecli/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"fmt"
"net/http"
"strings"

"connectrpc.com/connect"
"github.com/prometheus/common/version"
Expand All @@ -17,7 +18,21 @@ const (
protocolTypeGRPCWeb = "grpc-web"
)

var acceptHeaderFeatureFlags = []string{
"allow-utf8-labelnames=true",
}

var userAgentHeader = fmt.Sprintf("pyroscope/%s", version.Version)
var acceptHeaderMimeType = "*/*"

func buildAcceptHeader(featureFlags []string) string {
acceptHeader := acceptHeaderMimeType
if len(acceptHeaderFeatureFlags) > 0 {
acceptHeader += "; " + strings.Join(featureFlags, "; ")
}

return acceptHeader
}

type phlareClient struct {
TenantID string
Expand Down Expand Up @@ -46,7 +61,10 @@ func (a *authRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)
}
}

acceptHeader := buildAcceptHeader(acceptHeaderFeatureFlags)
req.Header.Set("Accept", acceptHeader)
req.Header.Set("User-Agent", userAgentHeader)

return a.next.RoundTrip(req)
}

Expand Down
53 changes: 34 additions & 19 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
distributormodel "github.com/grafana/pyroscope/pkg/distributor/model"
"github.com/grafana/pyroscope/pkg/distributor/sampling"
"github.com/grafana/pyroscope/pkg/distributor/writepath"
"github.com/grafana/pyroscope/pkg/featureflags"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/model/pprofsplit"
"github.com/grafana/pyroscope/pkg/model/relabel"
Expand Down Expand Up @@ -248,7 +249,14 @@ func (d *Distributor) Push(ctx context.Context, grpcReq *connect.Request[pushv1.
Series: make([]*distributormodel.ProfileSeries, 0, len(grpcReq.Msg.Series)),
RawProfileType: distributormodel.RawProfileTypePPROF,
}

allErrors := multierror.New()

clientCapabilities, parseErr := featureflags.ParseClientCapabilities(grpcReq.Header())
if parseErr != nil {
allErrors.Add(parseErr)
}

for _, grpcSeries := range grpcReq.Msg.Series {
for _, grpcSample := range grpcSeries.Samples {
profile, err := pprof.RawFromBytes(grpcSample.RawProfile)
Expand All @@ -257,10 +265,11 @@ func (d *Distributor) Push(ctx context.Context, grpcReq *connect.Request[pushv1.
continue
}
series := &distributormodel.ProfileSeries{
Labels: grpcSeries.Labels,
Profile: profile,
RawProfile: grpcSample.RawProfile,
ID: grpcSample.ID,
Labels: grpcSeries.Labels,
Profile: profile,
RawProfile: grpcSample.RawProfile,
ID: grpcSample.ID,
ClientCapabilities: clientCapabilities,
}
req.Series = append(req.Series, series)
}
Expand Down Expand Up @@ -630,10 +639,11 @@ func (d *Distributor) aggregate(ctx context.Context, req *distributormodel.Profi
return handleErr
}
aggregated := &distributormodel.ProfileSeries{
TenantID: req.TenantID,
Labels: labels,
Profile: pprof.RawFromProto(p.Profile()),
Annotations: annotations,
TenantID: req.TenantID,
Labels: labels,
Profile: pprof.RawFromProto(p.Profile()),
Annotations: annotations,
ClientCapabilities: req.ClientCapabilities,
}
return d.router.Send(localCtx, aggregated)
})()
Expand Down Expand Up @@ -1132,9 +1142,10 @@ func (d *Distributor) visitSampleSeries(s *distributormodel.ProfileSeries, visit
var result []*distributormodel.ProfileSeries
usageGroups := d.usageGroupEvaluator.GetMatch(s.TenantID, usageConfig, s.Labels)
visitor := &sampleSeriesVisitor{
tenantID: s.TenantID,
limits: d.limits,
profile: s.Profile,
tenantID: s.TenantID,
limits: d.limits,
profile: s.Profile,
clientCapabilities: s.ClientCapabilities,
}
if err := visit(s.Profile.Profile, s.Labels, relabelingRules, visitor); err != nil {
validation.DiscardedProfiles.WithLabelValues(string(validation.ReasonOf(err)), s.TenantID).Add(float64(s.TotalProfiles))
Expand Down Expand Up @@ -1164,24 +1175,28 @@ func (d *Distributor) visitSampleSeries(s *distributormodel.ProfileSeries, visit
}

type sampleSeriesVisitor struct {
tenantID string
limits Limits
profile *pprof.Profile
exp *pprof.SampleExporter
series []*distributormodel.ProfileSeries
tenantID string
limits Limits
profile *pprof.Profile
exp *pprof.SampleExporter
series []*distributormodel.ProfileSeries
clientCapabilities []*featureflags.ClientCapability

discardedBytes int
discardedProfiles int
}

func (v *sampleSeriesVisitor) ValidateLabels(labels phlaremodel.Labels) error {
return validation.ValidateLabels(v.limits, v.tenantID, labels)
capability := featureflags.GetClientCapability(v.clientCapabilities, featureflags.AllowUtf8LabelNamesCapabilityName)
utf8LabelNamesEnabled := capability != nil && capability.Value == "true"
return validation.ValidateLabels(v.limits, v.tenantID, utf8LabelNamesEnabled, labels)
}

func (v *sampleSeriesVisitor) VisitProfile(labels phlaremodel.Labels) {
v.series = append(v.series, &distributormodel.ProfileSeries{
Profile: v.profile,
Labels: labels,
Profile: v.profile,
Labels: labels,
ClientCapabilities: v.clientCapabilities,
})
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/distributor/model/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/grafana/pyroscope/pkg/distributor/annotation"
"github.com/grafana/pyroscope/pkg/distributor/ingestlimits"
"github.com/grafana/pyroscope/pkg/distributor/sampling"
"github.com/grafana/pyroscope/pkg/featureflags"
phlaremodel "github.com/grafana/pyroscope/pkg/model"
"github.com/grafana/pyroscope/pkg/pprof"
)
Expand All @@ -30,6 +31,9 @@ type ProfileSeries struct {
RawProfile []byte // may be nil if the Profile is composed not from pprof ( e.g. jfr)
ID string

// List of features the client supports
ClientCapabilities []*featureflags.ClientCapability

// todo split
// Transient state
TenantID string
Expand Down Expand Up @@ -89,6 +93,7 @@ func (req *ProfileSeries) Clone() *ProfileSeries {
ID: req.ID,
Language: req.Language,
Annotations: req.Annotations,
ClientCapabilities: req.ClientCapabilities,
}
return c
}
Expand Down
52 changes: 52 additions & 0 deletions pkg/featureflags/client_capability.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package featureflags

import (
"fmt"
"mime"
"net/http"
)

const (
// Capability names
AllowUtf8LabelNamesCapabilityName = "allow-utf8-labelnames"
)

type ClientCapability struct {
Name string
Value string
}

func ParseClientCapabilities(header http.Header) ([]*ClientCapability, error) {
acceptHeader := header.Get("Accept")
if acceptHeader != "" {
if _, params, err := mime.ParseMediaType(acceptHeader); err != nil {
return nil, err
} else {
capabilities := make([]*ClientCapability, 0, len(params))
seenCapabilityNames := make(map[string]struct{})
for k, v := range params {
// Check for duplicates
if _, ok := seenCapabilityNames[k]; ok {
return nil, fmt.Errorf("duplicate client capabilities parsed from `Accept:` header: '%s'",
params)
}
seenCapabilityNames[k] = struct{}{}

capabilities = append(capabilities, &ClientCapability{Name: k, Value: v})
}

return capabilities, nil
}
}

return []*ClientCapability{}, nil
}

func GetClientCapability(capabilities []*ClientCapability, capabilityName string) *ClientCapability {
for _, capability := range capabilities {
if capability.Name == capabilityName {
return capability
}
}
return nil
}
17 changes: 15 additions & 2 deletions pkg/validation/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ type LabelValidationLimits interface {
}

// ValidateLabels validates the labels of a profile.
func ValidateLabels(limits LabelValidationLimits, tenantID string, ls []*typesv1.LabelPair) error {
func ValidateLabels(limits LabelValidationLimits, tenantID string, utf8LabelNamesEnabled bool, ls []*typesv1.LabelPair) error {
if len(ls) == 0 {
return NewErrorf(MissingLabels, MissingLabelsErrorMsg)
}
Expand Down Expand Up @@ -146,7 +146,12 @@ func ValidateLabels(limits LabelValidationLimits, tenantID string, ls []*typesv1
if len(l.Value) > limits.MaxLabelValueLength(tenantID) {
return NewErrorf(LabelValueTooLong, LabelValueTooLongErrorMsg, phlaremodel.LabelPairsString(ls), l.Value)
}
if origName, newName, ok := SanitizeLabelName(l.Name); ok && origName != newName {
// Note this conditional falls back on legacy logic if not valid utf-8 label name
if ok := ValidateUtf8LabelName(l.Name); utf8LabelNamesEnabled && ok {
idx += 1
continue
} else if origName, newName, ok := SanitizeLabelName(l.Name); ok && origName != newName {
// Legacy logic if client does not specify utf8LabelNamesEnabled
var err error
ls, idx, err = handleSanitizedLabel(ls, idx, origName, newName)
if err != nil {
Expand Down Expand Up @@ -211,6 +216,14 @@ func handleSanitizedLabel(ls []*typesv1.LabelPair, origIdx int, origName, newNam
return ls[:len(newSlice)], finalIdx, nil
}

// ValidateUtf8LabelName validates the name is not empty and is a valid utf-8 string
func ValidateUtf8LabelName(name string) bool {
if len(name) == 0 {
return false
}
return utf8.ValidString(name)
}

// SanitizeLabelName reports whether the label name is valid,
// and returns the sanitized value.
//
Expand Down
Loading