@@ -20,10 +20,13 @@ package output
2020import (
2121 "C"
2222 "encoding/binary"
23+ "fmt"
2324 "reflect"
25+ "strings"
2426 "time"
2527 "unsafe"
2628
29+ "github.com/mitchellh/mapstructure"
2730 "github.com/ugorji/go/codec"
2831)
2932
@@ -55,6 +58,116 @@ func (f FLBTime) UpdateExt(dest interface{}, v interface{}) {
5558 panic ("unsupported" )
5659}
5760
61+ type AggregationType int64
62+
63+ const (
64+ UNSPECIFIED AggregationType = 0
65+ DELTA AggregationType = 1
66+ CUMMULATIVE AggregationType = 2
67+ )
68+
69+ func (at AggregationType ) String () string {
70+ switch at {
71+ case UNSPECIFIED :
72+ return "unspecified"
73+ case DELTA :
74+ return "delta"
75+ case CUMMULATIVE :
76+ return "cumulative"
77+ default :
78+ return ""
79+ }
80+ }
81+
82+ type MetricType int64
83+
84+ const (
85+ COUNTER MetricType = 0
86+ GAUGE MetricType = 1
87+ HISTOGRAM MetricType = 2
88+ SUMMARY MetricType = 3
89+ UNTYPED MetricType = 4
90+ )
91+
92+ func (mt MetricType ) String () string {
93+ switch mt {
94+ case COUNTER :
95+ return "counter"
96+ case GAUGE :
97+ return "gauge"
98+ case HISTOGRAM :
99+ return "histogram"
100+ case SUMMARY :
101+ return "summary"
102+ case UNTYPED :
103+ return "untyped"
104+ default :
105+ return ""
106+ }
107+ }
108+
109+ type CMetrics struct {
110+ Meta struct {
111+ Cmetrics map [string ]interface {} `mapstructure:"cmetrics"`
112+ External map [string ]interface {} `mapstructure:"external"`
113+ Processing struct {
114+ StaticLabels []interface {} `mapstructure:"static_labels"`
115+ } `mapstructure:"processing"`
116+ } `mapstructure:"meta"`
117+ Metrics []struct {
118+ Meta struct {
119+ AggregationType AggregationType `mapstructure:"aggregation_type"`
120+ Labels []string `mapstructure:"labels"`
121+ /* Formatted full qualified metric name is: namespace_subsystem_name */
122+ Opts struct {
123+ Desc string `mapstructure:"desc"`
124+ Name string `mapstructure:"name"`
125+ Namespace string `mapstructure:"ns"`
126+ Subsystem string `mapstructure:"ss"`
127+ } `mapstructure:"opts"`
128+ Type MetricType `mapstructure:"type"`
129+ Ver int `mapstructure:"ver"`
130+ } `mapstructure:"meta"`
131+ Values []struct {
132+ Hash int64 `mapstructure:"hash"`
133+ Labels []string `mapstructure:"labels"`
134+ Ts int64 `mapstructure:"ts"`
135+ Value float64 `mapstructure:"value"`
136+ } `mapstructure:"values"`
137+ } `mapstructure:"metrics"`
138+ }
139+
140+ // Use Prometheus text format when printing CMetrics
141+ func (cm CMetrics ) String () string {
142+ var ret strings.Builder
143+
144+ for _ , metric := range cm .Metrics {
145+ fullMetricName := fmt .Sprintf ("%s_%s_%s" , metric .Meta .Opts .Namespace , metric .Meta .Opts .Subsystem , metric .Meta .Opts .Name )
146+ ret .WriteString (fmt .Sprintf ("# HELP %s %s\n " , fullMetricName , metric .Meta .Opts .Desc ))
147+ ret .WriteString (fmt .Sprintf ("# TYPE %s %s\n " , fullMetricName , metric .Meta .Type ))
148+
149+ for _ , value := range metric .Values {
150+ ret .WriteString (fmt .Sprintf ("%s{" , fullMetricName ))
151+ for i , labelName := range metric .Meta .Labels {
152+ ret .WriteString (fmt .Sprintf ("%s=%s" , labelName , value .Labels [i ]))
153+ if i < len (metric .Meta .Labels )- 1 {
154+ ret .WriteString ("," )
155+ }
156+ }
157+ ret .WriteString (fmt .Sprintf ("} %.0f\n " , value .Value ))
158+ }
159+ }
160+
161+ return ret .String ()
162+ }
163+
164+ // ConvertRecordToCMetrics converts the data returned by GetRecord() to a CMetrics struct
165+ func ConvertRecordToCMetrics (record map [interface {}]interface {}) (cMetrics CMetrics ) {
166+ var result CMetrics
167+ mapstructure .WeakDecode (record , & result )
168+ return result
169+ }
170+
58171func NewDecoder (data unsafe.Pointer , length int ) * FLBDecoder {
59172 var b []byte
60173
@@ -77,33 +190,42 @@ func GetRecord(dec *FLBDecoder) (ret int, ts interface{}, rec map[interface{}]in
77190 return - 1 , 0 , nil
78191 }
79192
80- slice := reflect .ValueOf (m )
81- if slice . Kind () != reflect . Slice || slice .Len () != 2 {
193+ val := reflect .ValueOf (m )
194+ if val .Len () != 2 {
82195 return - 2 , 0 , nil
83196 }
84197
85- var t interface {}
86- ts = slice .Index (0 ).Interface ()
87- switch ty := ts .(type ) {
88- case FLBTime :
89- t = ty
90- case uint64 :
91- t = ty
92- case []interface {}: // for Fluent Bit V2 metadata type of format
93- s := reflect .ValueOf (ty )
94- if s .Kind () != reflect .Slice || s .Len () < 2 {
95- return - 4 , 0 , nil
198+ switch val .Kind () {
199+ case reflect .Map : // Metrics
200+ map_data := val .Interface ().(map [interface {}]interface {})
201+ return 0 , 0 , map_data
202+ case reflect .Slice : // Logs
203+ var t interface {}
204+ ts = val .Index (0 ).Interface ()
205+ switch ty := ts .(type ) {
206+ case FLBTime :
207+ t = ty
208+ case uint64 :
209+ t = ty
210+ case []interface {}: // for Fluent Bit V2 metadata type of format
211+ s := reflect .ValueOf (ty )
212+ if s .Kind () != reflect .Slice || s .Len () < 2 {
213+ return - 4 , 0 , nil
214+ }
215+ t = s .Index (0 ).Interface ()
216+ default :
217+ return - 5 , 0 , nil
96218 }
97- t = s .Index (0 ).Interface ()
98- default :
99- return - 5 , 0 , nil
100- }
101- data := slice .Index (1 )
219+ data := val .Index (1 )
102220
103- map_data , ok := data .Interface ().(map [interface {}]interface {})
104- if ! ok {
105- return - 3 , 0 , nil
106- }
221+ map_data , ok := data .Interface ().(map [interface {}]interface {})
222+ if ! ok {
223+ return - 3 , 0 , nil
224+ }
107225
108- return 0 , t , map_data
226+ return 0 , t , map_data
227+
228+ default : // if the interface is not a map or slice
229+ return - 2 , 0 , nil
230+ }
109231}
0 commit comments