Skip to content

Commit 4e780ec

Browse files
carlhoerbergclaude
andcommitted
Add MQTT 3.1 protocol support
This change adds: - Support for MQTT 3.1 protocol (MQIsdp) alongside 3.1.1 (MQTT) - Proper version detection in Connect packet - Test coverage for MQTT 3.1 protocol - Fix for protocol reading in specs 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent e63d91b commit 4e780ec

File tree

2 files changed

+69
-10
lines changed

2 files changed

+69
-10
lines changed

spec/packets_spec.cr

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ describe MQTT::Protocol::Packet do
5050
io.write_string "to long"
5151
mio.rewind
5252

53-
expect_raises(MQTT::Protocol::Error::PacketDecode, /invalid protocol length/) do
53+
expect_raises(MQTT::Protocol::Error::PacketDecode, /invalid protocol/) do
5454
MQTT::Protocol::Packet.from_io(mio)
5555
end
5656
end
@@ -171,6 +171,52 @@ describe MQTT::Protocol::Packet do
171171
connect.password.should eq password
172172
connect.will.not_nil!.topic.should eq wtopic
173173
end
174+
175+
it "supports MQTT 3.1 protocol" do
176+
# Test MQTT 3.1 protocol by directly writing a Connect packet with version 0x03
177+
# and verifying it's read back correctly
178+
mio = IO::Memory.new
179+
io = MQTT::Protocol::IO.new(mio)
180+
181+
# Write packet header
182+
io.write_byte(0b00010000u8) # Connect packet
183+
184+
# Build the packet content in a separate IO to calculate length
185+
content = IO::Memory.new
186+
content_io = MQTT::Protocol::IO.new(content)
187+
content_io.write_string("MQIsdp")
188+
content_io.write_byte(0x03u8) # MQTT 3.1 version
189+
content_io.write_byte(0b00000010u8) # Connect flags - clean session
190+
content_io.write_int(30u16) # Keepalive
191+
content_io.write_string("mqtt31-client") # Client ID
192+
193+
# Write the remaining length and content
194+
io.write_remaining_length(content.size)
195+
io.write_bytes_raw(content.to_slice)
196+
197+
# Rewind to read
198+
mio.rewind
199+
200+
# Read back and verify
201+
packet = MQTT::Protocol::Packet.from_io(io)
202+
packet.should be_a MQTT::Protocol::Connect
203+
204+
# Reset and read again to verify protocol name and version
205+
mio.rewind
206+
header_byte = mio.read_byte # Skip type/flags
207+
remaining_length = mio.read_byte # Skip remaining length
208+
209+
# Read protocol name
210+
protocol_len = mio.read_bytes(UInt16, IO::ByteFormat::NetworkEndian)
211+
protocol_name_bytes = Bytes.new(protocol_len)
212+
mio.read_fully(protocol_name_bytes)
213+
protocol_name = String.new(protocol_name_bytes)
214+
protocol_name.should eq "MQIsdp"
215+
216+
# Read protocol version
217+
protocol_version = mio.read_byte
218+
protocol_version.should eq 0x03
219+
end
174220
end
175221
end
176222

src/mqtt/protocol/packets/connect.cr

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@ module MQTT
1111
@username : String?
1212
@password : Bytes?
1313
@will : Will?
14+
@version : UInt8
1415

15-
getter client_id, keepalive, username, password, will
16+
getter client_id, keepalive, username, password, will, version
1617
getter? clean_session
1718

18-
def initialize(@client_id, @clean_session, @keepalive, @username, @password, @will)
19+
def initialize(@client_id, @clean_session, @keepalive, @username, @password, @will, @version = 0x04)
1920
# Remaining length is at least 10:
2021
# protocol name (str) + protocol version (byte) + connect flags (byte) + keep alive (int)
2122
@remaining_length = 10
@@ -41,13 +42,20 @@ module MQTT
4142
decode_assert flags.zero?, MQTT::Protocol::Error::InvalidFlags, flags
4243

4344
protocol_len = io.read_int
44-
decode_assert protocol_len == 4, "invalid protocol length: #{protocol_len}"
45-
4645
protocol = io.read_string(protocol_len)
47-
decode_assert protocol == "MQTT", "invalid protocol: #{protocol.inspect}"
46+
47+
# MQIsdp is for MQTT 3.1, MQTT is for MQTT 3.1.1
48+
if protocol == "MQTT"
49+
decode_assert protocol_len == 4, "invalid protocol length for MQTT 3.1.1: #{protocol_len}"
50+
elsif protocol == "MQIsdp"
51+
decode_assert protocol_len == 6, "invalid protocol length for MQTT 3.1: #{protocol_len}"
52+
else
53+
decode_assert false, "invalid protocol: #{protocol.inspect}"
54+
end
4855

4956
version = io.read_byte
50-
decode_assert version == 0x04, Error::UnacceptableProtocolVersion
57+
decode_assert version == 0x04 || version == 0x03, Error::UnacceptableProtocolVersion
58+
# @version will be set when we create a new instance at the end of this method
5159

5260
connect_flags = io.read_byte
5361
decode_assert connect_flags.bit(0) == 0, "reserved connect flag set"
@@ -80,7 +88,7 @@ module MQTT
8088
username = io.read_string if has_username
8189
password = io.read_bytes if has_password
8290

83-
self.new(client_id, clean_session, keepalive, username, password, will)
91+
self.new(client_id, clean_session, keepalive, username, password, will, version)
8492
end
8593

8694
def to_io(io)
@@ -101,8 +109,13 @@ module MQTT
101109
connect_flags |= 0b0000_0010u8 if clean_session?
102110
io.write_byte(TYPE << 4)
103111
io.write_remaining_length remaining_length
104-
io.write_string "MQTT"
105-
io.write_byte 4u8 # "protocol version"
112+
if version == 0x03
113+
io.write_string "MQIsdp"
114+
io.write_byte 3u8 # "protocol version"
115+
else
116+
io.write_string "MQTT"
117+
io.write_byte 4u8 # "protocol version"
118+
end
106119
io.write_byte connect_flags
107120
io.write_int keepalive
108121
io.write_string client_id if client_id

0 commit comments

Comments
 (0)