-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgrowth.go
More file actions
179 lines (159 loc) · 4.59 KB
/
growth.go
File metadata and controls
179 lines (159 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
package esquery
import (
"encoding/json"
"math"
"time"
)
// BucketValue is the value of a bucket aggregation: a single number decoded
// from the "value" member of the aggregation's JSON object.
type BucketValue struct {
	Value float64 `json:"value"` // numeric value read from the "value" member
}
// DateBucket is the data of one date bucket aggregation entry, together with
// the growth rates computed for it.
type DateBucket struct {
	KeyAsString string `json:"key_as_string"` // bucket key in string form, taken from "key_as_string"
	Key int64 `json:"key"` // numeric bucket key; getPreCntAndAggs reads it as Unix milliseconds
	DocCount int `json:"doc_count"` // document count of the bucket, taken from "doc_count"
	DocCountGrowth float64 `json:"doc_count_growth"` // growth rate of the document count
	AggsValue map[string]float64 `json:"aggs_value,omitempty"` // dynamic aggregation field name -> value
	AggsGrowth map[string]float64 `json:"aggs_growth,omitempty"` // dynamic aggregation field name -> growth rate
}
// parseDateBucket decodes a single date_histogram bucket and collects the
// numeric value of every sub-aggregation found in it.
//
// The fixed members "key", "key_as_string" and "doc_count" are copied onto the
// result. Every other member is decoded as a BucketValue and stored in
// AggsValue under its member name; members that cannot be decoded that way are
// skipped silently. A JSON object without a "value" member still decodes
// successfully and is therefore recorded with the value 0.
//
// If the bucket itself cannot be decoded, the error is returned together with
// a pointer to an empty DateBucket (the pointer is never nil).
func parseDateBucket(data json.RawMessage) (*DateBucket, error) {
	// First pass: the fixed, well-known members.
	var fixed struct {
		KeyAsString string `json:"key_as_string"`
		Key         int64  `json:"key"`
		DocCount    int    `json:"doc_count"`
	}
	if err := json.Unmarshal(data, &fixed); err != nil {
		return &DateBucket{}, err
	}

	// Second pass: keep every member raw so the dynamically named
	// sub-aggregations can be discovered.
	var members map[string]json.RawMessage
	if err := json.Unmarshal(data, &members); err != nil {
		return &DateBucket{}, err
	}

	bucket := &DateBucket{
		KeyAsString: fixed.KeyAsString,
		Key:         fixed.Key,
		DocCount:    fixed.DocCount,
		AggsValue:   make(map[string]float64),
	}
	for name, payload := range members {
		switch name {
		case "key", "key_as_string", "doc_count":
			// Already handled by the first pass.
			continue
		}
		var metric BucketValue
		if json.Unmarshal(payload, &metric) != nil {
			continue
		}
		bucket.AggsValue[name] = metric.Value
	}
	return bucket, nil
}
// getDateHistBuckets extracts the parsed buckets of every aggregation in aggs.
//
// Each entry of aggs is expected to be a JSON object with a "buckets" array;
// every element of that array is parsed with parseDateBucket. The result maps
// the aggregation name to its buckets in their original order. Buckets that
// fail to parse are skipped, and aggregations that yield no bucket at all are
// absent from the result.
func getDateHistBuckets(aggs map[string]json.RawMessage) map[string][]*DateBucket {
	buckets := map[string][]*DateBucket{}
	for name, aggData := range aggs {
		// Best-effort decode: only array-valued members fit into this map, so
		// a sibling member of another shape (e.g. an object or a number) makes
		// Unmarshal report a type error while it still fills in "buckets".
		// The error is therefore deliberately not treated as fatal.
		var members map[string][]json.RawMessage
		_ = json.Unmarshal(aggData, &members)

		for _, rawBucket := range members["buckets"] {
			db, err := parseDateBucket(rawBucket)
			if err != nil {
				// Skip malformed buckets instead of emitting an empty
				// placeholder (key 0, count 0) that would distort the growth
				// rates computed from neighbouring buckets.
				continue
			}
			buckets[name] = append(buckets[name], db)
		}
	}
	return buckets
}
// AggsDateHistGrowth computes, for every aggregation in aggs, the growth rate
// of each bucket's document count relative to the bucket before it.
//
// The first bucket of every aggregation has no predecessor; its previous count
// is taken as 0, for which CntGrowth yields a growth of 0. The buckets are
// returned keyed by aggregation name.
func AggsDateHistGrowth(aggs map[string]json.RawMessage) map[string][]*DateBucket {
	result := getDateHistBuckets(aggs)
	for _, series := range result {
		for i, cur := range series {
			prevCount := 0 // no predecessor for the first bucket
			if i > 0 {
				prevCount = series[i-1].DocCount
			}
			cur.DocCountGrowth = CntGrowth(prevCount, cur.DocCount)
		}
	}
	return result
}
// AggsDateHistStatsGrowth computes period-over-period growth rates: for every
// aggregation in aggs, each bucket is compared with the bucket directly before
// it, both for the document count (DocCountGrowth) and for every
// sub-aggregation value (AggsGrowth).
//
// The buckets are returned keyed by aggregation name.
func AggsDateHistStatsGrowth(aggs map[string]json.RawMessage) map[string][]*DateBucket {
	result := getDateHistBuckets(aggs)
	for _, series := range result {
		// A zero-valued bucket stands in for the missing predecessor of the
		// first bucket: document count 0 and no aggregation values.
		prev := &DateBucket{}
		for _, cur := range series {
			cur.DocCountGrowth = CntGrowth(prev.DocCount, cur.DocCount)
			cur.AggsGrowth = CalcGrowth(prev.AggsValue, cur.AggsValue)
			prev = cur
		}
	}
	return result
}
// AggsYoYHistStatsGrowth computes year-over-year growth rates: for every
// aggregation in aggs, each bucket is compared with the bucket whose key lies
// one year earlier (as resolved by getPreCntAndAggs), both for the document
// count (DocCountGrowth) and for every sub-aggregation value (AggsGrowth).
//
// The buckets are returned keyed by aggregation name.
func AggsYoYHistStatsGrowth(aggs map[string]json.RawMessage) map[string][]*DateBucket {
	result := getDateHistBuckets(aggs)
	for _, series := range result {
		// Index the buckets by key so the bucket one year earlier can be found
		// with a single map lookup instead of a scan.
		byKey := map[int64]*DateBucket{}
		for i := range series {
			byKey[series[i].Key] = series[i]
		}
		for i := range series {
			cur := series[i]
			lastYearCnt, lastYearAggs := getPreCntAndAggs(byKey, cur)
			cur.DocCountGrowth = CntGrowth(lastYearCnt, cur.DocCount)
			cur.AggsGrowth = CalcGrowth(lastYearAggs, cur.AggsValue)
		}
	}
	return result
}
// getPreCntAndAggs returns the document count and aggregation values of the
// bucket whose key lies exactly one calendar year before db.
//
// db.Key is interpreted as a Unix timestamp in milliseconds. If mapBuckets has
// no (non-nil) bucket at the shifted timestamp, 0 and a nil map are returned.
//
// NOTE(review): time.UnixMilli yields a time in the process-local zone, so the
// one-year shift is performed in local time and can land on a different
// instant than a UTC-based shift around DST changes — confirm this matches how
// the buckets were aligned.
func getPreCntAndAggs(mapBuckets map[int64]*DateBucket, db *DateBucket) (int, map[string]float64) {
	lastYearKey := time.UnixMilli(db.Key).AddDate(-1, 0, 0).UnixMilli()
	if prev, ok := mapBuckets[lastYearKey]; ok && prev != nil {
		return prev.DocCount, prev.AggsValue
	}
	return 0, nil
}
// CalcGrowth returns the growth rate of every value in suf relative to the
// value stored under the same key in pre (the neighbouring bucket).
//
// The rate is (suf-pre)/pre rounded to four decimal places. It is 0 when pre
// is nil or empty, when the key is missing from pre, or when the previous
// value is 0. Keys that appear only in pre are ignored. The returned map is
// never nil.
func CalcGrowth(pre, suf map[string]float64) map[string]float64 {
	const scale = 10000 // rounding to four decimal places

	rates := make(map[string]float64, len(suf))
	for name, cur := range suf {
		// Reading a missing key (or a nil/empty pre) yields 0, which also
		// covers the very first bucket that has no predecessor.
		base := pre[name]
		if base == 0 {
			rates[name] = 0
			continue
		}
		rates[name] = math.Round(scale*(cur-base)/base) / scale
	}
	return rates
}
// CntGrowth returns the growth rate of a count from pre (the neighbouring
// bucket's count) to suf, i.e. (suf-pre)/pre rounded to four decimal places.
//
// A previous count of 0 yields a growth of 0 instead of a division by zero.
func CntGrowth(pre, suf int) float64 {
	const scale = 10000 // rounding to four decimal places

	if pre == 0 {
		return 0
	}
	delta := float64(suf - pre)
	return math.Round(scale*delta/float64(pre)) / scale
}