Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/debuginfo/debuginfod.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package debuginfo

import (
"bytes"
"context"
"errors"
"fmt"
"io"
Expand All @@ -34,7 +35,6 @@ import (
"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/net/context"

debuginfopb "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1"
"github.com/parca-dev/parca/pkg/cache"
Expand Down
72 changes: 72 additions & 0 deletions pkg/profilestore/profilecolumnstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@ package profilestore
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strings"
"sync"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql/parser"
"go.opentelemetry.io/otel/trace"
otelgrpcprofilingpb "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
"google.golang.org/grpc/codes"
Expand All @@ -37,6 +40,7 @@ import (
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/parca-dev/parca/pkg/ingester"
"github.com/parca-dev/parca/pkg/normalizer"
"github.com/parca-dev/parca/pkg/profile"
)

type agent struct {
Expand Down Expand Up @@ -109,9 +113,77 @@ func (s *ProfileColumnStore) writeSeries(ctx context.Context, req *profilestorep
return nil
}

schema := r.Schema()

nameCol := r.Column(schema.FieldIndices(profile.ColumnName)[0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is user-supplied data, so we shouldn't panic if there is no field index

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah makes sense. Fixing it.

sampleTypeCol := r.Column(schema.FieldIndices(profile.ColumnSampleType)[0])
sampleUnitCol := r.Column(schema.FieldIndices(profile.ColumnSampleUnit)[0])
periodTypeCol := r.Column(schema.FieldIndices(profile.ColumnPeriodType)[0])
periodUnitCol := r.Column(schema.FieldIndices(profile.ColumnPeriodUnit)[0])
durationCol := r.Column(schema.FieldIndices(profile.ColumnDuration)[0])

for rowIdx := 0; rowIdx < int(r.NumRows()); rowIdx++ {
profileType, err := getProfileTypeString(rowIdx, nameCol, sampleTypeCol, sampleUnitCol, periodTypeCol, periodUnitCol, durationCol)
if err != nil {
return fmt.Errorf("failed to get profile type at row %d: %v", rowIdx, err)
}

// Validate profile type by trying to parse it as a PromQL selector.
queryStr := fmt.Sprintf("%s{}", profileType)
if _, parseErr := parser.ParseMetricSelector(queryStr); parseErr != nil {
return fmt.Errorf("invalid profile type at row %d (%s): %v", rowIdx, profileType, parseErr)
}
}

return s.ingester.Ingest(ctx, r)
}

func getProfileTypeString(rowIdx int, nameCol, sampleTypeCol, sampleUnitCol, periodTypeCol, periodUnitCol, durationCol arrow.Array) (string, error) {
nameEncoded := nameCol.ValueStr(rowIdx)
sampleTypeEncoded := sampleTypeCol.ValueStr(rowIdx)
sampleUnitEncoded := sampleUnitCol.ValueStr(rowIdx)
periodTypeEncoded := periodTypeCol.ValueStr(rowIdx)
periodUnitEncoded := periodUnitCol.ValueStr(rowIdx)
duration := durationCol.ValueStr(rowIdx)

nameBytes, err := base64.StdEncoding.DecodeString(nameEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode name: %v", err)
}
sampleTypeBytes, err := base64.StdEncoding.DecodeString(sampleTypeEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode sample_type: %v", err)
}
sampleUnitBytes, err := base64.StdEncoding.DecodeString(sampleUnitEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode sample_unit: %v", err)
}
periodTypeBytes, err := base64.StdEncoding.DecodeString(periodTypeEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode period_type: %v", err)
}
periodUnitBytes, err := base64.StdEncoding.DecodeString(periodUnitEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode period_unit: %v", err)
}

name := string(nameBytes)
sampleType := string(sampleTypeBytes)
sampleUnit := string(sampleUnitBytes)
periodType := string(periodTypeBytes)
periodUnit := string(periodUnitBytes)

// Construct profile type string: name:sample_type:sample_unit:period_type:period_unit(:delta)
profileType := fmt.Sprintf("%s:%s:%s:%s:%s", name, sampleType, sampleUnit, periodType, periodUnit)

// Add delta suffix if duration is not 0
if duration != "0" {
profileType += ":delta"
}

return profileType, nil
}

func (s *ProfileColumnStore) updateAgents(nodeNameAndIP string, ag agent) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
Loading