Skip to content

Commit e36f20c

Browse files
export outQueueSize in client/server configs (#248)
* export outQueueSize in client/server configs * add default queue size in peers * make outque constants for client/router --------- Co-authored-by: Muzzammil Shahid <muzzammil@xconn.io>
1 parent db46cc2 commit e36f20c

File tree

9 files changed

+108
-41
lines changed

9 files changed

+108
-41
lines changed

acceptor.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,12 @@ func UpgradeWebSocket(conn net.Conn, config *WebSocketServerConfig) (Peer, error
195195
Server: true,
196196
KeepAliveInterval: config.KeepAliveInterval,
197197
KeepAliveTimeout: config.KeepAliveTimeout,
198+
OutQueueSize: config.OutQueueSize,
198199
}
200+
if peerConfig.OutQueueSize == 0 {
201+
peerConfig.OutQueueSize = RouterOutQueueSizeDefault
202+
}
203+
199204
peer, err := NewWebSocketPeer(conn, peerConfig)
200205
if err != nil {
201206
return nil, fmt.Errorf("failed to init reader/writer: %w", err)
@@ -246,8 +251,8 @@ func (r *RawSocketAcceptor) Spec(serializerID SerializerID) (serializers.Seriali
246251
return serializer, nil
247252
}
248253

249-
func (r *RawSocketAcceptor) Accept(conn net.Conn) (BaseSession, error) {
250-
peer, serializerID, err := UpgradeRawSocket(conn)
254+
func (r *RawSocketAcceptor) Accept(conn net.Conn, config *RawSocketServerConfig) (BaseSession, error) {
255+
peer, serializerID, err := UpgradeRawSocket(conn, config)
251256
if err != nil {
252257
return nil, fmt.Errorf("failed to init reader/writer: %w", err)
253258
}
@@ -265,7 +270,7 @@ func (r *RawSocketAcceptor) Accept(conn net.Conn) (BaseSession, error) {
265270
return Accept(peer, hello, serializer, r.Authenticator)
266271
}
267272

268-
func UpgradeRawSocket(conn net.Conn) (Peer, transports.Serializer, error) {
273+
func UpgradeRawSocket(conn net.Conn, config *RawSocketServerConfig) (Peer, transports.Serializer, error) {
269274
maxMessageSize := transports.ProtocolMaxMsgSize
270275

271276
handshakeRequestRaw := make([]byte, 4)
@@ -290,8 +295,17 @@ func UpgradeRawSocket(conn net.Conn) (Peer, transports.Serializer, error) {
290295
return nil, 0, err
291296
}
292297

298+
if config == nil {
299+
config = DefaultRawSocketServerConfig()
300+
}
301+
293302
peerConfig := RawSocketPeerConfig{
294-
Serializer: handshakeResponse.Serializer(),
303+
Serializer: handshakeResponse.Serializer(),
304+
OutQueueSize: config.OutQueueSize,
305+
}
306+
307+
if peerConfig.OutQueueSize == 0 {
308+
peerConfig.OutQueueSize = RouterOutQueueSizeDefault
295309
}
296310

297311
return NewRawSocketPeer(conn, peerConfig), handshakeRequest.Serializer(), nil

client.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type Client struct {
2424
DialTimeout time.Duration
2525
KeepAliveInterval time.Duration
2626
KeepAliveTimeout time.Duration
27+
OutQueueSize int
2728
}
2829

2930
func (c *Client) Connect(ctx context.Context, uri string, realm string) (*Session, error) {
@@ -39,6 +40,7 @@ func (c *Client) Connect(ctx context.Context, uri string, realm string) (*Sessio
3940
NetDial: c.NetDial,
4041
KeepAliveInterval: c.KeepAliveInterval,
4142
KeepAliveTimeout: c.KeepAliveTimeout,
43+
OutQueueSize: c.OutQueueSize,
4244
}
4345

4446
joiner := &WebSocketJoiner{
@@ -64,6 +66,7 @@ func (c *Client) Connect(ctx context.Context, uri string, realm string) (*Sessio
6466
NetDial: c.NetDial,
6567
KeepAliveInterval: c.KeepAliveInterval,
6668
KeepAliveTimeout: c.KeepAliveTimeout,
69+
OutQueueSize: c.OutQueueSize,
6770
}
6871

6972
joiner := &RawSocketJoiner{
@@ -116,13 +119,13 @@ func ConnectCryptosign(ctx context.Context, uri, realm, authid, privateKey strin
116119
}
117120

118121
func ConnectInMemoryBase(router *Router, realm, authID, authRole string,
119-
serializer serializers.Serializer) (BaseSession, error) {
122+
serializer serializers.Serializer, outQueueSize int) (BaseSession, error) {
120123

121124
if serializer == nil {
122125
return nil, fmt.Errorf("serializer must not be nil")
123126
}
124127

125-
clientPeer, routerPeer := NewInMemoryPeerPair()
128+
clientPeer, routerPeer := NewInMemoryPeerPair(outQueueSize)
126129
sessionID := wampproto.GenerateID()
127130
routerSession := NewBaseSession(
128131
sessionID,
@@ -168,8 +171,9 @@ func ConnectInMemoryBase(router *Router, realm, authID, authRole string,
168171
func ConnectInMemory(router *Router, realm string) (*Session, error) {
169172
authID := fmt.Sprintf("%012x", rand.Uint64())[:12] // #nosec
170173
authRole := "trusted"
174+
outQueueSize := 16
171175

172-
base, err := ConnectInMemoryBase(router, realm, authID, authRole, &serializers.MsgPackSerializer{})
176+
base, err := ConnectInMemoryBase(router, realm, authID, authRole, &serializers.MsgPackSerializer{}, outQueueSize)
173177
if err != nil {
174178
return nil, err
175179
}

client_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func connectInMemory(t *testing.T, router *xconn.Router, realm string,
3737
authID := fmt.Sprintf("%012x", rand.Uint64())[:12]
3838
authRole := "trusted"
3939

40-
base, err := xconn.ConnectInMemoryBase(router, realm, authID, authRole, serializer)
40+
base, err := xconn.ConnectInMemoryBase(router, realm, authID, authRole, serializer, 0)
4141
require.NoError(t, err)
4242

4343
return xconn.NewSession(base, base.Serializer())

joiner.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ func DialWebSocket(ctx context.Context, url *netURL.URL, config *WSDialerConfig)
9292
Server: false,
9393
KeepAliveInterval: config.KeepAliveInterval,
9494
KeepAliveTimeout: config.KeepAliveTimeout,
95+
OutQueueSize: config.OutQueueSize,
9596
}
9697
wsPeer, err := NewWebSocketPeer(conn, peerConfig)
9798
if err != nil {
@@ -156,7 +157,10 @@ func DialRawSocket(ctx context.Context, url *netURL.URL, config *RawSocketDialer
156157
return nil, err
157158
}
158159

159-
return NewRawSocketPeer(conn, RawSocketPeerConfig{Serializer: config.Serializer}), nil
160+
return NewRawSocketPeer(conn, RawSocketPeerConfig{
161+
Serializer: config.Serializer,
162+
OutQueueSize: config.OutQueueSize,
163+
}), nil
160164
}
161165

162166
func Join(cl Peer, realm string, serializer serializers.Serializer,

peer.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ import (
1818
"github.com/xconnio/wampproto-go/transports"
1919
)
2020

21+
const (
22+
ClientOutQueueSizeDefault = 16
23+
RouterOutQueueSizeDefault = 64
24+
)
25+
2126
func NewBaseSession(id uint64, realm, authID, authRole string, cl Peer, serializer serializers.Serializer) BaseSession {
2227
return &baseSession{
2328
id: id,
@@ -194,14 +199,18 @@ func NewWebSocketPeer(conn net.Conn, peerConfig WSPeerConfig) (Peer, error) {
194199
msgOpCode = ws.OpText
195200
}
196201

202+
if peerConfig.OutQueueSize == 0 {
203+
peerConfig.OutQueueSize = ClientOutQueueSizeDefault
204+
}
205+
197206
peer := &WebSocketPeer{
198207
transportType: TransportWebSocket,
199208
protocol: peerConfig.Protocol,
200209
conn: conn,
201210
pingCh: make(chan struct{}, 1),
202211
wsMsgOp: msgOpCode,
203212
server: peerConfig.Server,
204-
writeChan: make(chan []byte, 64),
213+
writeChan: make(chan []byte, peerConfig.OutQueueSize),
205214
ctrlChan: make(chan wsMsg, 2),
206215
doneWriting: make(chan struct{}),
207216
closeChan: make(chan struct{}),
@@ -435,11 +444,15 @@ type RawSocketPeer struct {
435444
}
436445

437446
func NewRawSocketPeer(conn net.Conn, peerConfig RawSocketPeerConfig) Peer {
447+
if peerConfig.OutQueueSize == 0 {
448+
peerConfig.OutQueueSize = ClientOutQueueSizeDefault
449+
}
450+
438451
peer := &RawSocketPeer{
439452
transportType: TransportRawSocket,
440453
conn: conn,
441454
serializer: peerConfig.Serializer,
442-
writeChan: make(chan []byte, 64),
455+
writeChan: make(chan []byte, peerConfig.OutQueueSize),
443456
ctrlChan: make(chan rsMsg, 2),
444457
doneWriting: make(chan struct{}),
445458
}
@@ -731,26 +744,26 @@ func (l *localPeer) Close() error {
731744
return l.conn.Close()
732745
}
733746

734-
func newLocalPeer(conn, otherSide net.Conn, bufferSize int) *localPeer {
735-
if bufferSize <= 0 {
736-
bufferSize = 16 // default fallback
747+
func newLocalPeer(conn, otherSide net.Conn, outQueueSize int) *localPeer {
748+
if outQueueSize == 0 {
749+
outQueueSize = ClientOutQueueSizeDefault
737750
}
738751

739752
p := &localPeer{
740753
conn: conn,
741754
other: otherSide,
742-
writeChan: make(chan []byte, bufferSize),
755+
writeChan: make(chan []byte, outQueueSize),
743756
doneWriting: make(chan struct{}),
744757
}
745758

746759
go p.writer()
747760
return p
748761
}
749762

750-
func NewInMemoryPeerPair() (Peer, Peer) {
763+
func NewInMemoryPeerPair(outQueueSize int) (Peer, Peer) {
751764
conn1, conn2 := net.Pipe()
752-
peer1 := newLocalPeer(conn1, conn2, 16)
753-
peer2 := newLocalPeer(conn2, conn1, 16)
765+
peer1 := newLocalPeer(conn1, conn2, outQueueSize)
766+
peer2 := newLocalPeer(conn2, conn1, outQueueSize)
754767

755768
return peer1, peer2
756769
}

peer_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
)
1111

1212
func TestInMemoryPeer(t *testing.T) {
13-
client, server := xconn.NewInMemoryPeerPair()
13+
client, server := xconn.NewInMemoryPeerPair(0)
1414

1515
clientChan := make(chan []byte)
1616
clientCloseChan := make(chan struct{})

router_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,11 @@ func TestRouterMetaKillByAuthID(t *testing.T) {
7474
require.NoError(t, err)
7575

7676
// Create sessions with authid "test"
77-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "trusted", &serializers.JSONSerializer{})
77+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "trusted", &serializers.JSONSerializer{}, 0)
7878
require.NoError(t, err)
7979
session1 := xconn.NewSession(baseSession, baseSession.Serializer())
8080

81-
baseSession1, err := xconn.ConnectInMemoryBase(router, realmName, "test", "trusted", &serializers.JSONSerializer{})
81+
baseSession1, err := xconn.ConnectInMemoryBase(router, realmName, "test", "trusted", &serializers.JSONSerializer{}, 0)
8282
require.NoError(t, err)
8383
session2 := xconn.NewSession(baseSession1, baseSession.Serializer())
8484

@@ -116,11 +116,11 @@ func TestRouterMetaKillByAuthRole(t *testing.T) {
116116
require.NoError(t, err)
117117

118118
// Create sessions with authrole "test"
119-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "test", &serializers.JSONSerializer{})
119+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "test", &serializers.JSONSerializer{}, 0)
120120
require.NoError(t, err)
121121
session1 := xconn.NewSession(baseSession, baseSession.Serializer())
122122

123-
baseSession1, err := xconn.ConnectInMemoryBase(router, realmName, "test", "test", &serializers.JSONSerializer{})
123+
baseSession1, err := xconn.ConnectInMemoryBase(router, realmName, "test", "test", &serializers.JSONSerializer{}, 0)
124124
require.NoError(t, err)
125125
session2 := xconn.NewSession(baseSession1, baseSession.Serializer())
126126

@@ -198,7 +198,7 @@ func TestRouterMetaSessionCount(t *testing.T) {
198198
})
199199

200200
// Connect second session with role=admin
201-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "admin", &serializers.JSONSerializer{})
201+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "admin", &serializers.JSONSerializer{}, 0)
202202
require.NoError(t, err)
203203
session2 := xconn.NewSession(baseSession, baseSession.Serializer())
204204

@@ -241,7 +241,7 @@ func TestRouterMetaSessionList(t *testing.T) {
241241
})
242242

243243
// Connect second session with role=admin
244-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "admin", &serializers.JSONSerializer{})
244+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, "test", "admin", &serializers.JSONSerializer{}, 0)
245245
require.NoError(t, err)
246246
session2 := xconn.NewSession(baseSession, baseSession.Serializer())
247247

@@ -305,7 +305,7 @@ func TestAuthorization(t *testing.T) {
305305

306306
createSession := func(role string) *xconn.Session {
307307
authID := fmt.Sprintf("%012x", rand.Uint64())[:12] // #nosec
308-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, authID, role, &serializers.JSONSerializer{})
308+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, authID, role, &serializers.JSONSerializer{}, 0)
309309
require.NoError(t, err)
310310
return xconn.NewSession(baseSession, baseSession.Serializer())
311311
}
@@ -487,7 +487,7 @@ func TestCustomAuthorizer(t *testing.T) {
487487

488488
createSession := func(role string) *xconn.Session {
489489
authID := fmt.Sprintf("%012x", rand.Uint64())[:12] // #nosec
490-
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, authID, role, &serializers.JSONSerializer{})
490+
baseSession, err := xconn.ConnectInMemoryBase(router, realmName, authID, role, &serializers.JSONSerializer{}, 0)
491491
require.NoError(t, err)
492492
return xconn.NewSession(baseSession, baseSession.Serializer())
493493
}
@@ -663,7 +663,7 @@ func TestBlockedLocalClient(t *testing.T) {
663663

664664
// subscribe to topic 'io.xconn.test'
665665
authID := fmt.Sprintf("%012x", rand.Uint64())[:12] // #nosec
666-
baseSession, err := xconn.ConnectInMemoryBase(router, "realm1", authID, "trusted", &serializers.MsgPackSerializer{})
666+
baseSession, err := xconn.ConnectInMemoryBase(router, "realm1", authID, "trusted", &serializers.MsgPackSerializer{}, 0)
667667
require.NoError(t, err)
668668

669669
// publisher client

server.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ const (
2020
ListenerWebSocket ListenerType = iota
2121
ListenerRawSocket
2222
ListenerUniversalTCP
23+
24+
ServerOutQueueSizeDefault = 8
2325
)
2426

2527
type Network string
@@ -36,6 +38,7 @@ type Server struct {
3638
throttle *Throttle
3739
keepAliveInterval time.Duration
3840
keepAliveTimeout time.Duration
41+
outQueueSize int
3942
}
4043

4144
func NewServer(router *Router, authenticator auth.ServerAuthenticator, config *ServerConfig) *Server {
@@ -57,6 +60,7 @@ func NewServer(router *Router, authenticator auth.ServerAuthenticator, config *S
5760
server.throttle = config.Throttle
5861
server.keepAliveInterval = config.KeepAliveInterval
5962
server.keepAliveTimeout = config.KeepAliveTimeout
63+
server.outQueueSize = config.OutQueueSize
6064
}
6165

6266
return server
@@ -136,14 +140,19 @@ func (s *Server) HandleClient(conn net.Conn, listener ListenerType) {
136140
}
137141

138142
if magicArray[0] == transports.MAGIC {
139-
base, err = s.rsAcceptor.Accept(wrapped)
143+
config := DefaultRawSocketServerConfig()
144+
config.KeepAliveInterval = s.keepAliveInterval
145+
config.KeepAliveTimeout = s.keepAliveTimeout
146+
config.OutQueueSize = s.outQueueSize
147+
base, err = s.rsAcceptor.Accept(wrapped, config)
140148
if err != nil {
141149
return
142150
}
143151
} else {
144152
config := DefaultWebSocketServerConfig()
145153
config.KeepAliveInterval = s.keepAliveInterval
146154
config.KeepAliveTimeout = s.keepAliveTimeout
155+
config.OutQueueSize = s.outQueueSize
147156
base, err = s.wsAcceptor.Accept(wrapped, s.router, config)
148157
if err != nil {
149158
return
@@ -153,13 +162,18 @@ func (s *Server) HandleClient(conn net.Conn, listener ListenerType) {
153162
config := DefaultWebSocketServerConfig()
154163
config.KeepAliveInterval = s.keepAliveInterval
155164
config.KeepAliveTimeout = s.keepAliveTimeout
165+
config.OutQueueSize = s.outQueueSize
156166
base, err = s.wsAcceptor.Accept(conn, s.router, config)
157167
if err != nil {
158168
log.Debugf("failed to accept websocket connection: %v", err)
159169
return
160170
}
161171
case ListenerRawSocket:
162-
base, err = s.rsAcceptor.Accept(conn)
172+
config := DefaultRawSocketServerConfig()
173+
config.KeepAliveInterval = s.keepAliveInterval
174+
config.KeepAliveTimeout = s.keepAliveTimeout
175+
config.OutQueueSize = s.outQueueSize
176+
base, err = s.rsAcceptor.Accept(conn, config)
163177
if err != nil {
164178
log.Debugf("failed to accept rawsocket connection: %v", err)
165179
return

0 commit comments

Comments
 (0)