Skip to content

Commit 221923d

Browse files
improved loggers , remote logger
- Fixed issue when configuring several loggers. - Remote logger: - open as many connections with the remote server as specified in the Workers field. - open and handle every connection in each own goroutine. - get rid of the locks.
1 parent f54534b commit 221923d

File tree

4 files changed

+89
-48
lines changed

4 files changed

+89
-48
lines changed

daemon/log/loggers/logger.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -103,15 +103,15 @@ func (l *LoggerManager) Load(configs []LoggerConfig) {
103103
for _, cfg := range configs {
104104
switch cfg.Name {
105105
case LOGGER_REMOTE:
106-
if lgr, err := NewRemote(&cfg); err == nil {
106+
if lgr, err := NewRemote(cfg); err == nil {
107107
l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr
108108
}
109109
case LOGGER_REMOTE_SYSLOG:
110-
if lgr, err := NewRemoteSyslog(&cfg); err == nil {
110+
if lgr, err := NewRemoteSyslog(cfg); err == nil {
111111
l.loggers[fmt.Sprint(lgr.Name, lgr.cfg.Server, lgr.cfg.Protocol)] = lgr
112112
}
113113
case LOGGER_SYSLOG:
114-
if lgr, err := NewSyslog(&cfg); err == nil {
114+
if lgr, err := NewSyslog(cfg); err == nil {
115115
l.loggers[lgr.Name] = lgr
116116
}
117117
}
@@ -143,6 +143,7 @@ func (l *LoggerManager) Stop() {
143143
func (l *LoggerManager) write(args ...interface{}) {
144144
//l.mu.RLock()
145145
//defer l.mu.RUnlock()
146+
// FIXME: leak when configuring the loggers.
146147
for _, logger := range l.loggers {
147148
logger.Write(logger.Transform(args...))
148149
}

daemon/log/loggers/remote.go

Lines changed: 82 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package loggers
22

33
import (
4+
"context"
45
"fmt"
56
"log/syslog"
67
"net"
@@ -19,28 +20,42 @@ const (
1920
LOGGER_REMOTE = "remote"
2021
)
2122

22-
// Remote defines the logger that writes events to a generic remote server.
23-
// It can write to the local or a remote daemon, UDP or TCP.
23+
// Remote defines a logger that writes events to a generic remote server.
24+
// It can write to a local or a remote daemon, UDP or TCP.
2425
// It supports writing events in RFC5424, RFC3164, CSV and JSON formats.
2526
type Remote struct {
26-
mu *sync.RWMutex
27-
Writer *syslog.Writer
28-
cfg *LoggerConfig
29-
logFormat formats.LoggerFormat
30-
netConn net.Conn
31-
Name string
32-
Tag string
33-
Hostname string
34-
Timeout time.Duration
27+
mu *sync.RWMutex
28+
Writer *syslog.Writer
29+
cfg LoggerConfig
30+
ctx context.Context
31+
cancel context.CancelFunc
32+
logFormat formats.LoggerFormat
33+
34+
netConn net.Conn
35+
36+
// Name of the logger
37+
Name string
38+
39+
// channel used to write mesages
40+
writerChan chan string
41+
42+
Tag string
43+
44+
// Name of the host where the daemon is running
45+
Hostname string
46+
47+
// Write timeouts
48+
Timeout time.Duration
49+
50+
// Connect timeout
3551
ConnectTimeout time.Duration
36-
errors uint32
37-
maxErrors uint32
38-
status uint32
52+
53+
status uint32
3954
}
4055

4156
// NewRemote returns a new object that manipulates and prints outbound connections
42-
// to a remote syslog server, with the given format (RFC5424 by default)
43-
func NewRemote(cfg *LoggerConfig) (*Remote, error) {
57+
// to a remote server, with the given format (RFC5424 by default)
58+
func NewRemote(cfg LoggerConfig) (*Remote, error) {
4459
var err error
4560
log.Info("NewRemote logger: %v", cfg)
4661

@@ -49,6 +64,7 @@ func NewRemote(cfg *LoggerConfig) (*Remote, error) {
4964
}
5065
sys.Name = LOGGER_REMOTE
5166
sys.cfg = cfg
67+
sys.ctx, sys.cancel = context.WithCancel(context.Background())
5268

5369
// list of allowed formats for this logger
5470
sys.logFormat = formats.NewRfc5424()
@@ -78,18 +94,26 @@ func NewRemote(cfg *LoggerConfig) (*Remote, error) {
7894
sys.ConnectTimeout = connTimeout
7995
}
8096

97+
// initial connection test
8198
if err = sys.Open(); err != nil {
8299
log.Error("Error loading logger [%s]: %s", sys.Name, err)
83100
return nil, err
84101
}
85102
log.Info("[%s] initialized: %v", sys.Name, cfg)
86103

104+
sys.writerChan = make(chan string)
105+
if sys.cfg.Workers == 0 {
106+
sys.cfg.Workers = 1
107+
}
108+
for i := 0; i < sys.cfg.Workers; i++ {
109+
go writerWorker(i, *sys, sys.writerChan, sys.ctx.Done())
110+
}
111+
87112
return sys, err
88113
}
89114

90115
// Open opens a new connection with a server or with the daemon.
91116
func (s *Remote) Open() (err error) {
92-
atomic.StoreUint32(&s.errors, 0)
93117
if s.cfg.Server == "" {
94118
return fmt.Errorf("[%s] Server address must not be empty", s.Name)
95119
}
@@ -112,7 +136,7 @@ func (s *Remote) Dial(proto, addr string, connTimeout time.Duration) (netConn ne
112136
return nil, err
113137
}
114138
default:
115-
return nil, fmt.Errorf("[%s] Network protocol %s not supported", s.Name, proto)
139+
return nil, fmt.Errorf("[%s] Network protocol %s not supported (use 'tcp' or 'udp')", s.Name, proto)
116140
}
117141

118142
return netConn, nil
@@ -124,6 +148,7 @@ func (s *Remote) Close() (err error) {
124148
err = s.netConn.Close()
125149
s.netConn = nil
126150
}
151+
s.cancel()
127152
atomic.StoreUint32(&s.status, DISCONNECTED)
128153
return
129154
}
@@ -156,30 +181,7 @@ func (s *Remote) Transform(args ...interface{}) (out string) {
156181
}
157182

158183
func (s *Remote) Write(msg string) {
159-
deadline := time.Now().Add(s.Timeout)
160-
161-
// BUG: it's fairly common to have write timeouts via udp/tcp.
162-
// Reopening the connection with the server helps to resume sending events to the server,
163-
// and have a continuous stream of events. Otherwise it'd stop working.
164-
// I haven't figured out yet why these write errors ocurr.
165-
s.mu.Lock()
166-
defer s.mu.Unlock()
167-
s.netConn.SetWriteDeadline(deadline)
168-
if s.netConn == nil {
169-
s.ReOpen()
170-
return
171-
}
172-
_, err := s.netConn.Write([]byte(msg))
173-
if err == nil {
174-
return
175-
}
176-
177-
log.Debug("[%s] %s write error: %v", s.Name, s.cfg.Protocol, err.(net.Error))
178-
atomic.AddUint32(&s.errors, 1)
179-
if atomic.LoadUint32(&s.errors) > maxAllowedErrors {
180-
s.ReOpen()
181-
return
182-
}
184+
s.writerChan <- msg
183185
}
184186

185187
func (s *Remote) formatLine(msg string) string {
@@ -189,3 +191,41 @@ func (s *Remote) formatLine(msg string) string {
189191
}
190192
return core.ConcatStrings(msg, nl)
191193
}
194+
195+
// each worker opens a new connection with the remote server, and waits for
196+
// incoming messages to be forwarded to the server.
197+
func writerWorker(id int, sys Remote, msgs <-chan string, done <-chan struct{}) {
198+
errors := 0
199+
conn, err := sys.Dial(sys.cfg.Protocol, sys.cfg.Server, sys.ConnectTimeout)
200+
if err != nil {
201+
log.Error("[%s] Error opening connection, worker %d", sys.Name, id)
202+
return
203+
}
204+
log.Debug("[%s] worker %d, connection opened", sys.Name, id)
205+
206+
for {
207+
select {
208+
case <-done:
209+
goto Exit
210+
case msg := <-msgs:
211+
log.Trace("[%s] %d writing writes", sys.Name, id)
212+
213+
// define a write timeout for this operation from Now.
214+
deadline := time.Now().Add(sys.Timeout)
215+
conn.SetWriteDeadline(deadline)
216+
b, err := conn.Write([]byte(msg))
217+
if err != nil {
218+
log.Trace("[%s] error writing via writer %d (%d): %s", sys.Name, id, b, err)
219+
// TODO: reopen the connection on max errors
220+
errors++
221+
if errors > maxAllowedErrors {
222+
log.Important("[%s] writer %d: too much errors, review the configuration and / or connectivity with the remote server", sys.Name, id)
223+
goto Exit
224+
}
225+
}
226+
}
227+
}
228+
Exit:
229+
log.Debug("[%s] %d connection closed (errors: %d)", sys.Name, id, errors)
230+
conn.Close()
231+
}

daemon/log/loggers/remote_syslog.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ type RemoteSyslog struct {
4646

4747
// NewRemoteSyslog returns a new object that manipulates and prints outbound connections
4848
// to a remote syslog server, with the given format (RFC5424 by default)
49-
func NewRemoteSyslog(cfg *LoggerConfig) (*RemoteSyslog, error) {
49+
func NewRemoteSyslog(cfg LoggerConfig) (*RemoteSyslog, error) {
5050
var err error
5151
log.Info("NewSyslog logger: %v", cfg)
5252

daemon/log/loggers/syslog.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,16 +14,16 @@ const (
1414
// Syslog defines the logger that writes traces to the syslog.
1515
// It can write to the local or a remote daemon.
1616
type Syslog struct {
17-
cfg *LoggerConfig
1817
Writer *syslog.Writer
18+
cfg LoggerConfig
1919
logFormat formats.LoggerFormat
2020
Name string
2121
Tag string
2222
}
2323

2424
// NewSyslog returns a new object that manipulates and prints outbound connections
2525
// to syslog (local or remote), with the given format (RFC5424 by default)
26-
func NewSyslog(cfg *LoggerConfig) (*Syslog, error) {
26+
func NewSyslog(cfg LoggerConfig) (*Syslog, error) {
2727
var err error
2828
log.Info("NewSyslog logger: %v", cfg)
2929

0 commit comments

Comments
 (0)