Skip to content

Commit 03b182c

Browse files
authored
Fix long message handling with dynamic buffer growth (#158)
* Fix long message handling with dynamic buffer growth The socket transport plugin used a fixed 64KB buffer size which caused messages surpassing that size to be truncated For UDP/Unix datagram sockets, this resulted in parsing errors like "unexpected end of input" This change allows the buffer to grow (up to a limit depending on the protocol) to accommodate larger messages. * Create helper functions for TCP connection handling * Add helper sendUDPSocketMessage * Update TestUnixSocketTransport large message test This test verifies the dynamic buffer growth by sending three messages In each iteration the buffer grows from the initial size of 65535 bytes to 3 times the initial size. Also verifies the content of the received message Closes: OSPRH-23826
1 parent 5a4aece commit 03b182c

File tree

2 files changed

+398
-130
lines changed

2 files changed

+398
-130
lines changed

plugins/transport/socket/main.go

Lines changed: 58 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@ import (
2020
)
2121

2222
const (
23-
maxBufferSize = 65535
24-
udp = "udp"
25-
unix = "unix"
26-
tcp = "tcp"
27-
msgLengthSize = 8
23+
maxBufferSize = 65535 // 64KB - initial buffer size for all socket types and max for UDP (OS datagram limit)
24+
maxBufferSizeUnix = 10485760 // 10MB - max buffer size for Unix domain sockets
25+
maxBufferSizeTCP = 104857600 // 100MB - max buffer size for TCP (stream-based, can handle very large messages)
26+
udp = "udp"
27+
unix = "unix"
28+
tcp = "tcp"
29+
msgLengthSize = 8
2830
)
2931

3032
var (
@@ -138,6 +140,17 @@ func (s *Socket) initTCPSocket() *net.TCPListener {
138140
return pc
139141
}
140142

143+
func (s *Socket) getMaxBufferSize() int64 {
144+
switch s.conf.Type {
145+
case udp:
146+
return maxBufferSize
147+
case tcp:
148+
return maxBufferSizeTCP
149+
default:
150+
return maxBufferSizeUnix
151+
}
152+
}
153+
141154
func (s *Socket) WriteTCPMsg(w transport.WriteFn, msgBuffer []byte, n int) (int64, error) {
142155
var pos int64
143156
var length int64
@@ -165,10 +178,13 @@ func (s *Socket) WriteTCPMsg(w transport.WriteFn, msgBuffer []byte, n int) (int6
165178
return pos, nil
166179
}
167180

168-
func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w transport.WriteFn) {
181+
func (s *Socket) ReceiveData(initialBuffSize int64, done chan bool, pc net.Conn, w transport.WriteFn) {
169182
defer pc.Close()
170-
msgBuffer := make([]byte, maxBuffSize)
183+
currentBuffSize := initialBuffSize
184+
maxBuffSize := s.getMaxBufferSize()
185+
msgBuffer := make([]byte, currentBuffSize)
171186
var remainingMsg []byte
187+
172188
for {
173189
n, err := pc.Read(msgBuffer)
174190
if err != nil || n < 1 {
@@ -180,17 +196,40 @@ func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w t
180196
}
181197
return
182198
}
183-
msgBuffer = append(remainingMsg, msgBuffer...)
184199

185-
// whole buffer was used, so we are potentially handling larger message
186-
if n == len(msgBuffer) {
187-
s.logger.Warnf("full read buffer used")
200+
// Combine remaining data from previous iteration with newly read data
201+
var data []byte
202+
if len(remainingMsg) > 0 {
203+
data = make([]byte, len(remainingMsg)+n)
204+
copy(data, remainingMsg)
205+
copy(data[len(remainingMsg):], msgBuffer[:n])
206+
} else {
207+
data = msgBuffer[:n]
208+
}
209+
totalSize := len(data)
210+
211+
// Check if buffer was completely filled - message may have been truncated
212+
if n == int(currentBuffSize) {
213+
if s.conf.Type == tcp {
214+
s.logger.Debugf("full read buffer used (%d bytes), TCP will handle continuation if needed", n)
215+
} else {
216+
// For UDP/Unix sockets, buffer being full means message was likely truncated
217+
if currentBuffSize < maxBuffSize {
218+
newSize := currentBuffSize * 2
219+
if newSize > maxBuffSize {
220+
newSize = maxBuffSize
221+
}
222+
s.logger.Warnf("message may have been truncated (buffer filled with %d bytes), growing buffer from %d to %d bytes for next message", currentBuffSize, currentBuffSize, newSize)
223+
currentBuffSize = newSize
224+
msgBuffer = make([]byte, currentBuffSize)
225+
} else {
226+
s.logger.Errorf(nil, "message truncated: buffer size (%d bytes) exceeded for %s socket and already at maximum buffer size (%d bytes)", currentBuffSize, s.conf.Type, maxBuffSize)
227+
}
228+
}
188229
}
189-
190-
n += len(remainingMsg)
191230

192231
if s.conf.DumpMessages.Enabled {
193-
_, err := s.dumpBuf.Write(msgBuffer[:n])
232+
_, err := s.dumpBuf.Write(data)
194233
if err != nil {
195234
s.logger.Errorf(err, "writing to dump buffer")
196235
}
@@ -202,16 +241,17 @@ func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w t
202241
}
203242

204243
if s.conf.Type == tcp {
205-
parsed, err := s.WriteTCPMsg(w, msgBuffer, n)
244+
parsed, err := s.WriteTCPMsg(w, data, totalSize)
206245
if err != nil {
207246
s.logger.Errorf(err, "error, while parsing messages")
208247
return
209248
}
210-
remainingMsg = make([]byte, int64(n)-parsed)
211-
copy(remainingMsg, msgBuffer[parsed:n])
249+
remainingMsg = make([]byte, int64(totalSize)-parsed)
250+
copy(remainingMsg, data[parsed:totalSize])
212251
} else {
213-
w(msgBuffer[:n])
252+
w(data)
214253
msgCount++
254+
remainingMsg = nil
215255
}
216256
}
217257
}

0 commit comments

Comments
 (0)