Skip to content

Commit 98b8b06

Browse files
remote loggers improvements
- Convert remote syslog to remote: there was no real difference between the two, and the code was the same. Maybe we could limit the allowed logging formats.
- Remote logger: added an option to reopen the connection after n errors while trying to send messages.
1 parent da4761f commit 98b8b06

File tree

2 files changed

+48
-194
lines changed

2 files changed

+48
-194
lines changed

daemon/log/loggers/remote.go

Lines changed: 40 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,24 @@ import (
1818

1919
const (
2020
LOGGER_REMOTE = "remote"
21+
22+
// restart the syslog connection after this number of errors
23+
maxAllowedErrors = 10
24+
maxConnRetries = 600
25+
)
26+
27+
var (
28+
// default write / connect timeouts
29+
writeTimeout, _ = time.ParseDuration("1s")
30+
connTimeout, _ = time.ParseDuration("5s")
31+
reopenInterval, _ = time.ParseDuration("5s")
32+
)
33+
34+
// connection status
35+
const (
36+
DISCONNECTED = iota
37+
CONNECTED
38+
CONNECTING
2139
)
2240

2341
// Remote defines a logger that writes events to a generic remote server.
@@ -153,23 +171,6 @@ func (s *Remote) Close() (err error) {
153171
return
154172
}
155173

156-
// ReOpen tries to reestablish the connection with the writer
157-
func (s *Remote) ReOpen() {
158-
if atomic.LoadUint32(&s.status) == CONNECTING {
159-
return
160-
}
161-
atomic.StoreUint32(&s.status, CONNECTING)
162-
if err := s.Close(); err != nil {
163-
log.Debug("[%s] error closing Close(): %s", s.Name, err)
164-
}
165-
166-
if err := s.Open(); err != nil {
167-
log.Debug("[%s] ReOpen() error: %s", s.Name, err)
168-
} else {
169-
log.Debug("[%s] ReOpen() ok", s.Name)
170-
}
171-
}
172-
173174
// Transform transforms data for proper ingestion.
174175
func (s *Remote) Transform(args ...interface{}) (out string) {
175176
if s.logFormat != nil {
@@ -196,36 +197,50 @@ func (s *Remote) formatLine(msg string) string {
196197
// incoming messages to be forwarded to the server.
197198
func writerWorker(id int, sys Remote, msgs <-chan string, done <-chan struct{}) {
198199
errors := 0
200+
connRetries := 0
201+
202+
Reopen:
203+
errors = 0
204+
199205
conn, err := sys.Dial(sys.cfg.Protocol, sys.cfg.Server, sys.ConnectTimeout)
200206
if err != nil {
201-
log.Error("[%s] Error opening connection, worker %d", sys.Name, id)
202-
return
207+
log.Debug("[%s] Error opening connection, worker %d, retrying... (%d/%d)", sys.Name, id, connRetries, maxConnRetries)
208+
connRetries++
209+
if connRetries > maxConnRetries {
210+
log.Error("[%s] Error opening connection, worker %d, giving up", sys.Name, id)
211+
return
212+
}
213+
214+
// wait time before reopen attempt
215+
time.Sleep(reopenInterval)
216+
goto Reopen
203217
}
218+
connRetries = 0
204219
log.Debug("[%s] worker %d, connection opened", sys.Name, id)
205220

206221
for {
207222
select {
208223
case <-done:
209224
goto Exit
210225
case msg := <-msgs:
211-
log.Trace("[%s] %d writing writes", sys.Name, id)
226+
//log.Trace("[%s] %d writing writes", sys.Name, id)
212227

213228
// define a write timeout for this operation from Now.
214229
deadline := time.Now().Add(sys.Timeout)
215230
conn.SetWriteDeadline(deadline)
216-
b, err := conn.Write([]byte(msg))
231+
_, err := conn.Write([]byte(msg))
217232
if err != nil {
218-
log.Trace("[%s] error writing via writer %d (%d): %s", sys.Name, id, b, err)
219-
// TODO: reopen the connection on max errors
233+
log.Trace("[%s] error writing via writer %d: %s", sys.Name, id, err)
220234
errors++
221235
if errors > maxAllowedErrors {
222236
log.Important("[%s] writer %d: too much errors, review the configuration and / or connectivity with the remote server", sys.Name, id)
223-
goto Exit
237+
goto Reopen
224238
}
225239
}
226240
}
227241
}
242+
228243
Exit:
229-
log.Debug("[%s] %d connection closed (errors: %d)", sys.Name, id, errors)
244+
log.Debug("[%s logger] %d connection closed (errors: %d)", sys.Name, id, errors)
230245
conn.Close()
231246
}
Lines changed: 8 additions & 169 deletions
Original file line numberDiff line numberDiff line change
@@ -1,193 +1,32 @@
11
package loggers
22

33
import (
4-
"fmt"
5-
"net"
6-
"os"
7-
"sync"
8-
"sync/atomic"
9-
"time"
10-
114
"github.com/evilsocket/opensnitch/daemon/log"
12-
"github.com/evilsocket/opensnitch/daemon/log/formats"
135
)
146

157
const (
168
LOGGER_REMOTE_SYSLOG = "remote_syslog"
17-
// restart syslog connection after these amount of errors
18-
maxAllowedErrors = 10
19-
)
20-
21-
var (
22-
// default write / connect timeouts
23-
writeTimeout, _ = time.ParseDuration("1s")
24-
connTimeout, _ = time.ParseDuration("5s")
25-
)
26-
27-
// connection status
28-
const (
29-
DISCONNECTED = iota
30-
CONNECTED
31-
CONNECTING
329
)
3310

34-
// RemoteSyslog defines the logger that writes traces to the syslog.
35-
// It can write to the local or a remote daemon.
3611
type RemoteSyslog struct {
37-
Syslog
38-
mu *sync.RWMutex
39-
netConn net.Conn
40-
Hostname string
41-
Timeout time.Duration
42-
ConnectTimeout time.Duration
43-
errors uint32
44-
status uint32
12+
Remote
4513
}
4614

4715
// NewRemoteSyslog returns a new object that manipulates and prints outbound connections
4816
// to a remote syslog server, with the given format (RFC5424 by default)
4917
func NewRemoteSyslog(cfg LoggerConfig) (*RemoteSyslog, error) {
50-
var err error
51-
log.Info("NewSyslog logger: %v", cfg)
52-
53-
sys := &RemoteSyslog{
54-
mu: &sync.RWMutex{},
55-
}
56-
sys.Name = LOGGER_REMOTE_SYSLOG
57-
sys.cfg = cfg
58-
59-
// list of allowed formats for this logger
60-
sys.logFormat = formats.NewRfc5424()
61-
if cfg.Format == formats.RFC3164 {
62-
sys.logFormat = formats.NewRfc3164()
63-
} else if cfg.Format == formats.CSV {
64-
sys.logFormat = formats.NewCSV()
65-
}
66-
67-
sys.Tag = logTag
68-
if cfg.Tag != "" {
69-
sys.Tag = cfg.Tag
70-
}
71-
sys.Hostname, err = os.Hostname()
72-
if err != nil {
73-
sys.Hostname = "localhost"
74-
}
75-
sys.Timeout, err = time.ParseDuration(cfg.WriteTimeout)
76-
if err != nil || cfg.WriteTimeout == "" {
77-
sys.Timeout = writeTimeout
78-
}
79-
80-
sys.ConnectTimeout, err = time.ParseDuration(cfg.ConnectTimeout)
81-
if err != nil || cfg.ConnectTimeout == "" {
82-
sys.ConnectTimeout = connTimeout
83-
}
84-
85-
if err = sys.Open(); err != nil {
86-
log.Error("Error loading logger [%s]: %s", sys.Name, err)
87-
return nil, err
88-
}
89-
log.Info("[%s] initialized: %v", sys.Name, cfg)
90-
91-
return sys, err
92-
}
93-
94-
// Open opens a new connection with a server or with the daemon.
95-
func (s *RemoteSyslog) Open() (err error) {
96-
atomic.StoreUint32(&s.errors, 0)
97-
if s.cfg.Server == "" {
98-
return fmt.Errorf("[%s] Server address must not be empty", s.Name)
99-
}
100-
s.mu.Lock()
101-
s.netConn, err = s.Dial(s.cfg.Protocol, s.cfg.Server, s.ConnectTimeout)
102-
s.mu.Unlock()
18+
log.Info("NewRemoteSyslog logger: %v", cfg)
10319

104-
if err == nil {
105-
atomic.StoreUint32(&s.status, CONNECTED)
106-
}
107-
return err
108-
}
109-
110-
// Dial opens a new connection with a syslog server.
111-
func (s *RemoteSyslog) Dial(proto, addr string, connTimeout time.Duration) (netConn net.Conn, err error) {
112-
switch proto {
113-
case "udp", "tcp":
114-
netConn, err = net.DialTimeout(proto, addr, connTimeout)
115-
if err != nil {
116-
return nil, err
117-
}
118-
default:
119-
return nil, fmt.Errorf("[%s] Network protocol %s not supported", s.Name, proto)
120-
}
121-
122-
return netConn, nil
123-
}
124-
125-
// Close closes the writer object
126-
func (s *RemoteSyslog) Close() (err error) {
127-
//s.mu.RLock()
128-
if s.netConn != nil {
129-
err = s.netConn.Close()
130-
s.netConn = nil
131-
}
132-
//s.mu.RUnlock()
133-
atomic.StoreUint32(&s.status, DISCONNECTED)
134-
return
135-
}
136-
137-
// ReOpen tries to reestablish the connection with the writer
138-
func (s *RemoteSyslog) ReOpen() {
139-
if atomic.LoadUint32(&s.status) == CONNECTING {
140-
return
141-
}
142-
atomic.StoreUint32(&s.status, CONNECTING)
143-
if err := s.Close(); err != nil {
144-
log.Debug("[%s] error closing Close(): %s", s.Name, err)
20+
r, err := NewRemote(cfg)
21+
r.Name = LOGGER_REMOTE_SYSLOG
22+
rs := &RemoteSyslog{
23+
Remote: *r,
14524
}
14625

147-
if err := s.Open(); err != nil {
148-
log.Debug("[%s] ReOpen() error: %s", s.Name, err)
149-
return
150-
}
151-
}
152-
153-
// Transform transforms data for proper ingestion.
154-
func (s *RemoteSyslog) Transform(args ...interface{}) (out string) {
155-
if s.logFormat != nil {
156-
args = append(args, s.Hostname)
157-
args = append(args, s.Tag)
158-
out = s.logFormat.Transform(args...)
159-
}
160-
return
161-
}
162-
163-
func (s *RemoteSyslog) Write(msg string) {
164-
deadline := time.Now().Add(s.Timeout)
165-
166-
// BUG: it's fairly common to have write timeouts via udp/tcp.
167-
// Reopening the connection with the server helps to resume sending events to syslog,
168-
// and have a continuous stream of events. Otherwise it'd stop working.
169-
// I haven't figured out yet why these write errors ocurr.
170-
s.mu.RLock()
171-
defer s.mu.RUnlock()
172-
if s.netConn == nil {
173-
s.ReOpen()
174-
return
175-
}
176-
s.netConn.SetWriteDeadline(deadline)
177-
_, err := s.netConn.Write([]byte(msg))
178-
if err == nil {
179-
return
180-
}
181-
182-
log.Debug("[%s] %s write error: %v", s.Name, s.cfg.Protocol, err.(net.Error))
183-
atomic.AddUint32(&s.errors, 1)
184-
if atomic.LoadUint32(&s.errors) > maxAllowedErrors {
185-
s.ReOpen()
186-
return
187-
}
26+
return rs, err
18827
}
18928

19029
// https://cs.opensource.google/go/go/+/refs/tags/go1.18.2:src/log/syslog/syslog.go;l=286;drc=0a1a092c4b56a1d4033372fbd07924dad8cbb50b
191-
func (s *RemoteSyslog) formatLine(msg string) string {
30+
func (rs *RemoteSyslog) formatLine(msg string) string {
19231
return msg
19332
}

0 commit comments

Comments
 (0)