@@ -20,11 +20,14 @@ import (
2020)
2121
2222const (
23- maxBufferSize = 65535
24- udp = "udp"
25- unix = "unix"
26- tcp = "tcp"
27- msgLengthSize = 8
23+ defaultBufferSize = 65535
24+ minBufferSize = 1024
25+ maxBufferSize = 10485760 // 10MB - max configurable buffer size
26+ maxMessageSize = 104857600 // 100MB - max single message size for TCP
27+ udp = "udp"
28+ unix = "unix"
29+ tcp = "tcp"
30+ msgLengthSize = 8
2831)
2932
3033var (
@@ -42,6 +45,7 @@ type configT struct {
4245 Path string `validate:"required_without=Socketaddr"`
4346 Type string
4447 Socketaddr string `validate:"required_without=Path"`
48+ BufferSize int64 `yaml:"bufferSize"` // read buffer size in bytes (default: 65535, min: 1024, max: 10485760)
4549 DumpMessages struct {
4650 Enabled bool
4751 Path string
@@ -180,17 +184,66 @@ func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w t
180184 }
181185 return
182186 }
183- msgBuffer = append (remainingMsg , msgBuffer ... )
187+
188+ // Combine remaining data from previous iteration with newly read data
189+ var data []byte
190+ if len (remainingMsg ) > 0 {
191+ data = append (remainingMsg , msgBuffer [:n ]... )
192+ } else {
193+ data = msgBuffer [:n ]
194+ }
195+ totalSize := len (data )
184196
185197 // whole buffer was used, so we are potentially handling larger message
186- if n == len (msgBuffer ) {
187- s .logger .Warnf ("full read buffer used" )
198+ if n == int (maxBuffSize ) {
199+ if s .conf .Type == tcp {
200+ s .logger .Debugf ("full read buffer used (%d bytes read), will read more if needed" , n )
201+ } else {
202+ // For UDP/Unix sockets, this means the message was truncated by the OS
203+ s .logger .Errorf (nil , "message truncated: buffer size (%d bytes) exceeded for %s socket - increase bufferSize configuration or message will be incomplete" , maxBuffSize , s .conf .Type )
204+ }
188205 }
189206
190- n += len (remainingMsg )
207+ // For TCP, check if we need to read more data for a large message
208+ if s .conf .Type == tcp && len (data ) >= msgLengthSize {
209+ // Peek at the message length header
210+ reader := bytes .NewReader (data [:msgLengthSize ])
211+ var msgLength int64
212+ err := binary .Read (reader , binary .LittleEndian , & msgLength )
213+ if err == nil && msgLength > 0 {
214+ // Validate message size
215+ if msgLength > maxMessageSize {
216+ s .logger .Errorf (nil , "rejecting TCP message: size (%d bytes) exceeds maximum allowed (%d bytes)" , msgLength , maxMessageSize )
217+ return
218+ }
219+ requiredSize := msgLengthSize + int (msgLength )
220+ // If the message is larger than our current data, read more
221+ if requiredSize > len (data ) {
222+ s .logger .Debugf ("large TCP message detected, size: %d bytes, reading more data" , msgLength )
223+ // Read additional data until we have the complete message
224+ additionalNeeded := requiredSize - len (data )
225+ for additionalNeeded > 0 {
226+ readSize := maxBuffSize
227+ if int64 (additionalNeeded ) < maxBuffSize {
228+ readSize = int64 (additionalNeeded )
229+ }
230+ tempBuf := make ([]byte , readSize )
231+ n , err := pc .Read (tempBuf )
232+ if err != nil || n < 1 {
233+ s .logger .Errorf (err , "failed to read continuation of large TCP message (expected %d more bytes)" , additionalNeeded )
234+ return
235+ }
236+ data = append (data , tempBuf [:n ]... )
237+ additionalNeeded = requiredSize - len (data )
238+ }
239+ totalSize = len (data )
240+ s .logger .Debugf ("completed reading large TCP message, total size: %d bytes" , totalSize )
241+ }
242+ }
243+ }
191244
192245 if s .conf .DumpMessages .Enabled {
193- _ , err := s .dumpBuf .Write (msgBuffer [: n ] )
246+ _ , err := s .dumpBuf .Write (data )
194247 if err != nil {
195248 s .logger .Errorf (err , "writing to dump buffer" )
196249 }
@@ -202,22 +255,29 @@ func (s *Socket) ReceiveData(maxBuffSize int64, done chan bool, pc net.Conn, w t
202255 }
203256
204257 if s .conf .Type == tcp {
205- parsed , err := s .WriteTCPMsg (w , msgBuffer , n )
258+ parsed , err := s .WriteTCPMsg (w , data , totalSize )
206259 if err != nil {
207260 s .logger .Errorf (err , "error, while parsing messages" )
208261 return
209262 }
210- remainingMsg = make ([]byte , int64 (n )- parsed )
211- copy (remainingMsg , msgBuffer [parsed :n ])
263+ remainingMsg = make ([]byte , int64 (totalSize )- parsed )
264+ copy (remainingMsg , data [parsed :totalSize ])
212265 } else {
213- w (msgBuffer [: n ] )
266+ w (data )
214267 msgCount ++
268+ remainingMsg = nil
215269 }
216270 }
217271}
218272
219273// Run implements type Transport
220274func (s * Socket ) Run (ctx context.Context , w transport.WriteFn , done chan bool ) {
275+ // Use default buffer size if not configured (e.g., in tests)
276+ bufferSize := s .conf .BufferSize
277+ if bufferSize == 0 {
278+ bufferSize = defaultBufferSize
279+ }
280+
221281 var pc net.Conn
222282 switch s .conf .Type {
223283 case udp :
@@ -226,7 +286,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
226286 s .logger .Errorf (nil , "Failed to initialize socket transport plugin with type: " + s .conf .Type )
227287 return
228288 }
229- go s .ReceiveData (maxBufferSize , done , pc , w )
289+ go s .ReceiveData (bufferSize , done , pc , w )
230290
231291 case tcp :
232292 TCPSocket := s .initTCPSocket ()
@@ -246,7 +306,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
246306 continue
247307 }
248308 }
249- go s .ReceiveData (maxBufferSize , done , pc , w )
309+ go s .ReceiveData (bufferSize , done , pc , w )
250310 }
251311 }()
252312 case unix :
@@ -257,7 +317,7 @@ func (s *Socket) Run(ctx context.Context, w transport.WriteFn, done chan bool) {
257317 s .logger .Errorf (nil , "Failed to initialize socket transport plugin with type: " + s .conf .Type )
258318 return
259319 }
260- go s .ReceiveData (maxBufferSize , done , pc , w )
320+ go s .ReceiveData (bufferSize , done , pc , w )
261321 }
262322
263323 for {
@@ -291,14 +351,23 @@ func (s *Socket) Config(c []byte) error {
291351 }{
292352 Path : "/dev/stdout" ,
293353 },
294- Type : unix ,
354+ Type : unix ,
355+ BufferSize : defaultBufferSize ,
295356 }
296357
297358 err := config .ParseConfig (bytes .NewReader (c ), & s .conf )
298359 if err != nil {
299360 return err
300361 }
301362
363+ // Validate buffer size
364+ if s .conf .BufferSize < minBufferSize {
365+ return fmt .Errorf ("bufferSize must be at least %d bytes, got %d" , minBufferSize , s .conf .BufferSize )
366+ }
367+ if s .conf .BufferSize > maxBufferSize {
368+ return fmt .Errorf ("bufferSize must be at most %d bytes, got %d" , maxBufferSize , s .conf .BufferSize )
369+ }
370+
302371 if s .conf .DumpMessages .Enabled {
303372 s .dumpFile , err = os .OpenFile (s .conf .DumpMessages .Path , os .O_RDWR | os .O_CREATE | os .O_APPEND , 0666 )
304373 if err != nil {
0 commit comments