@@ -16,17 +16,20 @@ package profilestore
1616import (
1717 "bytes"
1818 "context"
19+ "encoding/base64"
1920 "fmt"
2021 "strings"
2122 "sync"
2223 "time"
2324
25+ "github.com/apache/arrow-go/v18/arrow"
2426 "github.com/apache/arrow-go/v18/arrow/ipc"
2527 "github.com/apache/arrow-go/v18/arrow/memory"
2628 "github.com/go-kit/log"
2729 "github.com/gogo/status"
2830 "github.com/polarsignals/frostdb/dynparquet"
2931 "github.com/prometheus/client_golang/prometheus"
32+ "github.com/prometheus/prometheus/promql/parser"
3033 "go.opentelemetry.io/otel/trace"
3134 otelgrpcprofilingpb "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
3235 "google.golang.org/grpc/codes"
@@ -37,6 +40,7 @@ import (
3740 profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
3841 "github.com/parca-dev/parca/pkg/ingester"
3942 "github.com/parca-dev/parca/pkg/normalizer"
43+ "github.com/parca-dev/parca/pkg/profile"
4044)
4145
4246type agent struct {
@@ -109,9 +113,102 @@ func (s *ProfileColumnStore) writeSeries(ctx context.Context, req *profilestorep
109113 return nil
110114 }
111115
116+ schema := r .Schema ()
117+
118+ nameIdx := schema .FieldIndices (profile .ColumnName )
119+ if len (nameIdx ) == 0 {
120+ return fmt .Errorf ("missing required column: %s" , profile .ColumnName )
121+ }
122+ sampleTypeIdx := schema .FieldIndices (profile .ColumnSampleType )
123+ if len (sampleTypeIdx ) == 0 {
124+ return fmt .Errorf ("missing required column: %s" , profile .ColumnSampleType )
125+ }
126+ sampleUnitIdx := schema .FieldIndices (profile .ColumnSampleUnit )
127+ if len (sampleUnitIdx ) == 0 {
128+ return fmt .Errorf ("missing required column: %s" , profile .ColumnSampleUnit )
129+ }
130+ periodTypeIdx := schema .FieldIndices (profile .ColumnPeriodType )
131+ if len (periodTypeIdx ) == 0 {
132+ return fmt .Errorf ("missing required column: %s" , profile .ColumnPeriodType )
133+ }
134+ periodUnitIdx := schema .FieldIndices (profile .ColumnPeriodUnit )
135+ if len (periodUnitIdx ) == 0 {
136+ return fmt .Errorf ("missing required column: %s" , profile .ColumnPeriodUnit )
137+ }
138+ durationIdx := schema .FieldIndices (profile .ColumnDuration )
139+ if len (durationIdx ) == 0 {
140+ return fmt .Errorf ("missing required column: %s" , profile .ColumnDuration )
141+ }
142+
143+ nameCol := r .Column (nameIdx [0 ])
144+ sampleTypeCol := r .Column (sampleTypeIdx [0 ])
145+ sampleUnitCol := r .Column (sampleUnitIdx [0 ])
146+ periodTypeCol := r .Column (periodTypeIdx [0 ])
147+ periodUnitCol := r .Column (periodUnitIdx [0 ])
148+ durationCol := r .Column (durationIdx [0 ])
149+
150+ for rowIdx := 0 ; rowIdx < int (r .NumRows ()); rowIdx ++ {
151+ profileType , err := getProfileTypeString (rowIdx , nameCol , sampleTypeCol , sampleUnitCol , periodTypeCol , periodUnitCol , durationCol )
152+ if err != nil {
153+ return fmt .Errorf ("failed to get profile type at row %d: %v" , rowIdx , err )
154+ }
155+
156+ // Validate profile type by trying to parse it as a PromQL selector.
157+ queryStr := fmt .Sprintf ("%s{}" , profileType )
158+ if _ , parseErr := parser .ParseMetricSelector (queryStr ); parseErr != nil {
159+ return fmt .Errorf ("invalid profile type at row %d (%s): %v" , rowIdx , profileType , parseErr )
160+ }
161+ }
162+
112163 return s .ingester .Ingest (ctx , r )
113164}
114165
166+ func getProfileTypeString (rowIdx int , nameCol , sampleTypeCol , sampleUnitCol , periodTypeCol , periodUnitCol , durationCol arrow.Array ) (string , error ) {
167+ nameEncoded := nameCol .ValueStr (rowIdx )
168+ sampleTypeEncoded := sampleTypeCol .ValueStr (rowIdx )
169+ sampleUnitEncoded := sampleUnitCol .ValueStr (rowIdx )
170+ periodTypeEncoded := periodTypeCol .ValueStr (rowIdx )
171+ periodUnitEncoded := periodUnitCol .ValueStr (rowIdx )
172+ duration := durationCol .ValueStr (rowIdx )
173+
174+ nameBytes , err := base64 .StdEncoding .DecodeString (nameEncoded )
175+ if err != nil {
176+ return "" , fmt .Errorf ("failed to decode name: %v" , err )
177+ }
178+ sampleTypeBytes , err := base64 .StdEncoding .DecodeString (sampleTypeEncoded )
179+ if err != nil {
180+ return "" , fmt .Errorf ("failed to decode sample_type: %v" , err )
181+ }
182+ sampleUnitBytes , err := base64 .StdEncoding .DecodeString (sampleUnitEncoded )
183+ if err != nil {
184+ return "" , fmt .Errorf ("failed to decode sample_unit: %v" , err )
185+ }
186+ periodTypeBytes , err := base64 .StdEncoding .DecodeString (periodTypeEncoded )
187+ if err != nil {
188+ return "" , fmt .Errorf ("failed to decode period_type: %v" , err )
189+ }
190+ periodUnitBytes , err := base64 .StdEncoding .DecodeString (periodUnitEncoded )
191+ if err != nil {
192+ return "" , fmt .Errorf ("failed to decode period_unit: %v" , err )
193+ }
194+
195+ name := string (nameBytes )
196+ sampleType := string (sampleTypeBytes )
197+ sampleUnit := string (sampleUnitBytes )
198+ periodType := string (periodTypeBytes )
199+ periodUnit := string (periodUnitBytes )
200+
201+ // Construct profile type string: name:sample_type:sample_unit:period_type:period_unit(:delta)
202+ profileType := fmt .Sprintf ("%s:%s:%s:%s:%s" , name , sampleType , sampleUnit , periodType , periodUnit )
203+
204+ // Add delta suffix if duration is not 0
205+ if duration != "0" {
206+ profileType += ":delta"
207+ }
208+
209+ return profileType , nil
210+ }
211+
115212func (s * ProfileColumnStore ) updateAgents (nodeNameAndIP string , ag agent ) {
116213 s .mtx .Lock ()
117214 defer s .mtx .Unlock ()
0 commit comments