Skip to content

Commit ac59c38

Browse files
committed
feat: support jaeger grpc
Signed-off-by: Young Xu <xuthus5@gmail.com>
1 parent ff78db8 commit ac59c38

File tree

1 file changed

+290
-32
lines changed

1 file changed

+290
-32
lines changed

trace/service/storage_reader.go

Lines changed: 290 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -19,13 +19,14 @@ import (
1919
"encoding/hex"
2020
"errors"
2121
"fmt"
22-
"log/slog"
22+
"time"
2323

2424
"github.com/openGemini/opengemini-client-go/opengemini"
2525
otlpcommonv1 "go.opentelemetry.io/proto/otlp/common/v1"
2626
otlpresourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
2727
otlptracev1 "go.opentelemetry.io/proto/otlp/trace/v1"
2828
"google.golang.org/grpc"
29+
"google.golang.org/protobuf/types/known/timestamppb"
2930

3031
"github.com/openGemini/observability/trace/gen/jaeger_storage/v2"
3132
"github.com/openGemini/observability/trace/utils"
@@ -37,13 +38,147 @@ var (
3738
)
3839

3940
func (o *OpenGeminiStorage) GetDependencies(ctx context.Context, request *jaeger_storage.GetDependenciesRequest) (*jaeger_storage.GetDependenciesResponse, error) {
40-
//TODO implement me
41-
panic("implement me")
41+
resp := &jaeger_storage.GetDependenciesResponse{}
42+
qb := opengemini.CreateQueryBuilder()
43+
if request != nil {
44+
var conds []opengemini.Condition
45+
if request.StartTime != nil && request.StartTime.AsTime().UnixNano() != 0 {
46+
conds = append(conds, opengemini.NewComparisonCondition(StartTimeUnixNano, opengemini.GreaterThan, request.StartTime.AsTime().UnixNano()))
47+
}
48+
if request.EndTime != nil && request.EndTime.AsTime().UnixNano() != 0 {
49+
conds = append(conds, opengemini.NewComparisonCondition(EndTimeUnixNano, opengemini.LessThan, request.EndTime.AsTime().UnixNano()))
50+
}
51+
if len(conds) > 0 {
52+
qb = qb.Where(opengemini.NewCompositeCondition(opengemini.And, conds...))
53+
}
54+
}
55+
rd := qb.Build()
56+
rd.Database = o.config.Database
57+
qr, err := o.client.Query(*rd)
58+
if err != nil {
59+
return nil, err
60+
}
61+
if qr.Error != "" {
62+
return nil, errors.New(qr.Error)
63+
}
64+
if len(qr.Results) == 0 || qr.Results[0] == nil || qr.Results[0].Error != "" {
65+
return resp, nil
66+
}
67+
type spanMeta struct {
68+
service string
69+
parent string
70+
kind int
71+
}
72+
traces := map[string]map[string]spanMeta{}
73+
for _, series := range qr.Results[0].Series {
74+
for _, row := range series.Values {
75+
var traceHex, spanHex, parentHex, serviceName string
76+
var kind int
77+
for idx, v := range row {
78+
col := series.Columns[idx]
79+
switch col {
80+
case TraceID:
81+
if v != nil {
82+
traceHex = v.(string)
83+
}
84+
case SpanID:
85+
if v != nil {
86+
spanHex = v.(string)
87+
}
88+
case ParentSpanID:
89+
if v != nil {
90+
parentHex = v.(string)
91+
}
92+
case SpanKind:
93+
if v != nil {
94+
kind = int(v.(float64))
95+
}
96+
case ServiceName:
97+
if v != nil {
98+
serviceName = fmt.Sprintf("%v", v)
99+
}
100+
}
101+
}
102+
if traceHex == "" || spanHex == "" {
103+
continue
104+
}
105+
m := traces[traceHex]
106+
if m == nil {
107+
m = map[string]spanMeta{}
108+
}
109+
m[spanHex] = spanMeta{service: serviceName, parent: parentHex, kind: kind}
110+
traces[traceHex] = m
111+
}
112+
}
113+
type edgeKey struct{ parent, child string }
114+
counts := map[edgeKey]uint64{}
115+
for _, spans := range traces {
116+
for _, sm := range spans {
117+
childSvc := sm.service
118+
if childSvc == "" {
119+
continue
120+
}
121+
parentSvc := ""
122+
if sm.parent != "" {
123+
if p, ok := spans[sm.parent]; ok {
124+
parentSvc = p.service
125+
}
126+
}
127+
if parentSvc == "" || parentSvc == childSvc {
128+
continue
129+
}
130+
counts[edgeKey{parent: parentSvc, child: childSvc}]++
131+
}
132+
}
133+
for ek, c := range counts {
134+
resp.Dependencies = append(resp.Dependencies, &jaeger_storage.Dependency{
135+
Parent: ek.parent,
136+
Child: ek.child,
137+
CallCount: c,
138+
Source: "opengemini",
139+
})
140+
}
141+
return resp, nil
42142
}
43143

44144
func (o *OpenGeminiStorage) GetTraces(request *jaeger_storage.GetTracesRequest, g grpc.ServerStreamingServer[otlptracev1.TracesData]) error {
45-
//TODO implement me
46-
slog.Info("GetTraces called")
145+
if request == nil || len(request.Query) == 0 {
146+
return nil
147+
}
148+
for _, qp := range request.Query {
149+
// 以 service 维度不在 GetTraces 提供,因此这里可能需要全库扫描;
150+
// 简化:若用户通过 FindTraceIDs 获得时间窗口,则带上窗口过滤。
151+
qb := opengemini.CreateQueryBuilder()
152+
// 无 measurement 信息时无法指定 From,这里选择全库由后端处理。
153+
conds := []opengemini.Condition{
154+
opengemini.NewComparisonCondition(TraceID, opengemini.Equals, hex.EncodeToString(qp.TraceId)),
155+
}
156+
if qp.StartTime != nil && qp.StartTime.AsTime().UnixNano() != 0 {
157+
conds = append(conds, opengemini.NewComparisonCondition(StartTimeUnixNano, opengemini.GreaterThan, qp.StartTime.AsTime().UnixNano()))
158+
}
159+
if qp.EndTime != nil && qp.EndTime.AsTime().UnixNano() != 0 {
160+
conds = append(conds, opengemini.NewComparisonCondition(EndTimeUnixNano, opengemini.LessThan, qp.EndTime.AsTime().UnixNano()))
161+
}
162+
qb = qb.Where(opengemini.NewCompositeCondition(opengemini.And, conds...))
163+
requestData := qb.Build()
164+
requestData.Database = o.config.Database
165+
resp, err := o.client.Query(*requestData)
166+
if err != nil {
167+
return err
168+
}
169+
if resp.Error != "" {
170+
return errors.New(resp.Error)
171+
}
172+
traceDataList, err := o.convertTraceData(&jaeger_storage.TraceQueryParameters{}, resp.Results)
173+
if err != nil {
174+
return err
175+
}
176+
for _, td := range traceDataList {
177+
if err := g.Send(td); err != nil {
178+
return err
179+
}
180+
}
181+
}
47182
return nil
48183
}
49184

@@ -123,54 +258,65 @@ func (o *OpenGeminiStorage) FindTraces(request *jaeger_storage.FindTracesRequest
123258
return errors.New(queryResult.Error)
124259
}
125260

126-
traceData, err := o.convertTraceData(query, queryResult.Results)
261+
traceDataList, err := o.convertTraceData(query, queryResult.Results)
127262
if err != nil {
128263
o.logger.Error("failed to convert traces", "query", requestData.Command, "reason", err)
129264
return err
130265
}
131-
132-
err = g.Send(traceData)
133-
if err != nil {
134-
o.logger.Error("failed to send traces", "query", requestData.Command, "reason", err)
135-
return err
266+
for _, td := range traceDataList {
267+
if err := g.Send(td); err != nil {
268+
o.logger.Error("failed to send traces", "query", requestData.Command, "reason", err)
269+
return err
270+
}
136271
}
137272
return nil
138273
}
139274

140-
func (o *OpenGeminiStorage) convertTraceData(query *jaeger_storage.TraceQueryParameters, results []*opengemini.SeriesResult) (*otlptracev1.TracesData, error) {
141-
var response = new(otlptracev1.TracesData)
275+
func (o *OpenGeminiStorage) convertTraceData(query *jaeger_storage.TraceQueryParameters, results []*opengemini.SeriesResult) ([]*otlptracev1.TracesData, error) {
276+
var responseList []*otlptracev1.TracesData
142277
if len(results) == 0 {
143278
o.logger.Warn("no results to convert")
144-
return response, nil
279+
return responseList, nil
145280
}
146281

147282
seriesResult := results[0]
148283
if seriesResult.Error != "" {
149284
o.logger.Error("convertTraceData failed", "reason", seriesResult.Error)
150-
return response, nil
285+
return responseList, nil
286+
}
287+
288+
// group by trace_id
289+
type grouped struct {
290+
resourceSpans *otlptracev1.ResourceSpans
291+
scopeSpans *otlptracev1.ScopeSpans
151292
}
293+
groups := make(map[string]*grouped)
152294

153295
seriesList := seriesResult.Series
154-
var resourceSpans = new(otlptracev1.ResourceSpans)
155-
var scopeSpans = new(otlptracev1.ScopeSpans)
156296
for _, series := range seriesList {
157297
var lines = series.Values
158298
for _, columns := range lines {
159299
var span = new(otlptracev1.Span)
300+
var traceIDHex string
160301
for idx, columnValue := range columns {
161302
if series.Columns[idx] == ColumnNameTimestamp || series.Columns[idx] == ProcessTags ||
162303
series.Columns[idx] == DurationUnixNano {
163304
continue
164305
}
165306
if series.Columns[idx] == TraceID {
166-
span.TraceId, _ = hex.DecodeString(columnValue.(string))
307+
hexStr := columnValue.(string)
308+
span.TraceId, _ = hex.DecodeString(hexStr)
309+
traceIDHex = hexStr
167310
continue
168311
}
169312
if series.Columns[idx] == SpanID {
170313
span.SpanId, _ = hex.DecodeString(columnValue.(string))
171314
continue
172315
}
173316
if series.Columns[idx] == ParentSpanID {
317+
if columnValue == nil {
318+
continue // if parent_span_id nil, not set
319+
}
174320
span.ParentSpanId, _ = hex.DecodeString(columnValue.(string))
175321
continue
176322
}
@@ -199,24 +345,136 @@ func (o *OpenGeminiStorage) convertTraceData(query *jaeger_storage.TraceQueryPar
199345
Value: &otlpcommonv1.AnyValue{Value: &otlpcommonv1.AnyValue_StringValue{StringValue: fmt.Sprintf("%v", columnValue)}},
200346
})
201347
}
202-
scopeSpans.Spans = append(scopeSpans.Spans, span)
348+
349+
if traceIDHex == "" {
350+
continue
351+
}
352+
353+
grp, ok := groups[traceIDHex]
354+
if !ok {
355+
grp = &grouped{
356+
resourceSpans: &otlptracev1.ResourceSpans{},
357+
scopeSpans: &otlptracev1.ScopeSpans{},
358+
}
359+
grp.resourceSpans.Resource = &otlpresourcev1.Resource{
360+
Attributes: []*otlpcommonv1.KeyValue{
361+
{
362+
Key: ServiceName,
363+
Value: &otlpcommonv1.AnyValue{Value: &otlpcommonv1.AnyValue_StringValue{StringValue: query.ServiceName}},
364+
},
365+
},
366+
}
367+
groups[traceIDHex] = grp
368+
}
369+
grp.scopeSpans.Spans = append(grp.scopeSpans.Spans, span)
203370
}
204371
}
205-
resourceSpans.ScopeSpans = append(resourceSpans.ScopeSpans, scopeSpans)
206-
resourceSpans.Resource = &otlpresourcev1.Resource{
207-
Attributes: []*otlpcommonv1.KeyValue{
208-
{
209-
Key: ServiceName,
210-
Value: &otlpcommonv1.AnyValue{Value: &otlpcommonv1.AnyValue_StringValue{StringValue: query.ServiceName}},
211-
},
212-
},
372+
373+
for _, grp := range groups {
374+
grp.resourceSpans.ScopeSpans = append(grp.resourceSpans.ScopeSpans, grp.scopeSpans)
375+
td := &otlptracev1.TracesData{ResourceSpans: []*otlptracev1.ResourceSpans{grp.resourceSpans}}
376+
responseList = append(responseList, td)
213377
}
214-
response.ResourceSpans = append(response.ResourceSpans, resourceSpans)
215-
return response, nil
378+
379+
return responseList, nil
216380
}
217381

218382
func (o *OpenGeminiStorage) FindTraceIDs(ctx context.Context, request *jaeger_storage.FindTracesRequest) (*jaeger_storage.FindTraceIDsResponse, error) {
219-
//TODO implement me
220-
slog.Error("FindTraceIDs called")
221-
return &jaeger_storage.FindTraceIDsResponse{}, nil
383+
var queryBuilder = opengemini.CreateQueryBuilder()
384+
var query = request.GetQuery()
385+
if query.ServiceName != "" {
386+
queryBuilder = queryBuilder.From(query.ServiceName)
387+
}
388+
var conditions []opengemini.Condition
389+
if query.OperationName != "" {
390+
conditions = append(conditions, opengemini.NewComparisonCondition(SpanName, opengemini.Equals, query.OperationName))
391+
}
392+
if query.StartTimeMin != nil && query.StartTimeMin.AsTime().UnixNano() != 0 {
393+
duration := query.StartTimeMin.AsTime().UnixNano()
394+
conditions = append(conditions, opengemini.NewComparisonCondition(StartTimeUnixNano, opengemini.GreaterThan, duration))
395+
}
396+
if query.StartTimeMax != nil && query.StartTimeMax.AsTime().UnixNano() != 0 {
397+
duration := query.StartTimeMax.AsTime().UnixNano()
398+
conditions = append(conditions, opengemini.NewComparisonCondition(EndTimeUnixNano, opengemini.LessThan, duration))
399+
}
400+
if query.DurationMin != nil && query.DurationMin.AsDuration().Nanoseconds() != 0 {
401+
duration := query.DurationMin.AsDuration().Nanoseconds()
402+
conditions = append(conditions, opengemini.NewComparisonCondition(DurationUnixNano, opengemini.GreaterThan, duration))
403+
}
404+
if query.DurationMax != nil && query.DurationMax.AsDuration().Nanoseconds() != 0 {
405+
duration := query.DurationMax.AsDuration().Nanoseconds()
406+
conditions = append(conditions, opengemini.NewComparisonCondition(DurationUnixNano, opengemini.LessThan, duration))
407+
}
408+
if len(conditions) != 0 {
409+
queryBuilder = queryBuilder.Where(opengemini.NewCompositeCondition(opengemini.And, conditions...))
410+
}
411+
if query.SearchDepth != 0 {
412+
queryBuilder.Limit(int64(query.SearchDepth))
413+
}
414+
// 直接查询所有列,由代码侧抽取 trace_id 和时间窗口
415+
requestData := queryBuilder.Build()
416+
requestData.Database = o.config.Database
417+
resp, err := o.client.Query(*requestData)
418+
if err != nil {
419+
return nil, err
420+
}
421+
if resp.Error != "" {
422+
return nil, errors.New(resp.Error)
423+
}
424+
425+
result := &jaeger_storage.FindTraceIDsResponse{}
426+
if len(resp.Results) == 0 || resp.Results[0] == nil || resp.Results[0].Error != "" {
427+
return result, nil
428+
}
429+
seriesList := resp.Results[0].Series
430+
// 用 map 去重 trace_id,并记录最小 start / 最大 end
431+
type spanWindow struct{ start, end uint64 }
432+
windows := map[string]spanWindow{}
433+
for _, series := range seriesList {
434+
for _, columns := range series.Values {
435+
var tidHex string
436+
var start uint64
437+
var end uint64
438+
for idx, v := range columns {
439+
switch series.Columns[idx] {
440+
case TraceID:
441+
if v == nil {
442+
continue
443+
}
444+
tidHex = v.(string)
445+
case StartTimeUnixNano:
446+
if v != nil {
447+
start = uint64(v.(float64))
448+
}
449+
case EndTimeUnixNano:
450+
if v != nil {
451+
end = uint64(v.(float64))
452+
}
453+
}
454+
}
455+
if tidHex == "" {
456+
continue
457+
}
458+
w := windows[tidHex]
459+
if w.start == 0 || start < w.start {
460+
w.start = start
461+
}
462+
if end > w.end {
463+
w.end = end
464+
}
465+
windows[tidHex] = w
466+
}
467+
}
468+
for tidHex, w := range windows {
469+
idBytes, _ := hex.DecodeString(tidHex)
470+
fi := &jaeger_storage.FoundTraceID{TraceId: idBytes}
471+
if w.start != 0 {
472+
fi.Start = timestamppb.New(time.Unix(0, int64(w.start)))
473+
}
474+
if w.end != 0 {
475+
fi.End = timestamppb.New(time.Unix(0, int64(w.end)))
476+
}
477+
result.TraceIds = append(result.TraceIds, fi)
478+
}
479+
return result, nil
222480
}

0 commit comments

Comments
 (0)