Skip to content

Commit f102312

Browse files
authored
Halfpipe Timeout Handling (#152)
* reset conn timeouts on a write in either direction * lint fixes
1 parent b12cc34 commit f102312

File tree

2 files changed

+168
-0
lines changed

2 files changed

+168
-0
lines changed

application/lib/proxies.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,19 @@ func halfPipe(src net.Conn, dst net.Conn,
105105
}
106106
break
107107
}
108+
109+
// refresh stall timeout - set both because it only happens on write
110+
// so if connection is sending traffic unidirectionally we prevent
111+
// the receiving side from timing out.
112+
err := src.SetDeadline(time.Now().Add(proxyStallTimeout))
113+
if err != nil {
114+
logger.Errorln("error setting deadline for src conn: ", tag)
115+
}
116+
err = dst.SetDeadline(time.Now().Add(proxyStallTimeout))
117+
if err != nil {
118+
logger.Errorln("error setting deadline for dst conn: ", tag)
119+
}
120+
108121
}
109122
return totWritten, err
110123

application/lib/proxies_test.go

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,17 @@ package lib
77
import (
88
"bytes"
99
"errors"
10+
"fmt"
11+
"io"
1012
"net"
1113
"os"
1214
"sync"
1315
"syscall"
1416
"testing"
1517
"time"
1618

19+
"github.com/stretchr/testify/require"
20+
1721
"github.com/refraction-networking/conjure/application/log"
1822
)
1923

@@ -114,3 +118,154 @@ func (m *mockConn) SetReadDeadline(t time.Time) error {
114118
func (m *mockConn) SetWriteDeadline(t time.Time) error {
115119
return errNotExist
116120
}
121+
122+
func TestHalfpipeDeadlineEcho(t *testing.T) {
123+
if os.Getenv("HALFPIPE") == "" {
124+
t.Skip("Skipping slow tests involving halfpipe timeouts")
125+
}
126+
127+
clientClient, clientStation := net.Pipe()
128+
stationCovert, covertCovert := net.Pipe()
129+
130+
logger := log.New(os.Stdout, "", 0)
131+
logger.SetLevel(log.TraceLevel)
132+
wg := sync.WaitGroup{}
133+
oncePrintErr := sync.Once{}
134+
wg.Add(2)
135+
136+
go halfPipe(clientStation, stationCovert, &wg, &oncePrintErr, logger, "Up "+"XXXXXX")
137+
go halfPipe(stationCovert, clientStation, &wg, &oncePrintErr, logger, "Down "+"XXXXXX")
138+
139+
go func() {
140+
defer covertCovert.Close()
141+
_, _ = io.Copy(covertCovert, covertCovert)
142+
}()
143+
144+
start := time.Now()
145+
for i := 0; i < 10; i++ {
146+
147+
_, err := clientClient.Write([]byte(fmt.Sprintf("%d", i)))
148+
if err != nil {
149+
t.Fatalf("received '%v' at client", err)
150+
}
151+
152+
b := make([]byte, 10)
153+
n, err := clientClient.Read(b)
154+
if errors.Is(err, io.EOF) {
155+
t.Fatalf("received EOF at client")
156+
} else if e, ok := err.(net.Error); ok && e.Timeout() {
157+
t.Fatalf("received Timeout at client")
158+
} else if err != nil {
159+
t.Fatalf("received '%v' at client", err)
160+
}
161+
162+
t.Logf("%s, %d - %s", time.Since(start), n, string(b))
163+
164+
time.Sleep(4 * time.Second)
165+
}
166+
167+
clientClient.Close()
168+
wg.Wait()
169+
}
170+
171+
func TestHalfpipeDeadlineUpload(t *testing.T) {
172+
if os.Getenv("HALFPIPE") == "" {
173+
t.Skip("Skipping slow tests involving halfpipe timeouts")
174+
}
175+
176+
clientClient, clientStation := net.Pipe()
177+
stationCovert, covertCovert := net.Pipe()
178+
179+
logger := log.New(os.Stdout, "", 0)
180+
logger.SetLevel(log.TraceLevel)
181+
wg := sync.WaitGroup{}
182+
oncePrintErr := sync.Once{}
183+
wg.Add(2)
184+
185+
go halfPipe(clientStation, stationCovert, &wg, &oncePrintErr, logger, "Up "+"XXXXXX")
186+
go halfPipe(stationCovert, clientStation, &wg, &oncePrintErr, logger, "Down "+"XXXXXX")
187+
188+
go func() {
189+
defer covertCovert.Close()
190+
_, _ = io.Copy(io.Discard, covertCovert)
191+
}()
192+
193+
start := time.Now()
194+
for i := 0; i < 10; i++ {
195+
196+
n, err := clientClient.Write([]byte(fmt.Sprintf("%d", i)))
197+
if errors.Is(err, io.EOF) {
198+
t.Fatalf("received EOF at client")
199+
} else if e, ok := err.(net.Error); ok && e.Timeout() {
200+
t.Fatalf("received Timeout at client")
201+
} else if err != nil {
202+
t.Fatalf("received '%v' at client", err)
203+
}
204+
205+
t.Logf("%s, %d %d", time.Since(start), n, i)
206+
207+
time.Sleep(4 * time.Second)
208+
}
209+
210+
clientClient.Close()
211+
covertCovert.Close()
212+
wg.Wait()
213+
}
214+
215+
// Test that we actually timeout after one side (client) stalls too long.
216+
func TestHalfpipeDeadlineActual(t *testing.T) {
217+
if os.Getenv("HALFPIPE") == "" {
218+
t.Skip("Skipping slow tests involving halfpipe timeouts")
219+
}
220+
221+
clientClient, clientStation := net.Pipe()
222+
stationCovert, covertCovert := net.Pipe()
223+
224+
logger := log.New(os.Stdout, "", 0)
225+
logger.SetLevel(log.TraceLevel)
226+
wg := sync.WaitGroup{}
227+
oncePrintErr := sync.Once{}
228+
wg.Add(2)
229+
230+
go halfPipe(clientStation, stationCovert, &wg, &oncePrintErr, logger, "Up "+"XXXXXX")
231+
go halfPipe(stationCovert, clientStation, &wg, &oncePrintErr, logger, "Down "+"XXXXXX")
232+
233+
var serverErr error
234+
go func() {
235+
defer covertCovert.Close()
236+
for {
237+
b := make([]byte, 10)
238+
_, serverErr = covertCovert.Read(b)
239+
if serverErr != nil {
240+
return
241+
}
242+
}
243+
}()
244+
245+
start := time.Now()
246+
for i := 0; i < 3; i++ {
247+
248+
n, err := clientClient.Write([]byte(fmt.Sprintf("%d", i)))
249+
if errors.Is(err, io.EOF) {
250+
t.Fatalf("received EOF at client")
251+
} else if e, ok := err.(net.Error); ok && e.Timeout() {
252+
t.Fatalf("received Timeout at client")
253+
} else if err != nil {
254+
t.Fatalf("received '%v' at client", err)
255+
}
256+
257+
t.Logf("%s, %d %d", time.Since(start), n, i)
258+
259+
time.Sleep(4 * time.Second)
260+
}
261+
262+
// sleep 27 + 4 = 31 > proxyStallTimeout
263+
time.Sleep(27 * time.Second)
264+
265+
// covertStation will Timeout and send an EOF to covertCovert
266+
require.ErrorIs(t, io.EOF, serverErr)
267+
268+
clientClient.Close()
269+
covertCovert.Close()
270+
wg.Wait()
271+
}

0 commit comments

Comments
 (0)