Skip to content

Commit fc30187

Browse files
committed
Create helper functions for TCP connection handling
1 parent 0235de5 commit fc30187

File tree

1 file changed

+41
-92
lines changed

1 file changed

+41
-92
lines changed

plugins/transport/socket/main_test.go

Lines changed: 41 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,27 @@ func TestUdpSocketTransport(t *testing.T) {
365365
})
366366
}
367367

368+
// Helper function to connect to TCP with retries
369+
func connectTCPWithRetry(t *testing.T, addr string) net.Conn {
370+
wskt, err := net.Dial("tcp", addr)
371+
if err != nil {
372+
for retries := 0; err != nil && retries < 3; retries++ {
373+
time.Sleep(500 * time.Millisecond)
374+
wskt, err = net.Dial("tcp", addr)
375+
}
376+
}
377+
require.NoError(t, err)
378+
return wskt
379+
}
380+
381+
// Helper function to create a TCP message with length header
382+
func createTCPMessage(t *testing.T, content []byte) []byte {
383+
msgLength := new(bytes.Buffer)
384+
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(content)))
385+
require.NoError(t, err)
386+
return append(msgLength.Bytes(), content...)
387+
}
388+
368389
func TestTcpSocketTransport(t *testing.T) {
369390
tmpdir, err := os.MkdirTemp(".", "socket_test_tmp")
370391
require.NoError(t, err)
@@ -393,11 +414,7 @@ func TestTcpSocketTransport(t *testing.T) {
393414
marker := []byte("--TCP-END--")
394415
copy(msgContent[len(msgContent)-len(marker):], marker)
395416

396-
// Prepend length header for TCP
397-
msgLength := new(bytes.Buffer)
398-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msgContent)))
399-
require.NoError(t, err)
400-
fullMsg := append(msgLength.Bytes(), msgContent...)
417+
fullMsg := createTCPMessage(t, msgContent)
401418

402419
// Setup message verification
403420
ctx, cancel := context.WithCancel(context.Background())
@@ -414,15 +431,7 @@ func TestTcpSocketTransport(t *testing.T) {
414431
time.Sleep(100 * time.Millisecond)
415432

416433
// Connect and send
417-
wskt, err := net.Dial("tcp", "127.0.0.1:8660")
418-
if err != nil {
419-
for retries := 0; err != nil && retries < 3; retries++ {
420-
time.Sleep(500 * time.Millisecond)
421-
wskt, err = net.Dial("tcp", "127.0.0.1:8660")
422-
}
423-
}
424-
require.NoError(t, err)
425-
434+
wskt := connectTCPWithRetry(t, "127.0.0.1:8660")
426435
_, err = wskt.Write(fullMsg)
427436
require.NoError(t, err)
428437

@@ -452,11 +461,7 @@ func TestTcpSocketTransport(t *testing.T) {
452461
marker := []byte("--LARGE-TCP--")
453462
copy(msgContent[len(msgContent)-len(marker):], marker)
454463

455-
// Prepend length header for TCP
456-
msgLength := new(bytes.Buffer)
457-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msgContent)))
458-
require.NoError(t, err)
459-
fullMsg := append(msgLength.Bytes(), msgContent...)
464+
fullMsg := createTCPMessage(t, msgContent)
460465

461466
// Setup message verification
462467
ctx, cancel := context.WithCancel(context.Background())
@@ -474,15 +479,7 @@ func TestTcpSocketTransport(t *testing.T) {
474479
time.Sleep(100 * time.Millisecond)
475480

476481
// Connect and send
477-
wskt, err := net.Dial("tcp", "127.0.0.1:8661")
478-
if err != nil {
479-
for retries := 0; err != nil && retries < 3; retries++ {
480-
time.Sleep(500 * time.Millisecond)
481-
wskt, err = net.Dial("tcp", "127.0.0.1:8661")
482-
}
483-
}
484-
require.NoError(t, err)
485-
482+
wskt := connectTCPWithRetry(t, "127.0.0.1:8661")
486483
_, err = wskt.Write(fullMsg)
487484
require.NoError(t, err)
488485

@@ -512,11 +509,7 @@ func TestTcpSocketTransport(t *testing.T) {
512509
marker := []byte("--MEGA-TCP--")
513510
copy(msgContent[len(msgContent)-len(marker):], marker)
514511

515-
// Prepend length header for TCP
516-
msgLength := new(bytes.Buffer)
517-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msgContent)))
518-
require.NoError(t, err)
519-
fullMsg := append(msgLength.Bytes(), msgContent...)
512+
fullMsg := createTCPMessage(t, msgContent)
520513

521514
// Setup message verification
522515
ctx, cancel := context.WithCancel(context.Background())
@@ -534,15 +527,7 @@ func TestTcpSocketTransport(t *testing.T) {
534527
time.Sleep(100 * time.Millisecond)
535528

536529
// Connect and send
537-
wskt, err := net.Dial("tcp", "127.0.0.1:8662")
538-
if err != nil {
539-
for retries := 0; err != nil && retries < 3; retries++ {
540-
time.Sleep(500 * time.Millisecond)
541-
wskt, err = net.Dial("tcp", "127.0.0.1:8662")
542-
}
543-
}
544-
require.NoError(t, err)
545-
530+
wskt := connectTCPWithRetry(t, "127.0.0.1:8662")
546531
_, err = wskt.Write(fullMsg)
547532
require.NoError(t, err)
548533

@@ -574,13 +559,7 @@ func TestTcpSocketTransport(t *testing.T) {
574559
for j := 0; j < messageSizes[i]; j++ {
575560
msgContent[j] = fillByte
576561
}
577-
578-
// Write length header
579-
msgLength := new(bytes.Buffer)
580-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msgContent)))
581-
require.NoError(t, err)
582-
combinedMsg.Write(msgLength.Bytes())
583-
combinedMsg.Write(msgContent)
562+
combinedMsg.Write(createTCPMessage(t, msgContent))
584563
}
585564

586565
// Setup message verification
@@ -621,15 +600,7 @@ func TestTcpSocketTransport(t *testing.T) {
621600
time.Sleep(100 * time.Millisecond)
622601

623602
// Connect and send all messages
624-
wskt, err := net.Dial("tcp", "127.0.0.1:8663")
625-
if err != nil {
626-
for retries := 0; err != nil && retries < 3; retries++ {
627-
time.Sleep(500 * time.Millisecond)
628-
wskt, err = net.Dial("tcp", "127.0.0.1:8663")
629-
}
630-
}
631-
require.NoError(t, err)
632-
603+
wskt := connectTCPWithRetry(t, "127.0.0.1:8663")
633604
_, err = wskt.Write(combinedMsg.Bytes())
634605
require.NoError(t, err)
635606

@@ -655,16 +626,13 @@ func TestTcpSocketTransport(t *testing.T) {
655626
},
656627
}
657628

658-
msg := make([]byte, regularBuffSize)
629+
msgContent := make([]byte, regularBuffSize)
659630
for i := 0; i < regularBuffSize; i++ {
660-
msg[i] = byte('X')
631+
msgContent[i] = byte('X')
661632
}
662-
msg[regularBuffSize-1] = byte('$')
663-
msg = append(msg, []byte(addition)...)
664-
msgLength := new(bytes.Buffer)
665-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msg)))
666-
require.NoError(t, err)
667-
msg = append(msgLength.Bytes(), msg...)
633+
msgContent[regularBuffSize-1] = byte('$')
634+
msgContent = append(msgContent, []byte(addition)...)
635+
msg := createTCPMessage(t, msgContent)
668636

669637
// verify transport
670638
ctx, cancel := context.WithCancel(context.Background())
@@ -681,15 +649,7 @@ func TestTcpSocketTransport(t *testing.T) {
681649
time.Sleep(100 * time.Millisecond)
682650

683651
// write to socket
684-
wskt, err := net.Dial("tcp", "127.0.0.1:8664")
685-
if err != nil {
686-
// The socket might not be listening yet, wait a little bit and try to connect again
687-
for retries := 0; err != nil && retries < 3; retries++ {
688-
time.Sleep(500 * time.Millisecond)
689-
wskt, err = net.Dial("tcp", "127.0.0.1:8664")
690-
}
691-
}
692-
require.NoError(t, err)
652+
wskt := connectTCPWithRetry(t, "127.0.0.1:8664")
693653
_, err = wskt.Write(msg)
694654
require.NoError(t, err)
695655

@@ -710,16 +670,13 @@ func TestTcpSocketTransport(t *testing.T) {
710670
},
711671
}
712672

713-
msg := make([]byte, regularBuffSize)
673+
msgContent := make([]byte, regularBuffSize)
714674
for i := 0; i < regularBuffSize; i++ {
715-
msg[i] = byte('X')
675+
msgContent[i] = byte('X')
716676
}
717-
msg[regularBuffSize-1] = byte('$')
718-
msg = append(msg, []byte(addition)...)
719-
msgLength := new(bytes.Buffer)
720-
err := binary.Write(msgLength, binary.LittleEndian, uint64(len(msg)))
721-
require.NoError(t, err)
722-
msg = append(msgLength.Bytes(), msg...)
677+
msgContent[regularBuffSize-1] = byte('$')
678+
msgContent = append(msgContent, []byte(addition)...)
679+
msg := createTCPMessage(t, msgContent)
723680

724681
// verify transport
725682
ctx, cancel := context.WithCancel(context.Background())
@@ -736,15 +693,7 @@ func TestTcpSocketTransport(t *testing.T) {
736693
time.Sleep(100 * time.Millisecond)
737694

738695
// write to socket
739-
wskt1, err := net.Dial("tcp", "127.0.0.1:8665")
740-
if err != nil {
741-
// The socket might not be listening yet, wait a little bit and try to connect again
742-
for retries := 0; err != nil && retries < 3; retries++ {
743-
time.Sleep(500 * time.Millisecond)
744-
wskt1, err = net.Dial("tcp", "127.0.0.1:8665")
745-
}
746-
}
747-
require.NoError(t, err)
696+
wskt1 := connectTCPWithRetry(t, "127.0.0.1:8665")
748697

749698
// We shouldn't need to retry the second connection, if this fails, then something is wrong
750699
wskt2, err := net.Dial("tcp", "127.0.0.1:8665")

0 commit comments

Comments
 (0)