Skip to content

Commit 7af2a45

Browse files
authored
[TT-100053]: aggregate graph aggregate records by api_id (#725)
* aggregate graph iaggregate records by api_id * fixed tests * fix dimensions test * fixed sharded table * fix goang ci lint
1 parent 092d669 commit 7af2a45

File tree

3 files changed

+40
-36
lines changed

3 files changed

+40
-36
lines changed

analytics/aggregate.go

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,20 @@ type SQLAnalyticsRecordAggregate struct {
147147
Code `json:"code" gorm:"embedded"`
148148
}
149149

150+
type GraphSQLAnalyticsRecordAggregate struct {
151+
ID string `gorm:"primaryKey"`
152+
153+
OrgID string `json:"org_id"`
154+
Dimension string `json:"dimension"`
155+
DimensionValue string `json:"dimension_value"`
156+
APIID string `json:"api_id"`
157+
158+
Counter `json:"counter" gorm:"embedded"`
159+
Code `json:"code" gorm:"embedded"`
160+
161+
TimeStamp int64 `json:"timestamp"`
162+
}
163+
150164
type Code struct {
151165
Code1x int `json:"1x" gorm:"1x"`
152166
Code200 int `json:"200" gorm:"200"`
@@ -611,6 +625,7 @@ func replaceUnsupportedChars(path string) string {
611625
return result
612626
}
613627

628+
// AggregateGraphData collects the graph records into a map of GraphRecordAggregate to apiID
614629
func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime int) map[string]GraphRecordAggregate {
615630
aggregateMap := make(map[string]GraphRecordAggregate)
616631

@@ -619,14 +634,13 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime
619634
if !ok {
620635
continue
621636
}
622-
623637
if !record.IsGraphRecord() {
624638
continue
625639
}
626640

627641
graphRec := record.ToGraphRecord()
628642

629-
aggregate, found := aggregateMap[record.OrgID]
643+
aggregate, found := aggregateMap[record.APIID]
630644
if !found {
631645
aggregate = NewGraphRecordAggregate()
632646

@@ -676,7 +690,7 @@ func AggregateGraphData(data []interface{}, dbIdentifier string, aggregationTime
676690
aggregate.RootFields[field].Identifier = field
677691
aggregate.RootFields[field].HumanIdentifier = field
678692
}
679-
aggregateMap[record.OrgID] = aggregate
693+
aggregateMap[record.APIID] = aggregate
680694
}
681695
return aggregateMap
682696
}

analytics/aggregate_test.go

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,8 @@ func TestTrimTag(t *testing.T) {
9090
}
9191

9292
func TestAggregateGraphData(t *testing.T) {
93+
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
94+
rawResponse := `{"data":{"characters":{"info":{"count":758}}}}`
9395
sampleRecord := AnalyticsRecord{
9496
TimeStamp: time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC),
9597
Method: "POST",
@@ -109,6 +111,8 @@ func TestAggregateGraphData(t *testing.T) {
109111
APIKey: "test-key",
110112
TrackPath: true,
111113
OauthID: "test-id",
114+
RawRequest: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query))),
115+
RawResponse: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(rawResponse), rawResponse))),
112116
}
113117

114118
compareFields := func(r *require.Assertions, expected, actual map[string]*Counter) {
@@ -133,16 +137,12 @@ func TestAggregateGraphData(t *testing.T) {
133137
records := make([]interface{}, 3)
134138
for i := range records {
135139
record := sampleRecord
136-
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
137-
response := `{"data":{"characters":{"info":{"count":758}}}}`
138-
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
139-
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
140140
records[i] = record
141141
}
142142
return records
143143
},
144144
expectedAggregate: map[string]GraphRecordAggregate{
145-
"test-org": {
145+
"test-api": {
146146
Types: map[string]*Counter{
147147
"Characters": {Hits: 3, ErrorTotal: 0, Success: 3},
148148
"Info": {Hits: 3, ErrorTotal: 0, Success: 3},
@@ -163,10 +163,6 @@ func TestAggregateGraphData(t *testing.T) {
163163
records := make([]interface{}, 3)
164164
for i := range records {
165165
record := sampleRecord
166-
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
167-
response := `{"data":{"characters":{"info":{"count":758}}}}`
168-
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
169-
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
170166
if i == 1 {
171167
record.Tags = []string{}
172168
}
@@ -175,7 +171,7 @@ func TestAggregateGraphData(t *testing.T) {
175171
return records
176172
},
177173
expectedAggregate: map[string]GraphRecordAggregate{
178-
"test-org": {
174+
"test-api": {
179175
Types: map[string]*Counter{
180176
"Characters": {Hits: 2, ErrorTotal: 0, Success: 2},
181177
"Info": {Hits: 2, ErrorTotal: 0, Success: 2},
@@ -196,19 +192,16 @@ func TestAggregateGraphData(t *testing.T) {
196192
records := make([]interface{}, 3)
197193
for i := range records {
198194
record := sampleRecord
199-
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
200-
response := `{"data":{"characters":{"info":{"count":758}}}}`
201195
if i == 1 {
202-
response = graphErrorResponse
196+
response := graphErrorResponse
197+
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
203198
}
204-
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
205-
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
206199
records[i] = record
207200
}
208201
return records
209202
},
210203
expectedAggregate: map[string]GraphRecordAggregate{
211-
"test-org": {
204+
"test-api": {
212205
Types: map[string]*Counter{
213206
"Characters": {Hits: 3, ErrorTotal: 1, Success: 2},
214207
"Info": {Hits: 3, ErrorTotal: 1, Success: 2},
@@ -229,10 +222,6 @@ func TestAggregateGraphData(t *testing.T) {
229222
records := make([]interface{}, 5)
230223
for i := range records {
231224
record := sampleRecord
232-
query := `{"query":"query{\n characters(filter: {\n \n }){\n info{\n count\n }\n }\n}"}`
233-
response := `{"data":{"characters":{"info":{"count":758}}}}`
234-
record.RawRequest = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(requestTemplate, len(query), query)))
235-
record.RawResponse = base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(responseTemplate, len(response), response)))
236225
if i == 2 || i == 4 {
237226
record.ResponseCode = 500
238227
}
@@ -241,7 +230,7 @@ func TestAggregateGraphData(t *testing.T) {
241230
return records
242231
},
243232
expectedAggregate: map[string]GraphRecordAggregate{
244-
"test-org": {
233+
"test-api": {
245234
Types: map[string]*Counter{
246235
"Characters": {Hits: 5, ErrorTotal: 2, Success: 3},
247236
"Info": {Hits: 5, ErrorTotal: 2, Success: 3},
@@ -325,7 +314,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
325314
r := require.New(t)
326315
aggregated := AggregateGraphData(records, "", 1)
327316
r.Len(aggregated, 1)
328-
aggre := aggregated["test-org"]
317+
aggre := aggregated["test-api"]
329318
dimensions := aggre.Dimensions()
330319
fmt.Println(dimensions)
331320
for d, values := range responsesCheck {
@@ -337,7 +326,7 @@ func TestAggregateGraphData_Dimension(t *testing.T) {
337326
}
338327
}
339328
if !found {
340-
t.Errorf("item missing from dimensions: NameL %s, Value: %s, Hits:3", d, v)
329+
t.Errorf("item missing from dimensions: Name: %s, Value: %s, Hits:3", d, v)
341330
}
342331
}
343332
}

pumps/graph_sql_aggregate.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func (s *GraphSQLAggregatePump) Init(conf interface{}) error {
6868
}
6969
s.db = db
7070
if !s.SQLConf.TableSharding {
71-
if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil {
71+
if err := s.db.Table(analytics.AggregateGraphSQLTable).AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil {
7272
s.log.WithError(err).Warn("error migrating table")
7373
}
7474
}
@@ -115,7 +115,7 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
115115
table = analytics.AggregateGraphSQLTable + "_" + recDate
116116
s.db = s.db.Table(table)
117117
if !s.db.Migrator().HasTable(table) {
118-
if err := s.db.AutoMigrate(&analytics.SQLAnalyticsRecordAggregate{}); err != nil {
118+
if err := s.db.AutoMigrate(&analytics.GraphSQLAnalyticsRecordAggregate{}); err != nil {
119119
s.log.WithError(err).Warn("error running auto migration")
120120
}
121121
}
@@ -132,11 +132,11 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
132132
aggregationTime = 60
133133
}
134134

135-
analyticsPerOrg := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime)
135+
analyticsPerAPI := analytics.AggregateGraphData(data[startIndex:endIndex], "", aggregationTime)
136136

137-
for orgID := range analyticsPerOrg {
138-
ag := analyticsPerOrg[orgID]
139-
err := s.DoAggregatedWriting(ctx, table, orgID, &ag)
137+
for apiID := range analyticsPerAPI {
138+
ag := analyticsPerAPI[apiID]
139+
err := s.DoAggregatedWriting(ctx, table, ag.OrgID, apiID, &ag)
140140
if err != nil {
141141
s.log.WithError(err).Error("error writing record")
142142
return err
@@ -150,14 +150,15 @@ func (s *GraphSQLAggregatePump) WriteData(ctx context.Context, data []interface{
150150
return nil
151151
}
152152

153-
func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID string, ag *analytics.GraphRecordAggregate) error {
154-
recs := []analytics.SQLAnalyticsRecordAggregate{}
153+
func (s *GraphSQLAggregatePump) DoAggregatedWriting(ctx context.Context, table, orgID, apiID string, ag *analytics.GraphRecordAggregate) error {
154+
var recs []analytics.GraphSQLAnalyticsRecordAggregate
155155

156156
dimensions := ag.Dimensions()
157157
for _, d := range dimensions {
158-
rec := analytics.SQLAnalyticsRecordAggregate{
159-
ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + orgID + d.Name + d.Value)),
158+
rec := analytics.GraphSQLAnalyticsRecordAggregate{
159+
ID: hex.EncodeToString([]byte(fmt.Sprintf("%v", ag.TimeStamp.Unix()) + apiID + d.Name + d.Value)),
160160
OrgID: orgID,
161+
APIID: apiID,
161162
TimeStamp: ag.TimeStamp.Unix(),
162163
Counter: *d.Counter,
163164
Dimension: d.Name,

0 commit comments

Comments
 (0)