@@ -7,10 +7,11 @@ import (
77
88 "github.com/go-kit/kit/log"
99 "github.com/go-kit/kit/log/level"
10+ "golang.org/x/time/rate"
11+
1012 monitor "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/monitor/v20180724"
1113 "github.com/tencentyun/tencentcloud-exporter/pkg/client"
1214 "github.com/tencentyun/tencentcloud-exporter/pkg/config"
13- "golang.org/x/time/rate"
1415)
1516
1617var (
@@ -33,7 +34,10 @@ type TcmMetricRepositoryImpl struct {
3334 monitorClient * monitor.Client
3435 limiter * rate.Limiter // 限速
3536 ctx context.Context
36- logger log.Logger
37+
38+ queryMetricBatchSize int
39+
40+ logger log.Logger
3741}
3842
3943func (repo * TcmMetricRepositoryImpl ) GetMeta (namespace string , name string ) (meta * TcmMeta , err error ) {
@@ -136,83 +140,111 @@ func (repo *TcmMetricRepositoryImpl) GetSamples(s *TcmSeries, st int64, et int64
136140 return
137141}
138142
139- func (repo * TcmMetricRepositoryImpl ) ListSamples (m * TcmMetric , st int64 , et int64 ) (samplesList []* TcmSamples , err error ) {
140- for _ , seriesList := range m . GetSeriesSplitByBatch ( 10 ) {
141- ctx , cancel := context . WithCancel (repo .ctx )
142- err = repo .limiter . Wait ( ctx )
143+ func (repo * TcmMetricRepositoryImpl ) ListSamples (m * TcmMetric , st int64 , et int64 ) ([]* TcmSamples , error ) {
144+ var samplesList [] * TcmSamples
145+ for _ , seriesList := range m . GetSeriesSplitByBatch (repo .queryMetricBatchSize ) {
146+ sl , err : = repo .listSampleByBatch ( m , seriesList , st , et )
143147 if err != nil {
144- return
148+ level .Error (repo .logger ).Log ("msg" , err .Error ())
149+ continue
145150 }
151+ samplesList = append (samplesList , sl ... )
152+ }
153+ return samplesList , nil
154+ }
146155
147- request := monitor .NewGetMonitorDataRequest ()
148- request .Namespace = & m .Meta .Namespace
149- request .MetricName = & m .Meta .MetricName
150-
151- period := uint64 (m .Conf .StatPeriodSeconds )
152- request .Period = & period
153-
154- for _ , series := range seriesList {
155- ifilters := & monitor.Instance {
156- Dimensions : []* monitor.Dimension {},
157- }
158- for k , v := range series .QueryLabels {
159- tk := k
160- tv := v
161- ifilters .Dimensions = append (ifilters .Dimensions , & monitor.Dimension {Name : & tk , Value : & tv })
162- }
163- request .Instances = append (request .Instances , ifilters )
164- }
156+ func (repo * TcmMetricRepositoryImpl ) listSampleByBatch (
157+ m * TcmMetric ,
158+ seriesList []* TcmSeries ,
159+ st int64 ,
160+ et int64 ,
161+ ) ([]* TcmSamples , error ) {
162+ var samplesList []* TcmSamples
165163
166- stStr := time .Unix (st , 0 ).Format (timeStampFormat )
167- request .StartTime = & stStr
168- if et != 0 {
169- etStr := time .Unix (et , 0 ).Format (timeStampFormat )
170- request .StartTime = & etStr
171- }
164+ ctx , cancel := context .WithCancel (repo .ctx )
165+ defer cancel ()
172166
173- response , err := repo .monitorClient .GetMonitorData (request )
174- if err != nil {
175- return nil , err
167+ err := repo .limiter .Wait (ctx )
168+ if err != nil {
169+ return nil , err
170+ }
171+
172+ request := repo .buildGetMonitorDataRequest (m , seriesList , st , et )
173+ response , err := repo .monitorClient .GetMonitorData (request )
174+ if err != nil {
175+ return nil , err
176+ }
177+
178+ for _ , points := range response .Response .DataPoints {
179+ samples , ql , e := repo .buildSamples (m , points )
180+ if e != nil {
181+ level .Debug (repo .logger ).Log (
182+ "msg" , e .Error (),
183+ "metric" , m .Meta .MetricName ,
184+ "dimension" , fmt .Sprintf ("%v" , ql ))
185+ continue
176186 }
187+ samplesList = append (samplesList , samples )
188+ }
189+ return samplesList , nil
190+ }
177191
178- for _ , points := range response .Response .DataPoints {
179- ql := map [string ]string {}
180- for _ , dimension := range points .Dimensions {
181- if * dimension .Value != "" {
182- ql [* dimension .Name ] = * dimension .Value
183- }
184- }
185- sid , e := GetTcmSeriesId (m , ql )
186- if e != nil {
187- level .Warn (repo .logger ).Log (
188- "msg" , "Get series id fail" ,
189- "metric" , m .Meta .MetricName ,
190- "dimension" , fmt .Sprintf ("%v" , ql ))
191- continue
192- }
193- s , ok := m .Series [sid ]
194- if ! ok {
195- level .Warn (repo .logger ).Log (
196- "msg" , "Response data point not match series" ,
197- "metric" , m .Meta .MetricName ,
198- "dimension" , fmt .Sprintf ("%v" , ql ))
199- continue
200- }
201- samples , e := NewTcmSamples (s , points )
202- if e != nil {
203- level .Debug (repo .logger ).Log (
204- "msg" , "The instance has no metric data and may not have traffic" ,
205- "metric" , m .Meta .MetricName ,
206- "dimension" , fmt .Sprintf ("%v" , ql ))
207- } else {
208- samplesList = append (samplesList , samples )
209- }
192+ func (repo * TcmMetricRepositoryImpl ) buildGetMonitorDataRequest (
193+ m * TcmMetric ,
194+ seriesList []* TcmSeries ,
195+ st int64 , et int64 ,
196+ ) * monitor.GetMonitorDataRequest {
197+ request := monitor .NewGetMonitorDataRequest ()
198+ request .Namespace = & m .Meta .Namespace
199+ request .MetricName = & m .Meta .MetricName
200+
201+ period := uint64 (m .Conf .StatPeriodSeconds )
202+ request .Period = & period
210203
204+ for _ , series := range seriesList {
205+ ifilters := & monitor.Instance {
206+ Dimensions : []* monitor.Dimension {},
211207 }
208+ for k , v := range series .QueryLabels {
209+ tk := k
210+ tv := v
211+ ifilters .Dimensions = append (ifilters .Dimensions , & monitor.Dimension {Name : & tk , Value : & tv })
212+ }
213+ request .Instances = append (request .Instances , ifilters )
214+ }
215+
216+ stStr := time .Unix (st , 0 ).Format (timeStampFormat )
217+ request .StartTime = & stStr
218+ if et != 0 {
219+ etStr := time .Unix (et , 0 ).Format (timeStampFormat )
220+ request .StartTime = & etStr
221+ }
222+ return request
223+ }
212224
213- cancel ()
225+ func (repo * TcmMetricRepositoryImpl ) buildSamples (
226+ m * TcmMetric ,
227+ points * monitor.DataPoint ,
228+ ) (* TcmSamples , map [string ]string , error ) {
229+ ql := map [string ]string {}
230+ for _ , dimension := range points .Dimensions {
231+ if * dimension .Value != "" {
232+ ql [* dimension .Name ] = * dimension .Value
233+ }
214234 }
215- return
235+ sid , e := GetTcmSeriesId (m , ql )
236+ if e != nil {
237+ return nil , ql , fmt .Errorf ("get series id fail" )
238+ }
239+ s , ok := m .Series [sid ]
240+ if ! ok {
241+ return nil , ql , fmt .Errorf ("response data point not match series" )
242+ }
243+ samples , e := NewTcmSamples (s , points )
244+ if e != nil {
245+ return nil , ql , fmt .Errorf ("this instance may not have metric data" )
246+ }
247+ return samples , ql , nil
216248}
217249
218250func NewTcmMetricRepository (conf * config.TencentConfig , logger log.Logger ) (repo TcmMetricRepository , err error ) {
@@ -222,10 +254,11 @@ func NewTcmMetricRepository(conf *config.TencentConfig, logger log.Logger) (repo
222254 }
223255
224256 repo = & TcmMetricRepositoryImpl {
225- monitorClient : monitorClient ,
226- limiter : rate .NewLimiter (rate .Limit (conf .RateLimit ), 1 ),
227- ctx : context .Background (),
228- logger : logger ,
257+ monitorClient : monitorClient ,
258+ limiter : rate .NewLimiter (rate .Limit (conf .RateLimit ), 1 ),
259+ ctx : context .Background (),
260+ queryMetricBatchSize : conf .MetricQueryBatchSize ,
261+ logger : logger ,
229262 }
230263
231264 return
0 commit comments