Skip to content

Commit e3f379d

Browse files
authored
Merge pull request #4 from innogames/carbon_backend
Use string as backend instead of net.TCP
2 parents 4afb808 + feb95f8 commit e3f379d

File tree

6 files changed

+111
-102
lines changed

6 files changed

+111
-102
lines changed

client.go

Lines changed: 43 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,18 @@ func (c Client) createRetryDir() error {
3737
}
3838

3939
// Save []string to file.
40-
func (c Client) saveSliceToRetry(metrics []string, backend string) error {
40+
func (c Client) saveSliceToRetry(metrics []string, carbonAddr string) error {
4141
// If size of file is bigger, than max size we will remove lines from this file,
4242
// and will call this function again to check result and write to the file.
4343
// Recursion:)
4444

4545
c.Lc.lg.Printf("Resaving %d metrics back to the retry-file", len(metrics))
4646

47-
retFile := path.Join(c.Conf.RetryDir, backend)
47+
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
4848
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
4949
if err != nil {
5050
c.Lc.lg.Println(err)
51-
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, len(metrics))
51+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, len(metrics))
5252
return err
5353
}
5454
defer f.Close()
@@ -62,13 +62,13 @@ func (c Client) saveSliceToRetry(metrics []string, backend string) error {
6262
}
6363
}
6464
if dropped > 0 {
65-
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, dropped)
65+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, dropped)
6666
}
67-
return c.removeOldDataFromRetryFile(backend)
67+
return c.removeOldDataFromRetryFile(carbonAddr)
6868
}
6969

7070
// Save part of entire content of channel to file.
71-
func (c Client) saveChannelToRetry(ch chan string, size int, backend string) {
71+
func (c Client) saveChannelToRetry(ch chan string, size int, carbonAddr string) {
7272
// If size of file is bigger, than max size we will remove lines from this file,
7373
// and will call this function again to check result and write to the file.
7474
// Recursion:)
@@ -81,7 +81,7 @@ func (c Client) saveChannelToRetry(ch chan string, size int, backend string) {
8181

8282
c.Lc.lg.Printf("Saving %d metrics from channel to the retry-file", size)
8383

84-
retFile := path.Join(c.Conf.RetryDir, backend)
84+
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
8585
f, err := os.OpenFile(retFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
8686
if err != nil {
8787
c.Lc.lg.Println(err.Error())
@@ -99,31 +99,31 @@ func (c Client) saveChannelToRetry(ch chan string, size int, backend string) {
9999
}
100100
}
101101
if dropped > 0 {
102-
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, dropped)
102+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, dropped)
103103
}
104104
if saved > 0 {
105-
c.Mon.Increase(&c.Mon.clientStat[backend].saved, saved)
105+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].saved, saved)
106106
}
107-
c.removeOldDataFromRetryFile(backend)
107+
c.removeOldDataFromRetryFile(carbonAddr)
108108
}
109109

110110
// Cleaning up retry-file.
111111
// Entire file is sorted to have newest metrics at the beginning.
112-
func (c Client) removeOldDataFromRetryFile(backend string) error {
113-
retFile := path.Join(c.Conf.RetryDir, backend)
112+
func (c Client) removeOldDataFromRetryFile(carbonAddr string) error {
113+
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
114114
currentLinesInFile := getSizeInLinesFromFile(retFile)
115115
if currentLinesInFile > c.Lc.fileMetricSize {
116116
c.Lc.lg.Printf("I can not save to %s more, than %d. I will have to drop the rest (%d)",
117117
retFile, c.Lc.fileMetricSize, currentLinesInFile-c.Lc.fileMetricSize)
118118
// We save first c.Lc.fileMetricSize of metrics (newest)
119119
wholeFile, _ := readMetricsFromFile(retFile)
120-
return c.saveSliceToRetry(wholeFile[:c.Lc.fileMetricSize], backend)
120+
return c.saveSliceToRetry(wholeFile[:c.Lc.fileMetricSize], carbonAddr)
121121
}
122122
return nil
123123
}
124124

125125
// Attempt to send metric to graphite server via connection
126-
func (c *Client) tryToSendToGraphite(metric string, conn net.Conn) error {
126+
func (c *Client) tryToSendToGraphite(metric string, carbonAddr string, conn net.Conn) error {
127127
// If at any point "HOSTNAME" was used instead of real hostname - replace it
128128
metric = strings.Replace(metric, "HOSTNAME", c.Lc.hostname, -1)
129129

@@ -132,8 +132,7 @@ func (c *Client) tryToSendToGraphite(metric string, conn net.Conn) error {
132132
c.Lc.lg.Println("Write to server failed:", err.Error())
133133
return err
134134
}
135-
backend := conn.RemoteAddr().String()
136-
c.Mon.Increase(&c.Mon.clientStat[backend].sent, 1)
135+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].sent, 1)
137136
return nil
138137
}
139138

@@ -143,24 +142,24 @@ func (c *Client) tryToSendToGraphite(metric string, conn net.Conn) error {
143142
// 3) Send metrics from the main channel to carbon
144143
//
145144
// And save everything to the retryFile on any error
146-
func (c Client) runBackend(backend string) {
147-
retFile := path.Join(c.Conf.RetryDir, backend)
145+
func (c Client) runBackend(carbonAddr string) {
146+
retFile := path.Join(c.Conf.RetryDir, carbonAddr)
148147
chanLock.Lock()
149-
monChannel := c.monChannels[backend]
150-
mainChannel := c.mainChannels[backend]
148+
monChannel := c.monChannels[carbonAddr]
149+
mainChannel := c.mainChannels[carbonAddr]
151150
chanLock.Unlock()
152151
// TODO: think about graceful shutdown with flush channels
153152

154153
for ; ; time.Sleep(time.Duration(c.Conf.ClientSendInterval) * time.Second) {
155154
var connectionFailed bool
156155

157156
// Try to dial to Graphite server. If ClientSendInterval is 10 seconds - dial should be no longer than 1 second
158-
conn, err := net.DialTimeout("tcp", backend, time.Duration(c.Conf.ConnectTimeout)*time.Second)
157+
conn, err := net.DialTimeout("tcp", carbonAddr, time.Duration(c.Conf.ConnectTimeout)*time.Second)
159158
if err != nil {
160159
c.Lc.lg.Println("Can not connect to graphite server: ", err.Error())
161-
c.saveChannelToRetry(monChannel, len(monChannel), backend)
162-
c.saveChannelToRetry(mainChannel, len(mainChannel), backend)
163-
c.removeOldDataFromRetryFile(backend)
160+
c.saveChannelToRetry(monChannel, len(monChannel), carbonAddr)
161+
c.saveChannelToRetry(mainChannel, len(mainChannel), carbonAddr)
162+
c.removeOldDataFromRetryFile(carbonAddr)
164163
continue
165164
}
166165

@@ -178,20 +177,20 @@ func (c Client) runBackend(backend string) {
178177
retryFileMetrics, _ := readMetricsFromFile(retFile)
179178
for numOfMetricFromFile, metricFromFile := range retryFileMetrics {
180179
if numOfMetricFromFile < c.Lc.mainBufferSize {
181-
err = c.tryToSendToGraphite(metricFromFile, conn)
180+
err = c.tryToSendToGraphite(metricFromFile, carbonAddr, conn)
182181
if err != nil {
183182
c.Lc.lg.Printf("Error happened in the middle of writing retry metrics. Resaving %d metrics\n", len(retryFileMetrics[numOfMetricFromFile:]))
184183
// If we failed to write a metric to graphite - something is wrong with connection
185-
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], backend)
184+
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], carbonAddr)
186185
connectionFailed = true
187186
break
188187
} else {
189-
c.Mon.Increase(&c.Mon.clientStat[backend].fromRetry, 1)
188+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].fromRetry, 1)
190189
}
191190

192191
} else {
193192
c.Lc.lg.Printf("Can read only %d metrics from %s. Rest %d will be kept for the next run", numOfMetricFromFile, retFile, len(retryFileMetrics[numOfMetricFromFile:]))
194-
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], backend)
193+
c.saveSliceToRetry(retryFileMetrics[numOfMetricFromFile:], carbonAddr)
195194
break
196195
}
197196
}
@@ -201,16 +200,16 @@ func (c Client) runBackend(backend string) {
201200
bufSize := len(monChannel)
202201
if !connectionFailed {
203202
for i := 0; i < bufSize; i++ {
204-
err = c.tryToSendToGraphite(<-monChannel, conn)
203+
err = c.tryToSendToGraphite(<-monChannel, carbonAddr, conn)
205204
if err != nil {
206205
c.Lc.lg.Println("Error happened in the middle of writing monitoring metrics. Saving...")
207-
c.saveChannelToRetry(monChannel, bufSize-i, backend)
206+
c.saveChannelToRetry(monChannel, bufSize-i, carbonAddr)
208207
connectionFailed = true
209208
break
210209
}
211210
}
212211
} else {
213-
c.saveChannelToRetry(monChannel, bufSize, backend)
212+
c.saveChannelToRetry(monChannel, bufSize, carbonAddr)
214213
}
215214

216215
// Main Buffer. We read it completely but send only part which fits in mainBufferSize
@@ -222,15 +221,15 @@ func (c Client) runBackend(backend string) {
222221
for processedMainBuff := 0; processedMainBuff < bufSize; processedMainBuff = processedMainBuff + 1 {
223222
metric := <-mainChannel
224223

225-
err = c.tryToSendToGraphite(metric, conn)
224+
err = c.tryToSendToGraphite(metric, carbonAddr, conn)
226225
if err != nil {
227226
c.Lc.lg.Printf("Error happened in the middle of writing metrics. Saving %d metrics\n", bufSize-processedMainBuff)
228-
c.saveChannelToRetry(mainChannel, bufSize-processedMainBuff, backend)
227+
c.saveChannelToRetry(mainChannel, bufSize-processedMainBuff, carbonAddr)
229228
break
230229
}
231230
}
232231
} else {
233-
c.saveChannelToRetry(mainChannel, bufSize, backend)
232+
c.saveChannelToRetry(mainChannel, bufSize, carbonAddr)
234233
}
235234
conn.Close()
236235
}
@@ -249,13 +248,12 @@ func (c Client) Run() {
249248
c.mainChannels = make(map[string]chan string)
250249
c.monChannels = make(map[string]chan string)
251250

252-
for _, carbonAddrTCP := range c.Lc.carbonAddrsTCP {
253-
backend := carbonAddrTCP.String()
251+
for _, carbonAddr := range c.Conf.CarbonAddrs {
254252
chanLock.Lock()
255-
c.mainChannels[backend] = make(chan string, cap(c.Lc.mainChannel))
256-
c.monChannels[backend] = make(chan string, cap(c.Lc.monitoringChannel))
253+
c.mainChannels[carbonAddr] = make(chan string, cap(c.Lc.mainChannel))
254+
c.monChannels[carbonAddr] = make(chan string, cap(c.Lc.monitoringChannel))
257255
chanLock.Unlock()
258-
go c.runBackend(backend)
256+
go c.runBackend(carbonAddr)
259257
}
260258

261259
sup := supervisor(c.Conf.Supervisor)
@@ -266,24 +264,22 @@ func (c Client) Run() {
266264
// write metrics from monitoring and main channels to the server specific channels
267265
for i := 0; i < len(c.Lc.mainChannel); i++ {
268266
metric := <-c.Lc.mainChannel
269-
for _, carbonAddrTCP := range c.Lc.carbonAddrsTCP {
270-
backend := carbonAddrTCP.String()
267+
for _, carbonAddr := range c.Conf.CarbonAddrs {
271268
select {
272-
case c.mainChannels[backend] <- metric:
269+
case c.mainChannels[carbonAddr] <- metric:
273270
default:
274-
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, 1)
271+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, 1)
275272
}
276273
}
277274
}
278275

279276
for i := 0; i < len(c.Lc.monitoringChannel); i++ {
280277
metric := <-c.Lc.monitoringChannel
281-
for _, carbonAddrTCP := range c.Lc.carbonAddrsTCP {
282-
backend := carbonAddrTCP.String()
278+
for _, carbonAddr := range c.Conf.CarbonAddrs {
283279
select {
284-
case c.monChannels[backend] <- metric:
280+
case c.monChannels[carbonAddr] <- metric:
285281
default:
286-
c.Mon.Increase(&c.Mon.clientStat[backend].dropped, 1)
282+
c.Mon.Increase(&c.Mon.clientStat[carbonAddr].dropped, 1)
287283
}
288284
}
289285
}

config.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,9 +114,6 @@ type LocalConfig struct {
114114
// Main logger.
115115
lg *log.Logger
116116

117-
// Carbon address as a Go type.
118-
carbonAddrsTCP []*net.TCPAddr
119-
120117
// Aggregation prefix regexp.
121118
allowedMetrics *regexp.Regexp
122119

@@ -194,6 +191,14 @@ func (conf *Config) prepareEnvironment() error {
194191
}
195192
}
196193

194+
// Check if servers in CarbonAddrs are resolvable
195+
for _, carbonAddr := range conf.CarbonAddrs {
196+
_, err := net.ResolveTCPAddr("tcp", carbonAddr)
197+
if err != nil {
198+
return errors.New("Could not resolve an address from CarbonAddrs: " + err.Error())
199+
}
200+
}
201+
197202
return nil
198203
}
199204

@@ -214,16 +219,6 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
214219
return nil, errors.Wrap(err, "Can not prepare environment")
215220
}
216221

217-
carbonAddrsTCP := make([]*net.TCPAddr, 0, len(conf.CarbonAddrs))
218-
for _, carbonAddrString := range conf.CarbonAddrs {
219-
carbonAddrTCP, err := net.ResolveTCPAddr("tcp", carbonAddrString)
220-
if err != nil {
221-
return nil, errors.New("This is not a valid address: " + err.Error())
222-
}
223-
224-
carbonAddrsTCP = append(carbonAddrsTCP, carbonAddrTCP)
225-
}
226-
227222
/*
228223
Units - metric
229224
*/
@@ -258,7 +253,7 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
258253
}
259254

260255
// There are 4 metrics per backend in client and 3 in server stats
261-
MonitorMetrics := 3 + len(carbonAddrsTCP)*4
256+
MonitorMetrics := 3 + len(conf.CarbonAddrs)*4
262257

263258
return &LocalConfig{
264259
hostname: hostname,
@@ -269,7 +264,6 @@ func (conf *Config) GenerateLocalConfig() (*LocalConfig, error) {
269264
*/
270265
fileMetricSize: conf.MetricsPerSecond * conf.ClientSendInterval * 10,
271266
lg: lg,
272-
carbonAddrsTCP: carbonAddrsTCP,
273267
allowedMetrics: regexp.MustCompile(conf.AllowedMetrics),
274268
aggrRegexp: regexp.MustCompile(fmt.Sprintf("^(%s|%s|%s|%s)..*", conf.AvgPrefix, conf.SumPrefix, conf.MinPrefix, conf.MaxPrefix)),
275269
overwriteRegexp: conf.generateRegexpsForOverwrite(),

grafsy.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ clientSendInterval = 10
55
metricsPerSecond = 1000
66

77
carbonAddrs = [
8-
"127.0.0.1:2003",
9-
"127.0.0.1:2004",
8+
"localhost:2003",
9+
"localhost:2004",
1010
]
1111
connectTimeout = 2
1212

0 commit comments

Comments
 (0)