Skip to content

Commit 3b55161

Browse files
authored
fix loss of order of messages (#426)
* fix loss of order of messages using goPool causes loss of order of messages. For example coap notfications comes in order, but goroutines from gopool are executed underministic. - WithGoPool was removed - WithProcessReceivedMessageFunc to modify core cc.ProcessReceivedMessage
1 parent 01faa43 commit 3b55161

25 files changed

+996
-470
lines changed

dtls/client_test.go

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -304,9 +304,8 @@ func TestConnPost(t *testing.T) {
304304
buf, errH := io.ReadAll(r.Body())
305305
require.NoError(t, errH)
306306
assert.Len(t, buf, 7000)
307-
308-
err = w.SetResponse(codes.BadRequest, message.TextPlain, bytes.NewReader(make([]byte, 5330)))
309-
require.NoError(t, err)
307+
errH = w.SetResponse(codes.BadRequest, message.TextPlain, bytes.NewReader(make([]byte, 5330)))
308+
require.NoError(t, errH)
310309
require.NotEmpty(t, w.Conn())
311310
}))
312311
require.NoError(t, err)
@@ -782,44 +781,29 @@ func TestClientKeepAliveMonitor(t *testing.T) {
782781

783782
ld, err := coapNet.NewDTLSListener("udp4", "", serverCgf)
784783
require.NoError(t, err)
785-
defer func() {
786-
errC := ld.Close()
787-
require.NoError(t, errC)
788-
}()
789784

790-
checkClose := semaphore.NewWeighted(2)
791-
err = checkClose.Acquire(ctx, 2)
785+
checkClose := semaphore.NewWeighted(1)
786+
err = checkClose.Acquire(ctx, 1)
792787
require.NoError(t, err)
793-
sd := dtls.NewServer(
794-
options.WithOnNewConn(func(cc *client.Conn) {
795-
t.Log("server - new connection")
796-
cc.AddOnClose(func() {
797-
t.Log("server - client is closed")
798-
checkClose.Release(1)
799-
})
800-
}),
801-
options.WithGoPool(func(processReqFunc config.ProcessRequestFunc[*client.Conn], req *pool.Message, cc *client.Conn, handler config.HandlerFunc[*client.Conn]) error {
802-
time.Sleep(time.Millisecond * 500)
803-
processReqFunc(req, cc, handler)
804-
return nil
805-
}),
806-
options.WithPeriodicRunner(periodic.New(ctx.Done(), time.Millisecond*10)),
807-
options.WithInactivityMonitor(Timeout/2, func(c *client.Conn) {
808-
t.Log("server - close for inactivity")
809-
_ = c.Close()
810-
}),
811-
)
812788

813789
var serverWg sync.WaitGroup
814790
serverWg.Add(1)
815791
go func() {
816792
defer serverWg.Done()
817-
errS := sd.Serve(ld)
818-
require.NoError(t, errS)
793+
for {
794+
c, errA := ld.AcceptWithContext(ctx)
795+
if errA != nil {
796+
if errors.Is(errA, coapNet.ErrListenerIsClosed) {
797+
return
798+
}
799+
}
800+
defer c.Close()
801+
require.NoError(t, errA)
802+
}
819803
}()
820804
defer func() {
821-
sd.Stop()
822-
serverWg.Wait()
805+
errC := ld.Close()
806+
require.NoError(t, errC)
823807
}()
824808

825809
cc, err := dtls.Dial(
@@ -832,10 +816,13 @@ func TestClientKeepAliveMonitor(t *testing.T) {
832816
require.NoError(t, errC)
833817
}),
834818
options.WithPeriodicRunner(periodic.New(ctx.Done(), time.Millisecond*10)),
819+
options.WithReceivedMessageQueueSize(32),
820+
options.WithProcessReceivedMessageFunc(func(req *pool.Message, cc *client.Conn, handler config.HandlerFunc[*client.Conn]) {
821+
cc.ProcessReceivedMessageWithHandler(req, handler)
822+
}),
835823
)
836824
require.NoError(t, err)
837825
cc.AddOnClose(func() {
838-
t.Log("client is closed")
839826
checkClose.Release(1)
840827
})
841828

@@ -845,7 +832,7 @@ func TestClientKeepAliveMonitor(t *testing.T) {
845832
err = cc.Ping(ctxPing)
846833
require.Error(t, err)
847834

848-
err = checkClose.Acquire(ctx, 2)
835+
err = checkClose.Acquire(ctx, 1)
849836
require.NoError(t, err)
850837
require.True(t, inactivityDetected.Load())
851838
}

dtls/server/config.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ type HandlerFunc = func(*responsewriter.ResponseWriter[*udpClient.Conn], *pool.M
1919

2020
type ErrorFunc = func(error)
2121

22-
type GoPoolFunc = config.GoPoolFunc[*udpClient.Conn]
23-
2422
// OnNewConnFunc is the callback for new connections.
2523
type OnNewConnFunc = func(cc *udpClient.Conn)
2624

dtls/server/server.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,11 +214,12 @@ func (s *Server) createConn(connection *coapNet.Conn, monitor udpClient.Inactivi
214214
cfg.TransmissionMaxRetransmit = s.cfg.TransmissionMaxRetransmit
215215
cfg.Handler = s.cfg.Handler
216216
cfg.BlockwiseSZX = s.cfg.BlockwiseSZX
217-
cfg.GoPool = s.cfg.GoPool
218217
cfg.Errors = s.cfg.Errors
219218
cfg.GetMID = s.cfg.GetMID
220219
cfg.GetToken = s.cfg.GetToken
221220
cfg.MessagePool = s.cfg.MessagePool
221+
cfg.ReceivedMessageQueueSize = s.cfg.ReceivedMessageQueueSize
222+
cfg.ProcessReceivedMessage = s.cfg.ProcessReceivedMessage
222223
cc := udpClient.NewConn(
223224
session,
224225
createBlockWise,

dtls/server/session.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,6 @@ func (s *Session) Context() context.Context {
9595

9696
// SetContextValue stores the value associated with key to context of connection.
9797
func (s *Session) SetContextValue(key interface{}, val interface{}) {
98-
s.mutex.Lock()
99-
defer s.mutex.Unlock()
10098
ctx := context.WithValue(s.Context(), key, val)
10199
s.ctx.Store(&ctx)
102100
}

dtls/server_test.go

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"crypto/x509"
88
"fmt"
99
"math/big"
10+
"net"
1011
"sync"
1112
"testing"
1213
"time"
@@ -23,6 +24,7 @@ import (
2324
"github.com/plgd-dev/go-coap/v3/options/config"
2425
"github.com/plgd-dev/go-coap/v3/pkg/runner/periodic"
2526
"github.com/plgd-dev/go-coap/v3/udp/client"
27+
"github.com/plgd-dev/go-coap/v3/udp/coder"
2628
"github.com/stretchr/testify/require"
2729
"go.uber.org/atomic"
2830
"golang.org/x/sync/semaphore"
@@ -216,6 +218,10 @@ func TestServerInactiveMonitor(t *testing.T) {
216218
require.NoError(t, errC)
217219
}),
218220
options.WithPeriodicRunner(periodic.New(ctx.Done(), time.Millisecond*10)),
221+
options.WithReceivedMessageQueueSize(32),
222+
options.WithProcessReceivedMessageFunc(func(req *pool.Message, cc *client.Conn, handler config.HandlerFunc[*client.Conn]) {
223+
cc.ProcessReceivedMessageWithHandler(req, handler)
224+
}),
219225
)
220226

221227
var serverWg sync.WaitGroup
@@ -271,8 +277,8 @@ func TestServerKeepAliveMonitor(t *testing.T) {
271277
require.NoError(t, errC)
272278
}()
273279

274-
checkClose := semaphore.NewWeighted(2)
275-
err = checkClose.Acquire(ctx, 2)
280+
checkClose := semaphore.NewWeighted(1)
281+
err = checkClose.Acquire(ctx, 1)
276282
require.NoError(t, err)
277283

278284
sd := dtls.NewServer(
@@ -302,27 +308,22 @@ func TestServerKeepAliveMonitor(t *testing.T) {
302308
require.NoError(t, errS)
303309
}()
304310

305-
cc, err := dtls.Dial(
306-
ld.Addr().String(),
307-
clientCgf,
308-
options.WithGoPool(func(processReqFunc config.ProcessRequestFunc[*client.Conn], req *pool.Message, cc *client.Conn, handler config.HandlerFunc[*client.Conn]) error {
309-
time.Sleep(time.Millisecond * 500)
310-
processReqFunc(req, cc, handler)
311-
return nil
312-
}),
313-
)
311+
cc, err := piondtls.Dial("udp", ld.Addr().(*net.UDPAddr), clientCgf)
314312
require.NoError(t, err)
315-
cc.AddOnClose(func() {
316-
checkClose.Release(1)
317-
})
318313

319-
// send ping to create serverside connection
320-
ctxPing, cancel := context.WithTimeout(ctx, time.Second)
321-
defer cancel()
322-
err = cc.Ping(ctxPing)
314+
p := pool.NewMessage(ctx)
315+
p.SetCode(codes.GET)
316+
err = p.SetPath("/")
323317
require.NoError(t, err)
318+
p.SetMessageID(12345)
319+
p.SetType(message.NonConfirmable)
324320

325-
err = checkClose.Acquire(ctx, 2)
321+
data, err := p.MarshalWithEncoder(coder.DefaultCoder)
322+
require.NoError(t, err)
323+
_, err = cc.Write(data)
324+
require.NoError(t, err)
325+
326+
err = checkClose.Acquire(ctx, 1)
326327
require.NoError(t, err)
327328
require.True(t, inactivityDetected.Load())
328329
}

go.mod

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.18
44

55
require (
66
github.com/dsnet/golib/memfile v1.0.0
7+
github.com/hashicorp/go-multierror v1.1.1
78
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f
89
github.com/pion/udp v0.1.2-0.20221201030934-a2465bb5d508
910
github.com/stretchr/testify v1.8.1
@@ -15,6 +16,7 @@ require (
1516

1617
require (
1718
github.com/davecgh/go-spew v1.1.1 // indirect
19+
github.com/hashicorp/errwrap v1.0.0 // indirect
1820
github.com/pion/logging v0.2.2 // indirect
1921
github.com/pion/transport v0.14.1 // indirect
2022
github.com/pmezard/go-difflib v1.0.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
33
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
44
github.com/dsnet/golib/memfile v1.0.0 h1:J9pUspY2bDCbF9o+YGwcf3uG6MdyITfh/Fk3/CaEiFs=
55
github.com/dsnet/golib/memfile v1.0.0/go.mod h1:tXGNW9q3RwvWt1VV2qrRKlSSz0npnh12yftCSCy2T64=
6+
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
7+
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
8+
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
9+
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
610
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f h1:KBWDYx2XKp8oaAOxdjcay8YRoumElppzUAqqQKdSS1c=
711
github.com/pion/dtls/v2 v2.1.6-0.20230104045405-f40c61d83b5f/go.mod h1:VNVUbakoOWubuZf9s+NWLFTzk1BLBjs1t16mPkBtCoA=
812
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=

message/pool/message.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99

10+
multierror "github.com/hashicorp/go-multierror"
1011
"github.com/plgd-dev/go-coap/v3/message"
1112
"github.com/plgd-dev/go-coap/v3/message/codes"
1213
"go.uber.org/atomic"
@@ -560,3 +561,36 @@ func (r *Message) SetupPut(path string, token message.Token, contentFormat messa
560561
func (r *Message) SetupDelete(path string, token message.Token, opts ...message.Option) error {
561562
return r.setupCommon(codes.DELETE, path, token, opts...)
562563
}
564+
565+
func (r *Message) Clone(msg *Message) error {
566+
msg.SetCode(r.Code())
567+
msg.SetToken(r.Token())
568+
msg.ResetOptionsTo(r.Options())
569+
msg.SetType(r.Type())
570+
msg.SetMessageID(r.MessageID())
571+
572+
if r.Body() != nil {
573+
buf := bytes.NewBuffer(nil)
574+
n, err := r.Body().Seek(0, io.SeekCurrent)
575+
if err != nil {
576+
return err
577+
}
578+
_, err = io.Copy(buf, r.Body())
579+
if err != nil {
580+
var errs *multierror.Error
581+
errs = multierror.Append(errs, err)
582+
_, errS := r.Body().Seek(n, io.SeekStart)
583+
if errS != nil {
584+
errs = multierror.Append(errs, errS)
585+
}
586+
return errs.ErrorOrNil()
587+
}
588+
_, err = r.Body().Seek(n, io.SeekStart)
589+
if err != nil {
590+
return err
591+
}
592+
r := bytes.NewReader(buf.Bytes())
593+
msg.SetBody(r)
594+
}
595+
return nil
596+
}

message/pool/message_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package pool_test
22

33
import (
4+
"bytes"
45
"context"
6+
"errors"
57
"strings"
68
"testing"
79

@@ -295,3 +297,66 @@ func TestMessageETag(t *testing.T) {
295297
require.NoError(t, err)
296298
require.Equal(t, etag, value)
297299
}
300+
301+
type malFuncSeeker struct{}
302+
303+
func (m malFuncSeeker) Read(p []byte) (n int, err error) {
304+
return 0, nil
305+
}
306+
307+
func (m malFuncSeeker) Seek(offset int64, whence int) (int64, error) {
308+
return 0, errors.New("seek error")
309+
}
310+
311+
type malFuncReader struct{}
312+
313+
func (m malFuncReader) Read(p []byte) (n int, err error) {
314+
return 0, errors.New("read error")
315+
}
316+
317+
func (m malFuncReader) Seek(offset int64, whence int) (int64, error) {
318+
return 0, nil
319+
}
320+
321+
func TestMessageClone(t *testing.T) {
322+
original := pool.NewMessage(context.Background())
323+
original.SetMessageID(1)
324+
original.SetType(message.Confirmable)
325+
err := original.SetPath("/test")
326+
require.NoError(t, err)
327+
original.AddQuery("q1")
328+
original.AddQuery("q2")
329+
original.SetBody(bytes.NewReader([]byte("test body")))
330+
331+
cloned := pool.NewMessage(original.Context())
332+
err = original.Clone(cloned)
333+
require.NoError(t, err)
334+
335+
require.Equal(t, original.MessageID(), cloned.MessageID())
336+
require.Equal(t, original.Type(), cloned.Type())
337+
originalPath, err := original.Path()
338+
require.NoError(t, err)
339+
clonedPath, err := cloned.Path()
340+
require.NoError(t, err)
341+
require.Equal(t, originalPath, clonedPath)
342+
originalQueries, err := original.Queries()
343+
require.NoError(t, err)
344+
clonedQueries, err := cloned.Queries()
345+
require.NoError(t, err)
346+
require.Equal(t, originalQueries, clonedQueries)
347+
originalBody, err := original.ReadBody()
348+
require.NoError(t, err)
349+
clonedBody, err := cloned.ReadBody()
350+
require.NoError(t, err)
351+
require.Equal(t, originalBody, clonedBody)
352+
353+
original.SetBody(malFuncSeeker{})
354+
err = original.Clone(cloned)
355+
require.Error(t, err)
356+
require.Equal(t, err.Error(), "seek error")
357+
358+
original.SetBody(malFuncReader{})
359+
err = original.Clone(cloned)
360+
require.Error(t, err)
361+
require.Contains(t, err.Error(), "read error")
362+
}

0 commit comments

Comments
 (0)