Skip to content

Commit 5ec6a62

Browse files
committed
Merge pull request #19 from abligh/datagrams-without-scanner
Do not use scanner to process datagrams; implement datagram channel
2 parents 0c7b4a4 + 29e6d7f commit 5ec6a62

File tree

2 files changed

+106
-52
lines changed

2 files changed

+106
-52
lines changed

server.go

Lines changed: 88 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@ var (
1616
RFC6587 = &format.RFC6587{} // RFC6587: http://www.ietf.org/rfc/rfc6587.txt
1717
)
1818

19+
const (
20+
datagramChannelBufferSize = 10
21+
datagramReadBufferSize = 64 * 1024
22+
)
23+
1924
type Server struct {
2025
listeners []*net.TCPListener
2126
connections []net.Conn
2227
wait sync.WaitGroup
2328
doneTcp chan bool
29+
datagramChannel chan DatagramMessage
2430
format format.Format
2531
handler Handler
2632
lastError error
@@ -58,6 +64,7 @@ func (s *Server) ListenUDP(addr string) error {
5864
if err != nil {
5965
return err
6066
}
67+
connection.SetReadBuffer(datagramReadBufferSize)
6168

6269
s.connections = append(s.connections, connection)
6370
return nil
@@ -74,6 +81,7 @@ func (s *Server) ListenUnixgram(addr string) error {
7481
if err != nil {
7582
return err
7683
}
84+
connection.SetReadBuffer(datagramReadBufferSize)
7785

7886
s.connections = append(s.connections, connection)
7987
return nil
@@ -110,8 +118,12 @@ func (s *Server) Boot() error {
110118
s.goAcceptConnection(listener)
111119
}
112120

121+
if len(s.connections) > 0 {
122+
s.goParseDatagrams()
123+
}
124+
113125
for _, connection := range s.connections {
114-
s.goScanConnection(connection, false)
126+
s.goReceiveDatagrams(connection)
115127
}
116128

117129
return nil
@@ -132,56 +144,44 @@ func (s *Server) goAcceptConnection(listener *net.TCPListener) {
132144
continue
133145
}
134146

135-
s.goScanConnection(connection, true)
147+
s.goScanConnection(connection)
136148
}
137149

138150
s.wait.Done()
139151
}(listener)
140152
}
141153

142-
func (s *Server) goScanConnection(connection net.Conn, needClose bool) {
154+
func (s *Server) goScanConnection(connection net.Conn) {
143155
scanner := bufio.NewScanner(connection)
144156
if sf := s.format.GetSplitFunc(); sf != nil {
145157
scanner.Split(sf)
146158
}
147159

148160
var scanCloser *ScanCloser
149-
if needClose {
150-
scanCloser = &ScanCloser{scanner, connection}
151-
} else {
152-
scanCloser = &ScanCloser{scanner, nil}
153-
}
161+
scanCloser = &ScanCloser{scanner, connection}
154162

155163
s.wait.Add(1)
156164
go s.scan(scanCloser)
157165
}
158166

159167
func (s *Server) scan(scanCloser *ScanCloser) {
160-
if scanCloser.closer == nil {
161-
// UDP
162-
for scanCloser.Scan() {
163-
s.parser([]byte(scanCloser.Text()))
168+
loop:
169+
for {
170+
select {
171+
case <-s.doneTcp:
172+
break loop
173+
default:
164174
}
165-
} else {
166-
// TCP
167-
loop:
168-
for {
169-
select {
170-
case <-s.doneTcp:
171-
break loop
172-
default:
173-
}
174-
if s.readTimeoutMilliseconds > 0 {
175-
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
176-
}
177-
if scanCloser.Scan() {
178-
s.parser([]byte(scanCloser.Text()))
179-
} else {
180-
break loop
181-
}
175+
if s.readTimeoutMilliseconds > 0 {
176+
scanCloser.closer.SetReadDeadline(time.Now().Add(time.Duration(s.readTimeoutMilliseconds) * time.Millisecond))
177+
}
178+
if scanCloser.Scan() {
179+
s.parser([]byte(scanCloser.Text()))
180+
} else {
181+
break loop
182182
}
183-
scanCloser.closer.Close()
184183
}
184+
scanCloser.closer.Close()
185185

186186
s.wait.Done()
187187
}
@@ -220,6 +220,9 @@ func (s *Server) Kill() error {
220220
if s.doneTcp != nil {
221221
close(s.doneTcp)
222222
}
223+
if s.datagramChannel != nil {
224+
close(s.datagramChannel)
225+
}
223226
return nil
224227
}
225228

@@ -237,3 +240,58 @@ type ScanCloser struct {
237240
*bufio.Scanner
238241
closer TimeoutCloser
239242
}
243+
244+
type DatagramMessage struct {
245+
message []byte
246+
client string
247+
}
248+
249+
func (s *Server) goReceiveDatagrams(connection net.Conn) {
250+
packetconn, ok := connection.(net.PacketConn)
251+
if !ok {
252+
panic("Connection is not a packet connection")
253+
}
254+
s.wait.Add(1)
255+
go func() {
256+
defer s.wait.Done()
257+
for {
258+
buf := make([]byte, 65536)
259+
n, addr, err := packetconn.ReadFrom(buf)
260+
if err == nil {
261+
// Ignore trailing control characters and NULs
262+
for ; (n > 0) && (buf[n-1] < 32); n-- {
263+
}
264+
if n > 0 {
265+
s.datagramChannel <- DatagramMessage{buf[:n], addr.String()}
266+
}
267+
} else {
268+
// there has been an error. Either the server has been killed
269+
// or may be getting a transitory error due to (e.g.) the
270+
// interface being shutdown in which case sleep() to avoid busy wait.
271+
opError, ok := err.(*net.OpError)
272+
if (ok) && !opError.Temporary() && !opError.Timeout() {
273+
return
274+
}
275+
time.Sleep(10 * time.Millisecond)
276+
}
277+
}
278+
}()
279+
}
280+
281+
func (s *Server) goParseDatagrams() {
282+
s.datagramChannel = make(chan DatagramMessage, datagramChannelBufferSize)
283+
284+
s.wait.Add(1)
285+
go func() {
286+
defer s.wait.Done()
287+
for {
288+
select {
289+
case msg, ok := (<-s.datagramChannel):
290+
if !ok {
291+
return
292+
}
293+
s.parser(msg.message)
294+
}
295+
}
296+
}()
297+
}

server_test.go

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -109,30 +109,26 @@ func (c *ConnMock) SetWriteDeadline(t time.Time) error {
109109
}
110110

111111
func (s *ServerSuite) TestConnectionClose(c *C) {
112-
for _, closeConnection := range []bool{true, false} {
113-
handler := new(HandlerMock)
114-
server := NewServer()
115-
server.SetFormat(RFC3164)
116-
server.SetHandler(handler)
117-
con := ConnMock{ReadData: []byte(exampleSyslog)}
118-
server.goScanConnection(&con, closeConnection)
119-
server.Wait()
120-
c.Check(con.isClosed, Equals, closeConnection)
121-
}
112+
handler := new(HandlerMock)
113+
server := NewServer()
114+
server.SetFormat(RFC3164)
115+
server.SetHandler(handler)
116+
con := ConnMock{ReadData: []byte(exampleSyslog)}
117+
server.goScanConnection(&con)
118+
server.Wait()
119+
c.Check(con.isClosed, Equals, true)
122120
}
123121

124122
func (s *ServerSuite) TestConnectionUDPKill(c *C) {
125-
for _, closeConnection := range []bool{true, false} {
126-
handler := new(HandlerMock)
127-
server := NewServer()
128-
server.SetFormat(RFC5424)
129-
server.SetHandler(handler)
130-
con := ConnMock{ReadData: []byte(exampleSyslog)}
131-
server.goScanConnection(&con, closeConnection)
132-
server.Kill()
133-
server.Wait()
134-
c.Check(con.isClosed, Equals, closeConnection)
135-
}
123+
handler := new(HandlerMock)
124+
server := NewServer()
125+
server.SetFormat(RFC5424)
126+
server.SetHandler(handler)
127+
con := ConnMock{ReadData: []byte(exampleSyslog)}
128+
server.goScanConnection(&con)
129+
server.Kill()
130+
server.Wait()
131+
c.Check(con.isClosed, Equals, true)
136132
}
137133

138134
func (s *ServerSuite) TestTcpTimeout(c *C) {
@@ -143,7 +139,7 @@ func (s *ServerSuite) TestTcpTimeout(c *C) {
143139
server.SetTimeout(10)
144140
con := ConnMock{ReadData: []byte(exampleSyslog), ReturnTimeout: true}
145141
c.Check(con.isReadDeadline, Equals, false)
146-
server.goScanConnection(&con, true)
142+
server.goScanConnection(&con)
147143
server.Wait()
148144
c.Check(con.isReadDeadline, Equals, true)
149145
c.Check(handler.LastLogParts, IsNil)

0 commit comments

Comments
 (0)