Skip to content

Commit 29e6d7f

Browse files
committed
Do not use scanner to process datagrams; implement datagram channel
Don't use a scanner to process datagrams received from syslog, because a) datagrams are not meant to end with \n (or contain control characters at all), but they often do; previously relied on them doing so b) spurious \0 (also often sent after \n) caused issues c) junk from one datagram could affect subsequent datagrams by leaving the junk within the text field Pass UDP syslog source onwards towards the parser (parser does not yet handle it) Resolves (hopefully) issue #10 and issue #15
1 parent 0c7b4a4 commit 29e6d7f

File tree

2 files changed

+106
-52
lines changed

2 files changed

+106
-52
lines changed

server.go

Lines changed: 88 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@ var (
1616
RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt
1717
)
1818

19+
const (
20+
datagramChannelBufferSize = 10
21+
datagramReadBufferSize = 64 * 1024
22+
)
23+
1924
type Server struct {
2025
listeners []*net.TCPListener
2126
connections []net.Conn
2227
wait sync.WaitGroup
2328
doneTcp chan bool
29+
datagramChannel chan DatagramMessage
2430
format format.Format
2531
handler Handler
2632
lastError error
@@ -58,6 +64,7 @@ func (s *Server) ListenUDP(addr string) error {
5864
if err != nil {
5965
return err
6066
}
67+
connection.SetReadBuffer(datagramReadBufferSize)
6168

6269
s.connections = append(s.connections, connection)
6370
return nil
@@ -74,6 +81,7 @@ func (s *Server) ListenUnixgram(addr string) error {
7481
if err != nil {
7582
return err
7683
}
84+
connection.SetReadBuffer(datagramReadBufferSize)
7785

7886
s.connections = append(s.connections, connection)
7987
return nil
@@ -110,8 +118,12 @@ func (s *Server) Boot() error {
110118
s.goAcceptConnection(listener)
111119
}
112120

121+
if len(s.connections) > 0 {
122+
s.goParseDatagrams()
123+
}
124+
113125
for _, connection := range s.connections {
114-
s.goScanConnection(connection, false)
126+
s.goReceiveDatagrams(connection)
115127
}
116128

117129
return nil
@@ -132,56 +144,44 @@ func (s *Server) goAcceptConnection(listener *net.TCPListener) {
132144
continue
133145
}
134146

135-
s.goScanConnection(connection, true)
147+
s.goScanConnection(connection)
136148
}
137149

138150
s.wait.Done()
139151
}(listener)
140152
}
141153

142-
func (s *Server) goScanConnection(connection net.Conn, needClose bool) {
154+
func (s *Server) goScanConnection(connection net.Conn) {
143155
scanner := bufio.NewScanner(connection)
144156
if sf := s.format.GetSplitFunc(); sf != nil {
145157
scanner.Split(sf)
146158
}
147159

148160
var scanCloser *ScanCloser
149-
if needClose {
150-
scanCloser = &ScanCloser{scanner, connection}
151-
} else {
152-
scanCloser = &ScanCloser{scanner, nil}
153-
}
161+
scanCloser = &ScanCloser{scanner, connection}
154162

155163
s.wait.Add(1)
156164
go s.scan(scanCloser)
157165
}
158166

159167
func (s *Server) scan(scanCloser *ScanCloser) {
160-
if scanCloser.closer == nil {
161-
// UDP
162-
for scanCloser.Scan() {
163-
s.parser([]byte(scanCloser.Text()))
168+
loop:
169+
for {
170+
select {
171+
case <-s.doneTcp:
172+
break loop
173+
default:
164174
}
165-
} else {
166-
// TCP
167-
loop:
168-
for {
169-
select {
170-
case <-s.doneTcp:
171-
break loop
172-
default:
173-
}
174-
if s.readTimeoutMilliseconds > 0 {
175-
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
176-
}
177-
if scanCloser.Scan() {
178-
s.parser([]byte(scanCloser.Text()))
179-
} else {
180-
break loop
181-
}
175+
if s.readTimeoutMilliseconds > 0 {
176+
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
177+
}
178+
if scanCloser.Scan() {
179+
s.parser([]byte(scanCloser.Text()))
180+
} else {
181+
break loop
182182
}
183-
scanCloser.closer.Close()
184183
}
184+
scanCloser.closer.Close()
185185

186186
s.wait.Done()
187187
}
@@ -220,6 +220,9 @@ func (s *Server) Kill() error {
220220
if s.doneTcp != nil {
221221
close(s.doneTcp)
222222
}
223+
if s.datagramChannel != nil {
224+
close(s.datagramChannel)
225+
}
223226
return nil
224227
}
225228

@@ -237,3 +240,58 @@ type ScanCloser struct {
237240
*bufio.Scanner
238241
closer TimeoutCloser
239242
}
243+
244+
type DatagramMessage struct {
245+
message []byte
246+
client string
247+
}
248+
249+
func (s *Server) goReceiveDatagrams(connection net.Conn) {
250+
packetconn, ok := connection.(net.PacketConn)
251+
if !ok {
252+
panic("Connection is not a packet connection")
253+
}
254+
s.wait.Add(1)
255+
go func() {
256+
defer s.wait.Done()
257+
for {
258+
buf := make([]byte, 65536)
259+
n, addr, err := packetconn.ReadFrom(buf)
260+
if err == nil {
261+
// Ignore trailing control characters and NULs
262+
for ; (n > 0) && (buf[n-1] < 32); n-- {
263+
}
264+
if n > 0 {
265+
s.datagramChannel <- DatagramMessage{buf[:n], addr.String()}
266+
}
267+
} else {
268+
// there has been an error. Either the server has been killed
269+
// or may be getting a transitory error due to (e.g.) the
270+
// interface being shutdown in which case sleep() to avoid busy wait.
271+
opError, ok := err.(*net.OpError)
272+
if (ok) && !opError.Temporary() && !opError.Timeout() {
273+
return
274+
}
275+
time.Sleep(10 * time.Millisecond)
276+
}
277+
}
278+
}()
279+
}
280+
281+
func (s *Server) goParseDatagrams() {
282+
s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize)
283+
284+
s.wait.Add(1)
285+
go func() {
286+
defer s.wait.Done()
287+
for {
288+
select {
289+
case msg, ok := (<-s.datagramChannel):
290+
if !ok {
291+
return
292+
}
293+
s.parser(msg.message)
294+
}
295+
}
296+
}()
297+
}

server_test.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,26 @@ func (c *ConnMock) SetWriteDeadline(t time.Time) error {
109109
}
110110

111111
func (s *ServerSuite) TestConnectionClose(c *C) {
112-
for _, closeConnection := range []bool{true, false} {
113-
handler := new(HandlerMock)
114-
server := NewServer()
115-
server.SetFormat(RFC3164)
116-
server.SetHandler(handler)
117-
con := ConnMock{ReadData: []byte(exampleSyslog)}
118-
server.goScanConnection(&con, closeConnection)
119-
server.Wait()
120-
c.Check(con.isClosed, Equals, closeConnection)
121-
}
112+
handler := new(HandlerMock)
113+
server := NewServer()
114+
server.SetFormat(RFC3164)
115+
server.SetHandler(handler)
116+
con := ConnMock{ReadData: []byte(exampleSyslog)}
117+
server.goScanConnection(&con)
118+
server.Wait()
119+
c.Check(con.isClosed, Equals, true)
122120
}
123121

124122
func (s *ServerSuite) TestConnectionUDPKill(c *C) {
125-
for _, closeConnection := range []bool{true, false} {
126-
handler := new(HandlerMock)
127-
server := NewServer()
128-
server.SetFormat(RFC5424)
129-
server.SetHandler(handler)
130-
con := ConnMock{ReadData: []byte(exampleSyslog)}
131-
server.goScanConnection(&con, closeConnection)
132-
server.Kill()
133-
server.Wait()
134-
c.Check(con.isClosed, Equals, closeConnection)
135-
}
123+
handler := new(HandlerMock)
124+
server := NewServer()
125+
server.SetFormat(RFC5424)
126+
server.SetHandler(handler)
127+
con := ConnMock{ReadData: []byte(exampleSyslog)}
128+
server.goScanConnection(&con)
129+
server.Kill()
130+
server.Wait()
131+
c.Check(con.isClosed, Equals, true)
136132
}
137133

138134
func (s *ServerSuite) TestTcpTimeout(c *C) {
@@ -143,7 +139,7 @@ func (s *ServerSuite) TestTcpTimeout(c *C) {
143139
server.SetTimeout(10)
144140
con := ConnMock{ReadData: []byte(exampleSyslog), ReturnTimeout: true}
145141
c.Check(con.isReadDeadline, Equals, false)
146-
server.goScanConnection(&con, true)
142+
server.goScanConnection(&con)
147143
server.Wait()
148144
c.Check(con.isReadDeadline, Equals, true)
149145
c.Check(handler.LastLogParts, IsNil)

0 commit comments

Comments
 (0)