From 21fd2908463d26865335a19bdd2a737b07b0289f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 1 Nov 2019 14:01:51 +0000 Subject: [PATCH] handle timeouts using i/o deadlines. Both initiator and responder now time out if reads and/or writes are blocked for 30 seconds (default negotiation timeout). Introduces two tests to validate the new behaviour. Fixes #47. --- client.go | 58 +++++++++++++++++++++++++++++---------------- multistream.go | 12 ++++++++++ multistream_test.go | 43 +++++++++++++++++++++++++++++++++ timeout.go | 32 +++++++++++++++++++++++++ 4 files changed, 124 insertions(+), 21 deletions(-) create mode 100644 timeout.go diff --git a/client.go b/client.go index 02e096e..922bb9e 100644 --- a/client.go +++ b/client.go @@ -19,27 +19,12 @@ var ErrNoProtocols = errors.New("no protocols specified") // on this ReadWriteCloser. It returns an error if, for example, // the muxer does not know how to handle this protocol. func SelectProtoOrFail(proto string, rwc io.ReadWriteCloser) error { - errCh := make(chan error, 1) - go func() { - var buf bytes.Buffer - delimWrite(&buf, []byte(ProtocolID)) - delimWrite(&buf, []byte(proto)) - _, err := io.Copy(rwc, &buf) - errCh <- err - }() - // We have to read *both* errors. - err1 := readMultistreamHeader(rwc) - err2 := readProto(proto, rwc) - if werr := <-errCh; werr != nil { - return werr - } - if err1 != nil { - return err1 - } - if err2 != nil { - return err2 + if clearFn, err := setDeadline(rwc); err != nil { + return err + } else { + defer clearFn() } - return nil + return selectSingleProtocol(proto, rwc) } // SelectOneOf will perform handshakes with the protocols on the given slice @@ -49,12 +34,18 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { return "", ErrNoProtocols } + if clearFn, err := setDeadline(rwc); err != nil { + return "", err + } else { + defer clearFn() + } + // Use SelectProtoOrFail to pipeline the /multistream/1.0.0 handshake // with an attempt to negotiate the first protocol. If that fails, we // can continue negotiating the rest of the protocols normally. // // This saves us a round trip. - switch err := SelectProtoOrFail(protos[0], rwc); err { + switch err := selectSingleProtocol(protos[0], rwc); err { case nil: return protos[0], nil case ErrNotSupported: // try others @@ -74,6 +65,31 @@ func SelectOneOf(protos []string, rwc io.ReadWriteCloser) (string, error) { return "", ErrNotSupported } +// selectSingleProtocol attempts to select a single protocol. +func selectSingleProtocol(proto string, rwc io.ReadWriteCloser) error { + errCh := make(chan error, 1) + go func() { + var buf bytes.Buffer + delimWrite(&buf, []byte(ProtocolID)) + delimWrite(&buf, []byte(proto)) + _, err := io.Copy(rwc, &buf) + errCh <- err + }() + // We have to read *both* errors. + err1 := readMultistreamHeader(rwc) + err2 := readProto(proto, rwc) + if werr := <-errCh; werr != nil { + return werr + } + if err1 != nil { + return err1 + } + if err2 != nil { + return err2 + } + return nil +} + func handshake(rw io.ReadWriter) error { errCh := make(chan error, 1) go func() { diff --git a/multistream.go b/multistream.go index ccfb336..49569b8 100644 --- a/multistream.go +++ b/multistream.go @@ -205,6 +205,12 @@ func (msm *MultistreamMuxer) NegotiateLazy(rwc io.ReadWriteCloser) (io.ReadWrite writeErr := make(chan error, 1) defer close(pval) + if clearFn, err := setDeadline(rwc); err != nil { + return nil, "", nil, err + } else { + defer clearFn() + } + lzc := &lazyServerConn{ con: rwc, } @@ -292,6 +298,12 @@ loop: // Negotiate performs protocol selection and returns the protocol name and // the matching handler function for it (or an error). func (msm *MultistreamMuxer) Negotiate(rwc io.ReadWriteCloser) (string, HandlerFunc, error) { + if clearFn, err := setDeadline(rwc); err != nil { + return "", nil, err + } else { + defer clearFn() + } + // Send our protocol ID err := delimWriteBuffered(rwc, []byte(ProtocolID)) if err != nil { diff --git a/multistream_test.go b/multistream_test.go index 58b2cff..4d1e752 100644 --- a/multistream_test.go +++ b/multistream_test.go @@ -7,6 +7,7 @@ import ( "io" "net" "sort" + "strings" "testing" "time" ) @@ -684,3 +685,45 @@ func TestNegotiateFail(t *testing.T) { t.Fatal("got wrong protocol") } } + +func TestInitiatorTimeout(t *testing.T) { + a, _ := newPipe(t) + + var old time.Duration + old, NegotiationTimeout = NegotiationTimeout, 1*time.Second + defer func() { NegotiationTimeout = old }() + + mux := NewMultistreamMuxer() + mux.AddHandler("/a", func(p string, rwc io.ReadWriteCloser) error { + t.Error("shouldnt execute this handler") + return nil + }) + + ch := make(chan error) + go func() { + defer close(ch) + err := SelectProtoOrFail("/a", a) + ch <- err + }() + + // nothing is reading from b. + + if err := <-ch; !strings.Contains(err.Error(), "i/o timeout") { + t.Fatal("expected a timeout error") + } +} + +func TestResponderTimeout(t *testing.T) { + _, b := newPipe(t) + + var old time.Duration + old, NegotiationTimeout = NegotiationTimeout, 1*time.Second + defer func() { NegotiationTimeout = old }() + + mux := NewMultistreamMuxer() + // nothing is sending from a. + err := mux.Handle(b) + if !strings.Contains(err.Error(), "i/o timeout") { + t.Fatal("expected a timeout error") + } +} diff --git a/timeout.go b/timeout.go new file mode 100644 index 0000000..a685094 --- /dev/null +++ b/timeout.go @@ -0,0 +1,32 @@ +package multistream + +import ( + "fmt" + "io" + "time" +) + +// NegotiationTimeout is the maximum time a protocol negotiation atempt is +// allowed to be inflight before it fails. +var NegotiationTimeout = 30 * time.Second + +// setDeadline attempts to set a read and write deadline on the underlying IO +// object, if it supports it. +func setDeadline(rwc io.ReadWriteCloser) (func(), error) { + // rwc could be: + // - a net.Conn or a libp2p Stream, both of which satisfy this interface. + // - something else (e.g. testing), in which case we skip over setting + // a deadline. + type deadline interface { + SetDeadline(time.Time) error + } + if d, ok := rwc.(deadline); ok { + if err := d.SetDeadline(time.Now().Add(NegotiationTimeout)); err != nil { + // this should not happen; if it does, something is broken and we + // should fail immediately. + return nil, fmt.Errorf("failed while setting a deadline: %w", err) + } + return func() { d.SetDeadline(time.Time{}) }, nil + } + return func() {}, nil +}