Skip to content

Commit 723f6e7

Browse files
committed
wip
1 parent 4369fe9 commit 723f6e7

File tree

6 files changed

+190
-130
lines changed

6 files changed

+190
-130
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
/libs
1+
/libs
2+
/TODO.md

README.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
# mqtt-broker-lib
2-
WIP! A Mqtt Broker for Turtles
2+
3+
_A MQTT-Broker for CCTweaked_
4+
5+
**Just started development, not usable at this state**

definitions.lua

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
---@meta
2+
13
---@class MQTT_message
24
---@field id? string | nil Will be set by the broker
35
---@field payload? any Can be anything
@@ -6,19 +8,24 @@
68
---@field target? number[] Table of targetID's. Optional. If set, only target computers will receive message (IF SUBSCRIBED to topic!)
79
---@field ttl? number Time to Live in Milliseconds. Default: 10 minutes, unless retain=true
810
---@field retain? boolean Will always send first, even if subscribed after message was send. Only one per Topic!
9-
---@field timestamp? number will be set by the broker
10-
---@field readBy? table<number,boolean> will be managed by the broker
11+
---@field timestamp? number will be set by the broker
12+
---@field readBy? table<number,boolean> will be managed by the broker
13+
14+
---@class MQTT_Topic
15+
---@field messages MQTT_message[]
16+
---@field subscribedClients MQTTBroker_ClientList
17+
---@field retainedMessage MQTT_message
1118

12-
---@class MQTT_messages
13-
---@field [string] MQTT_message string => Topic
19+
---@class MQTT_TopicList
20+
---@field topic {[string]: MQTT_Topic} string => Topic
1421

1522
---@class MQTTBroker_Client
16-
---@field id number
1723
---@field timeoutDuration number Timeout in seconds - Overwritten in function "registerLastWillMessage"
1824
---@field lastSeen number Updated in any request the Client inits
25+
---@field subscribedToTopic string[]
1926

2027
---@class MQTTBroker_ClientList
21-
---@field clients MQTTBroker_Client[]
28+
---@field clients {[number]: MQTTBroker_Client} number => ComputerID
2229

2330
---@type MQTT_message
2431
local message;

fetch-deps.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ libs=(
1212
"helperFunctions-lib"
1313
"testSuite-lib"
1414
"eventHandler-lib"
15+
"ccClass-lib"
1516
)
1617

1718
# Basic setup variables

mqttBroker.lua

Lines changed: 150 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,142 +1,177 @@
1-
---@class MQTTBroker
2-
local MQTTBroker = {}
1+
local scm = require("scm")
2+
local class = scm:load("ccClass")
33

4-
local ERRORText = {
5-
["validate_MessageWrongType"] = "Message is not of type table! It is type %s",
6-
["validate_MissingSender"] = "Sender_ID is Missing",
7-
}
4+
---@class MQTTBroker
5+
---@field clientList MQTTBroker_ClientList
6+
---@field topicList MQTT_TopicList
7+
---@field validator nil | fun(topic, message): boolean, (nil | string)
8+
---@field defaultTimeout number 5 minutes
9+
---@field private messageIndex {[number]: MQTT_message}
10+
local MQTTBroker = class(function (a)
811

9-
MQTTBroker.defaultTimeOut = 300 * 1000 -- 5 minutes
12+
a.defaultTimeOut = 300 * 1000 -- 5 minutes
1013

11-
---@type MQTTBroker_ClientList
12-
MQTTBroker.clientList = {}
14+
a.clientList = {}
1315

14-
---@type nil | fun(topic, message): boolean, (nil | string)
15-
MQTTBroker.validator = nil
16+
a.validator = nil
1617

17-
---@type MQTT_messages
18-
MQTTBroker.messages = {}
18+
a.messageList = {}
19+
a.topicList = {}
1920

20-
---@private
21-
MQTTBroker.messageIndex = {}
22-
23-
---@param id any
24-
---@return MQTT_message | nil
25-
function MQTTBroker:getMessageByID(id)
26-
return self.messageIndex[id]
27-
end
21+
a.messageIndex = {}
2822

23+
end) --[[@as MQTTBroker]]
2924

25+
local ERRORText = {
26+
["validate_MessageWrongType"] = "Message is not of type table! It is type %s",
27+
["validate_MissingSender"] = "Sender_ID is Missing",
28+
}
3029

31-
---@param topic string
32-
---@param message MQTT_message
33-
---@return boolean
34-
---@return string | nil
35-
function MQTTBroker:validatePayload(topic, message)
36-
if type(message) ~= "table" then
37-
return false, string.format(ERRORText["validate_MessageWrongType"], type(message))
38-
end
39-
if not message.sender or type(message.sender) ~= "number" then
40-
return false, ERRORText["validate_MissingSender"]
41-
end
42-
if not MQTTBroker.validator then return true end
43-
return MQTTBroker.validator(topic, message.payload)
44-
end
45-
46-
---@param topic string
47-
---@param message MQTT_message
48-
---@return boolean success
49-
---@return string | nil idOrErrorReason
50-
function MQTTBroker:publish(topic, message)
51-
local ok, err = self:validatePayload(topic, message)
52-
if not ok then
53-
return false, err
54-
end
55-
56-
57-
58-
local suffix = 0
59-
local isDublicate = true
60-
local currentTime = string.format(math.floor(os.time()))
61-
while isDublicate do
62-
if self:getMessageByID(currentTime .. suffix) then
63-
suffix = suffix + 1
64-
isDublicate = true
65-
else
66-
isDublicate = false
67-
end
68-
end
69-
message.id = currentTime .. suffix
30+
---@private
31+
---@return MQTT_Topic
32+
function MQTTBroker:createEmptyTopic()
33+
return {messages = {}, subscribedClients = {}, retainedMessage = nil}
34+
end
7035

71-
if not message.retain and not message.ttl then
72-
message.ttl = self.defaultTimeOut
73-
end
36+
---@param id any
37+
---@return MQTT_message | nil
38+
function MQTTBroker:getMessageByID(id)
39+
return self.messageIndex[id]
40+
end
7441

75-
message.readBy = {}
7642

77-
if self.messages[topic] == nil then
78-
self.messages[topic] = {}
79-
end
8043

81-
table.insert(self.messages[topic], message)
82-
self.messageIndex[message.id] = message
83-
return true, message.id
44+
---@param topic string
45+
---@param message MQTT_message
46+
---@return boolean
47+
---@return string | nil
48+
function MQTTBroker:validateMessage(topic, message)
49+
if type(message) ~= "table" then
50+
return false, string.format(ERRORText["validate_MessageWrongType"], type(message))
8451
end
85-
86-
---@param clientID number
87-
---@param topic string
88-
---@return boolean success
89-
function MQTTBroker:subscribe(clientID, topic)
90-
return true
52+
if not message.sender or type(message.sender) ~= "number" then
53+
return false, ERRORText["validate_MissingSender"]
9154
end
92-
93-
---@param clientID number
94-
---@param topic string
95-
---@return boolean success
96-
function MQTTBroker:unsubscribe(clientID, topic)
97-
return true
55+
if not MQTTBroker.validator then return true end
56+
return MQTTBroker.validator(topic, message.payload)
57+
end
58+
59+
---@param topic string
60+
---@param message MQTT_message
61+
---@return boolean success
62+
---@return string | nil idOrErrorReason
63+
function MQTTBroker:publish(topic, message)
64+
65+
local ok, err = self:validateMessage(topic, message)
66+
if not ok then
67+
return false, err
9868
end
9969

100-
---@param clientID number
101-
---@return MQTT_messages
102-
function MQTTBroker:getMessagesForClient(clientID)
103-
return {}
70+
-- add suffix, as more then one message "could"'ve come in per timestamp
71+
local suffix = 0
72+
local isDublicate = true
73+
local currentTime = string.format(math.floor(os.time()))
74+
while isDublicate do
75+
if self:getMessageByID(currentTime .. suffix) then
76+
suffix = suffix + 1
77+
isDublicate = true
78+
else
79+
isDublicate = false
80+
end
10481
end
82+
message.id = currentTime .. suffix
10583

106-
107-
---@param clientID number
108-
---@param topic string
109-
---@return boolean success
110-
function MQTTBroker:markAsRead(clientID, topic)
111-
return true
84+
-- if retain is set, no ttl will be set (from the broker anyway)
85+
if not message.retain and not message.ttl then
86+
message.ttl = self.defaultTimeout
11287
end
11388

114-
---@return number deletedMessages
115-
function MQTTBroker:expireMessages()
116-
return 0
89+
-- adding message to the message- and indexList
90+
if self.topicList[topic] == nil then
91+
self.topicList[topic] = self:createEmptyTopic()
11792
end
118-
119-
---@param topic string
120-
---@return boolean
121-
function MQTTBroker:pruneTopic(topic)
122-
return true
93+
94+
if message.retain then
95+
self.topicList[topic].retainedMessage = message
96+
else
97+
table.insert(self.topicList[topic].messages, message)
12398
end
124-
125-
---Register the last will of a client (In case of a timeout)
126-
---@param clientID number
127-
---@param topic string
128-
---@param message MQTT_message
129-
---@param timeoutDuration number in seconds
130-
---@return boolean
131-
function MQTTBroker:registerLastWillMessage(clientID, topic, message, timeoutDuration)
132-
return true
99+
self.messageIndex[message.id] = message
100+
message.readBy = {}
101+
return true, message.id
102+
103+
end
104+
105+
---@param clientID number
106+
---@param topic string
107+
---@return boolean success
108+
function MQTTBroker:subscribe(clientID, topic)
109+
if self.clientList[clientID] == nil then
110+
self.clientList[clientID] = {
111+
timeoutDuration = self.defaultTimeout,
112+
subscribedToTopic = {}
113+
}
133114
end
134-
135-
---comment
136-
---@return integer
137-
function MQTTBroker:checkClientTimeouts()
138-
return 0
115+
local client = self.clientList[clientID]
116+
---@cast client MQTTBroker_Client
117+
client.lastSeen = os.time()
118+
119+
table.insert(client.subscribedToTopic, topic)
120+
if self.topicList[topic] == nil then
121+
self.topicList[topic] = self:createEmptyTopic()
139122
end
140-
123+
table.insert(self.topicList[topic].subscribedClients, clientID)
124+
125+
return true
126+
127+
end
128+
129+
---@param clientID number
130+
---@param topic string
131+
---@return boolean success
132+
function MQTTBroker:unsubscribe(clientID, topic)
133+
return true
134+
end
135+
136+
---@param clientID number
137+
---@return MQTT_message[]
138+
function MQTTBroker:getMessagesForClient(clientID)
139+
return {}
140+
end
141+
142+
143+
---@param clientID number
144+
---@param topic string
145+
---@return boolean success
146+
function MQTTBroker:markAsRead(clientID, topic)
147+
return true
148+
end
149+
150+
---@return number deletedMessages
151+
function MQTTBroker:expireMessages()
152+
return 0
153+
end
154+
155+
---@param topic string
156+
---@return boolean
157+
function MQTTBroker:pruneTopic(topic)
158+
return true
159+
end
160+
161+
---Register the last will of a client (In case of a timeout)
162+
---@param clientID number
163+
---@param topic string
164+
---@param message MQTT_message
165+
---@param timeoutDuration number in seconds
166+
---@return boolean
167+
function MQTTBroker:registerLastWillMessage(clientID, topic, message, timeoutDuration)
168+
return true
169+
end
170+
171+
---comment
172+
---@return integer
173+
function MQTTBroker:checkClientTimeouts()
174+
return 0
175+
end
141176

142177
return MQTTBroker

tests/MQTTBroker_spec.lua

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,36 @@ package.path = package.path .. ";"
3333
.."libs/peripherals/?.lua;"
3434

3535
local MQTTBroker = require("mqttBroker")
36+
---@type MQTTBroker
37+
local broker
3638

3739
describe("Basic Tests", function()
40+
41+
before_each(function()
42+
broker = MQTTBroker()
43+
end)
3844
it("loaded", function()
39-
assert.is.truthy(MQTTBroker)
45+
assert.is.truthy(broker)
4046
end)
4147
describe("Publish", function()
4248
it("SenderID", function ()
43-
assert.True(MQTTBroker:publish("test", {sender = 2}))
44-
assert.False(MQTTBroker:publish("test", {}))
49+
assert.True(broker:publish("test", {sender = 2}))
50+
assert.False(broker:publish("test", {}))
4551
end)
4652
it("Publish", function()
47-
local ok, id = MQTTBroker:publish("test", {sender = 2, payload = "test payload"})
48-
local ok2, id2 = MQTTBroker:publish("test", {sender = 2, payload = "test payload 2"})
53+
local ok, id = broker:publish("test", {sender = 2, payload = "test payload"})
54+
local ok2, id2 = broker:publish("test", {sender = 2, payload = "test payload 2"})
4955
assert.True(ok)
5056
assert.True(ok2)
51-
assert.are.same(MQTTBroker:getMessageByID(id).payload, "test payload")
52-
assert.are.same(MQTTBroker:getMessageByID(id2).payload, "test payload 2")
57+
assert.are.same(broker:getMessageByID(id).payload, "test payload")
58+
assert.are.same(broker:getMessageByID(id2).payload, "test payload 2")
59+
end)
60+
end)
61+
describe("Subscribe", function()
62+
it("Registered", function()
63+
broker:subscribe(1, "test")
64+
assert.is.truthy(broker.clientList[1])
65+
5366
end)
5467
end)
5568
end)

0 commit comments

Comments
 (0)