Skip to content

Commit c373ab4

Browse files
committed
appliance of session-present flag (issue #2)
1 parent c5b4876 commit c373ab4

File tree

1 file changed

+53
-18
lines changed

1 file changed

+53
-18
lines changed

client.go

Lines changed: 53 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"io"
1111
"net"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415
)
1516

@@ -233,6 +234,11 @@ func (c *Config) newCONNREQ(clientID []byte) []byte {
233234
type Client struct {
234235
Config // read-only
235236

237+
// InNewSession is flagged when a (re)connect confirms a CleanSession
238+
// request. InNewSession is also flagged when a (re)connect states that
239+
// the server has no stored session state for the client identifier.
240+
InNewSession atomic.Bool
241+
236242
persistence Persistence // tracks the session
237243

238244
// Signal channels are closed once their respective state occurs.
@@ -890,12 +896,10 @@ func (c *Client) connect() error {
890896
}
891897

892898
func (c *Client) dialAndConnect(config *Config) (net.Conn, *bufio.Reader, error) {
893-
// connect request packet
894899
clientID, err := c.persistence.Load(clientIDKey)
895900
if err != nil {
896901
return nil, nil, err
897902
}
898-
packet := config.newCONNREQ(clientID)
899903

900904
// network connection
901905
ctx := c.ctx
@@ -931,7 +935,7 @@ func (c *Client) dialAndConnect(config *Config) (net.Conn, *bufio.Reader, error)
931935
}
932936
}()
933937

934-
bufr, err := c.handshake(conn, packet)
938+
bufr, err := c.handshake(conn, config, clientID)
935939
// ⚠️ delayed error check
936940

937941
done <- struct{}{}
@@ -978,15 +982,16 @@ func (c *Client) resend(conn net.Conn, seqNoOffset uint, seq *seq, space uint) e
978982
return nil
979983
}
980984

981-
func (c *Client) handshake(conn net.Conn, requestPacket []byte) (*bufio.Reader, error) {
982-
err := write(conn, requestPacket, c.PauseTimeout)
985+
func (c *Client) handshake(conn net.Conn, config *Config, clientID []byte) (*bufio.Reader, error) {
986+
// send request
987+
err := write(conn, config.newCONNREQ(clientID), c.PauseTimeout)
983988
if err != nil {
984989
return nil, err
985990
}
986991

987992
r := bufio.NewReaderSize(conn, readBufSize)
988993

989-
// Apply the deadline to the "entire" 4-byte response.
994+
// Apply the timeout to the "entire" 4-byte response.
990995
if c.PauseTimeout != 0 {
991996
err := conn.SetReadDeadline(time.Now().Add(c.PauseTimeout))
992997
if err != nil {
@@ -999,21 +1004,51 @@ func (c *Client) handshake(conn net.Conn, requestPacket []byte) (*bufio.Reader,
9991004
// CONNACK Packet.”
10001005
// — MQTT Version 3.1.1, conformance statement MQTT-3.2.0-1
10011006
packet, err := r.Peek(4)
1002-
switch {
1003-
case len(packet) > 1 && (packet[0] != typeCONNACK<<4 || packet[1] != 2):
1007+
// A smaller packet may cause timeout errors. 😉
1008+
if len(packet) > 1 && (packet[0] != typeCONNACK<<4 || packet[1] != 2) {
10041009
return nil, fmt.Errorf("%w: want fixed CONNACK header 0x2002, got %#x", errProtoReset, packet)
1005-
case len(packet) > 3 && connectReturn(packet[3]) != accepted:
1006-
return nil, connectReturn(packet[3])
1007-
case err == nil:
1008-
r.Discard(len(packet)) // no errors guaranteed
1009-
return r, nil
1010-
case errors.Is(err, io.EOF): // doesn't match io.ErrUnexpectedEOF
1011-
err = errBrokerTerm
10121010
}
1013-
if len(packet) != 4 {
1014-
err = fmt.Errorf("%w; CONNECT not confirmed", err)
1011+
if err != nil {
1012+
if errors.Is(err, io.EOF) {
1013+
err = errBrokerTerm
1014+
}
1015+
return nil, fmt.Errorf("%w; CONNECT not confirmed", err)
10151016
}
1016-
return nil, err
1017+
1018+
// Codes other than accepted indicate rejection.
1019+
if r := connectReturn(packet[3]); r != accepted {
1020+
return nil, r
1021+
}
1022+
1023+
// CONNACK flags
1024+
switch flags := packet[2]; flags {
1025+
// “Bits 7-1 are reserved and MUST be set to 0.”
1026+
default:
1027+
return nil, fmt.Errorf("%w: CONNACK with reserved flags %b",
1028+
errProtoReset, flags)
1029+
1030+
// no "session present"
1031+
case 0:
1032+
// “If the Server does not have stored Session state, it MUST
1033+
// set Session Present to 0 in the CONNACK packet.”
1034+
// — MQTT Version 3.1.1, conformance statement MQTT-3.2.2-3
1035+
c.InNewSession.Store(true)
1036+
1037+
// "session present"
1038+
case 1:
1039+
// “If the Server accepts a connection with CleanSession set to
1040+
// 1, the Server MUST set Session Present to 0 in the CONNACK …”
1041+
// — MQTT Version 3.1.1, conformance statement MQTT-3.2.2-1
1042+
if config.CleanSession {
1043+
return nil, fmt.Errorf("%w: CONNACK with session-present for clean-session request",
1044+
errProtoReset)
1045+
}
1046+
1047+
// don't clear InNewSession (on reconnects)
1048+
}
1049+
1050+
r.Discard(len(packet)) // no errors guaranteed
1051+
return r, nil
10171052
}
10181053

10191054
// ReadSlices should be invoked consecutively from a single goroutine until

0 commit comments

Comments
 (0)