Skip to content

Commit c015204

Browse files
committed
Fix retry_file statistic, now belongs to client
1 parent 12e3742 commit c015204

File tree

5 files changed

+74
-71
lines changed

5 files changed

+74
-71
lines changed

client.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (c Client) saveSliceToRetry(metrics []string, backend string) error {
5050
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
5151
if err != nil {
5252
c.Lc.lg.Println(err)
53-
c.Mon.Increase(&c.Mon.stat[backend].dropped, len(metrics))
53+
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, len(metrics))
5454
return err
5555
}
5656
defer f.Close()
@@ -64,7 +64,7 @@ func (c Client) saveSliceToRetry(metrics []string, backend string) error {
6464
}
6565
}
6666
if dropped > 0 {
67-
c.Mon.Increase(&c.Mon.stat[backend].dropped, dropped)
67+
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, dropped)
6868
}
6969
return c.removeOldDataFromRetryFile(backend)
7070
}
@@ -103,10 +103,10 @@ func (c Client) saveChannelToRetry(ch chan string, size int, backend string) {
103103
}
104104
}
105105
if dropped > 0 {
106-
c.Mon.Increase(&c.Mon.stat[backend].dropped, dropped)
106+
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, dropped)
107107
}
108108
if saved > 0 {
109-
c.Mon.Increase(&c.Mon.stat[backend].saved, saved)
109+
c.Mon.Increase(&c.Mon.clientStat[backend].saved, saved)
110110
}
111111
c.removeOldDataFromRetryFile(backend)
112112
}
@@ -137,7 +137,7 @@ func (c *Client) tryToSendToGraphite(metric string, conn net.Conn) error {
137137
return err
138138
}
139139
backend := conn.RemoteAddr().String()
140-
c.Mon.Increase(&c.Mon.stat[backend].sent, 1)
140+
c.Mon.Increase(&c.Mon.clientStat[backend].sent, 1)
141141
return nil
142142
}
143143

@@ -190,7 +190,7 @@ func (c Client) runBackend(backend string) {
190190
connectionFailed = true
191191
break
192192
} else {
193-
c.Mon.Increase(&c.Mon.got.retry, 1)
193+
c.Mon.Increase(&c.Mon.clientStat[backend].fromRetry, 1)
194194
}
195195

196196
} else {
@@ -277,7 +277,7 @@ func (c Client) Run() {
277277
select {
278278
case c.mainChannels[backend] <- metric:
279279
default:
280-
c.Mon.Increase(&c.Mon.stat[backend].dropped, 1)
280+
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, 1)
281281
}
282282
}
283283
}
@@ -289,7 +289,7 @@ func (c Client) Run() {
289289
select {
290290
case c.monChannels[backend] <- metric:
291291
default:
292-
c.Mon.Increase(&c.Mon.stat[backend].dropped, 1)
292+
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, 1)
293293
}
294294
}
295295
}

config.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
258258
}
259259

260260
// There are 3 metrics per backend
261-
MonitorMetrics := 4 + len(carbonAddrsTCP)*3
261+
MonitorMetrics := 3 + len(carbonAddrsTCP)*4
262262

263263
return &LocalConfig{
264264
hostname: hostname,

grafsy_internal_test.go

Lines changed: 28 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -12,33 +12,34 @@ import (
1212
var cleanMonitoring = &Monitoring{
1313
Conf: conf,
1414
Lc: lc,
15-
got: source{
16-
retry: 0,
17-
dir: 0,
18-
net: 0,
15+
serverStat: serverStat{
16+
dir: 0,
17+
invalid: 0,
18+
net: 0,
1919
},
20-
stat: map[string]*stat{
21-
"127.0.0.1:2003": &stat{
20+
clientStat: map[string]*clientStat{
21+
"127.0.0.1:2003": &clientStat{
2222
saved: 0,
2323
sent: 0,
2424
dropped: 0,
2525
},
26-
"127.0.0.1:2004": &stat{
26+
"127.0.0.1:2004": &clientStat{
2727
saved: 0,
2828
sent: 0,
2929
dropped: 0,
3030
},
3131
},
32-
invalid: 0,
3332
}
3433

3534
// These variables defined to prevent reading the config multiple times
3635
// and avoid code duplication
3736
var conf, lc, configError = getConfigs()
3837

3938
// There are 3 metrics per backend
40-
var MonitorMetrics = 4 + len(conf.CarbonAddrs)*3
4139
var mon, monError = generateMonitoringObject()
40+
var serverStatMetrics = reflect.ValueOf(mon.serverStat).NumField()
41+
var clientStatMetrics = reflect.TypeOf(mon.clientStat).Elem().Elem().NumField()
42+
var MonitorMetrics = serverStatMetrics + len(conf.CarbonAddrs)*clientStatMetrics
4243
var cli = Client{
4344
Conf: conf,
4445
Lc: lc,
@@ -56,29 +57,29 @@ var testMetrics = []string{
5657
}
5758

5859
func generateMonitoringObject() (*Monitoring, error) {
59-
s := source{
60-
net: 1,
61-
dir: 2,
62-
retry: 3,
63-
}
6460

6561
return &Monitoring{
6662
Conf: conf,
6763
Lc: lc,
68-
got: s,
69-
stat: map[string]*stat{
70-
"127.0.0.1:2003": &stat{
71-
dropped: 1,
72-
saved: 3,
73-
sent: 4,
64+
serverStat: serverStat{
65+
net: 1,
66+
invalid: 4,
67+
dir: 2,
68+
},
69+
clientStat: map[string]*clientStat{
70+
"127.0.0.1:2003": &clientStat{
71+
1,
72+
3,
73+
2,
74+
4,
7475
},
75-
"127.0.0.1:2004": &stat{
76-
dropped: 1,
77-
saved: 3,
78-
sent: 4,
76+
"127.0.0.1:2004": &clientStat{
77+
1,
78+
3,
79+
2,
80+
4,
7981
},
8082
},
81-
invalid: 2,
8283
}, nil
8384
}
8485

@@ -123,6 +124,7 @@ func TestMonitoring_generateOwnMonitoring(t *testing.T) {
123124

124125
mon.generateOwnMonitoring()
125126
if len(mon.Lc.monitoringChannel) != MonitorMetrics {
127+
t.Logf("Fix the amount of metrics for server stats to %v and client stats to %v\n", serverStatMetrics, clientStatMetrics)
126128
t.Errorf("Mismatch amount of the monitor metrics: expected=%v, gotten=%v", MonitorMetrics, len(mon.Lc.monitoringChannel))
127129
}
128130
}
@@ -209,7 +211,7 @@ func TestClient_tryToSendToGraphite(t *testing.T) {
209211
}
210212

211213
// Create monitoring structure for statistic
212-
cli.Mon.stat[conn.RemoteAddr().String()] = &stat{0, 0, 0}
214+
cli.Mon.clientStat[conn.RemoteAddr().String()] = &clientStat{0, 0, 0, 0}
213215

214216
for _, metric := range testMetrics {
215217
cli.tryToSendToGraphite(metric, conn)

monitoring.go

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,37 +18,37 @@ type Monitoring struct {
1818
Lc *LocalConfig
1919

2020
// Structure with amount of metrics from client.
21-
got source
21+
serverStat serverStat
2222

2323
// Statistic per carbon receiver
24-
stat map[string]*stat
24+
clientStat map[string]*clientStat
25+
}
26+
27+
// The source of metric daemon got.
28+
type serverStat struct {
29+
// Amount of metrics from directory.
30+
dir int
2531

2632
// Amount of invalid metrics.
2733
invalid int
28-
}
2934

30-
// The source of metric daemon got.
31-
type source struct {
3235
// Amount of metrics from network.
3336
net int
37+
}
3438

35-
// Amount of metrics from directory.
36-
dir int
39+
// The statistic of metrics per backend
40+
type clientStat struct {
41+
// Amount of dropped metrics.
42+
dropped int
3743

3844
// Amount of metrics from retry file.
39-
retry int
40-
}
45+
fromRetry int
4146

42-
// The statistic of metrics per backend
43-
type stat struct {
4447
// Amount of saved metrics.
4548
saved int
4649

4750
// Amount of sent metrics.
4851
sent int
49-
50-
// Amount of dropped metrics.
51-
dropped int
5252
}
5353

5454
var statLock sync.Mutex
@@ -61,18 +61,18 @@ func (m *Monitoring) generateOwnMonitoring() {
6161
statLock.Lock()
6262

6363
monitorSlice := []string{
64-
fmt.Sprintf("%s.got.net %v %v", path, m.got.net, now),
65-
fmt.Sprintf("%s.got.dir %v %v", path, m.got.dir, now),
66-
fmt.Sprintf("%s.got.retry %v %v", path, m.got.retry, now),
67-
fmt.Sprintf("%s.invalid %v %v", path, m.invalid, now),
64+
fmt.Sprintf("%s.got.net %v %v", path, m.serverStat.net, now),
65+
fmt.Sprintf("%s.got.dir %v %v", path, m.serverStat.dir, now),
66+
fmt.Sprintf("%s.invalid %v %v", path, m.serverStat.invalid, now),
6867
}
6968

7069
for _, carbonAddrTCP := range m.Lc.carbonAddrsTCP {
7170
backend := carbonAddrTCP.String()
7271
backendString := strings.Replace(backend, ".", "_", -1)
73-
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.saved %v %v", path, backendString, m.stat[backend].saved, now))
74-
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.sent %v %v", path, backendString, m.stat[backend].sent, now))
75-
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.dropped %v %v", path, backendString, m.stat[backend].dropped, now))
72+
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.dropped %v %v", path, backendString, m.clientStat[backend].dropped, now))
73+
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.from_retry %v %v", path, backendString, m.clientStat[backend].fromRetry, now))
74+
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.saved %v %v", path, backendString, m.clientStat[backend].saved, now))
75+
monitorSlice = append(monitorSlice, fmt.Sprintf("%s.%s.sent %v %v", path, backendString, m.clientStat[backend].sent, now))
7676
}
7777

7878
statLock.Unlock()
@@ -84,7 +84,7 @@ func (m *Monitoring) generateOwnMonitoring() {
8484
m.Lc.lg.Printf("Too many metrics in the MON queue! This is very bad")
8585
for _, carbonAddrTCP := range m.Lc.carbonAddrsTCP {
8686
backend := carbonAddrTCP.String()
87-
m.Increase(&m.stat[backend].dropped, 1)
87+
m.Increase(&m.clientStat[backend].dropped, 1)
8888
}
8989
}
9090
}
@@ -94,12 +94,12 @@ func (m *Monitoring) generateOwnMonitoring() {
9494
func (m *Monitoring) clean() {
9595
for _, carbonAddrTCP := range m.Lc.carbonAddrsTCP {
9696
backend := carbonAddrTCP.String()
97-
m.stat[backend].saved = 0
98-
m.stat[backend].sent = 0
99-
m.stat[backend].dropped = 0
97+
m.clientStat[backend].dropped = 0
98+
m.clientStat[backend].fromRetry = 0
99+
m.clientStat[backend].saved = 0
100+
m.clientStat[backend].sent = 0
100101
}
101-
m.invalid = 0
102-
m.got = source{0, 0, 0}
102+
m.serverStat = serverStat{0, 0, 0}
103103
}
104104

105105
// Increase metric value in the thread safe way
@@ -113,10 +113,11 @@ func (m *Monitoring) Increase(metric *int, value int) {
113113
// Should be run in separate goroutine.
114114
func (m *Monitoring) Run() {
115115
statLock.Lock()
116-
m.stat = make(map[string]*stat)
116+
m.clientStat = make(map[string]*clientStat)
117117
for _, carbonAddrTCP := range m.Lc.carbonAddrsTCP {
118118
backend := carbonAddrTCP.String()
119-
m.stat[backend] = &stat{
119+
m.clientStat[backend] = &clientStat{
120+
0,
120121
0,
121122
0,
122123
0,
@@ -128,8 +129,8 @@ func (m *Monitoring) Run() {
128129
statLock.Lock()
129130
for _, carbonAddrTCP := range m.Lc.carbonAddrsTCP {
130131
backend := carbonAddrTCP.String()
131-
if m.stat[backend].dropped != 0 {
132-
m.Lc.lg.Printf("Too many metrics in the main buffer of %s server. Had to drop incommings: %d", backend, m.stat[backend].dropped)
132+
if m.clientStat[backend].dropped != 0 {
133+
m.Lc.lg.Printf("Too many metrics in the main buffer of %s server. Had to drop incommings: %d", backend, m.clientStat[backend].dropped)
133134
}
134135
}
135136
m.clean()

server.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func (s Server) aggrMetricsWithPrefix() {
9494
if dropped > 0 {
9595
for _, carbonAddrTCP := range s.Lc.carbonAddrsTCP {
9696
backend := carbonAddrTCP.String()
97-
s.Mon.Increase(&s.Mon.stat[backend].dropped, dropped)
97+
s.Mon.Increase(&s.Mon.clientStat[backend].dropped, dropped)
9898
}
9999
}
100100
}
@@ -135,15 +135,15 @@ func (s Server) cleanAndUseIncomingData(metrics []string) {
135135
}
136136
} else {
137137
if metric != "" {
138-
s.Mon.Increase(&s.Mon.invalid, 1)
138+
s.Mon.Increase(&s.Mon.serverStat.invalid, 1)
139139
s.Lc.lg.Printf("Removing bad metric '%s' from the list", metric)
140140
}
141141
}
142142
}
143143
if dropped > 0 {
144144
for _, carbonAddrTCP := range s.Lc.carbonAddrsTCP {
145145
backend := carbonAddrTCP.String()
146-
s.Mon.Increase(&s.Mon.stat[backend].dropped, dropped)
146+
s.Mon.Increase(&s.Mon.clientStat[backend].dropped, dropped)
147147
}
148148
}
149149
}
@@ -153,7 +153,7 @@ func (s Server) handleRequest(conn net.Conn) {
153153
defer conn.Close()
154154
conBuf := bufio.NewReader(conn)
155155
for {
156-
s.Mon.Increase(&s.Mon.got.net, 1)
156+
s.Mon.Increase(&s.Mon.serverStat.net, 1)
157157
metric, err := conBuf.ReadString('\n')
158158
// Even if error occurred we still put "metric" into analysis, cause it can be a valid metric, but without \n
159159
s.cleanAndUseIncomingData([]string{strings.Replace(strings.Replace(metric, "\r", "", -1), "\n", "", -1)})
@@ -173,7 +173,7 @@ func (s Server) handleDirMetrics() {
173173
}
174174
for _, f := range files {
175175
resultsList, _ := readMetricsFromFile(s.Conf.MetricDir + "/" + f.Name())
176-
s.Mon.Increase(&s.Mon.got.dir, len(resultsList))
176+
s.Mon.Increase(&s.Mon.serverStat.dir, len(resultsList))
177177
s.cleanAndUseIncomingData(resultsList)
178178
}
179179

0 commit comments

Comments
 (0)