Skip to content

Commit 99fd112

Browse files
Making MqttClient inherit from MqttClientInterface
1 parent f4001e7 commit 99fd112

File tree

2 files changed

+117
-44
lines changed

2 files changed

+117
-44
lines changed

src/MqttClient.cpp

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,14 +151,25 @@ int MqttClient::messageDup() const
151151
return -1;
152152
}
153153

154-
int MqttClient::messageQoS() const
154+
uint16_t MqttClient::messageId() const
155155
{
156156
if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) {
157157
// message received and ready for reading
158-
return _rxMessageQoS;
158+
return _rxPacketId;
159159
}
160160

161-
return -1;
161+
return 0;
162+
}
163+
164+
165+
MqttQos MqttClient::messageQoS() const
166+
{
167+
if (_rxState == MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD) {
168+
// message received and ready for reading
169+
return static_cast<MqttQos>(_rxMessageQoS);
170+
}
171+
172+
return QosDefault;
162173
}
163174

164175
int MqttClient::messageRetain() const
@@ -171,6 +182,32 @@ int MqttClient::messageRetain() const
171182
return -1;
172183
}
173184

185+
void MqttClient::setClient(arduino::Client* client) {
186+
if(_client != nullptr && _client->connected()) {
187+
// TODO if the current client is connected we cannot perform the change, first call disconnect
188+
return;
189+
}
190+
191+
_client = client;
192+
}
193+
194+
void MqttClient::setReceiveCallback(MqttReceiveCallback cbk) {
195+
_cbk = cbk;
196+
}
197+
198+
error_t MqttClient::publish(Topic t, uint8_t payload[], size_t size, MqttQos qos, MqttPublishFlag flags) {
199+
int error = this->beginMessage(t, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
200+
201+
if(error == 0) { // TODO replace this with a proper enum value
202+
return error;
203+
}
204+
205+
int res = this->write(payload, size);
206+
this->endMessage();
207+
208+
return res;
209+
}
210+
174211
int MqttClient::beginMessage(const char* topic, unsigned long size, bool retain, uint8_t qos, bool dup)
175212
{
176213
_txMessageTopic = topic;
@@ -259,6 +296,20 @@ int MqttClient::endMessage()
259296
return 1;
260297
}
261298

299+
void MqttClient::setWill(
300+
Topic willTopic, const uint8_t* will_message, size_t will_size, MqttQos qos, MqttPublishFlag flags) {
301+
int error = this->beginWill(willTopic, (flags & RetainEnabled) == RetainEnabled, qos, (flags & DupEnabled) == DupEnabled);
302+
303+
if(error == 0) { // TODO replace this with a proper enum value
304+
return;
305+
}
306+
307+
int res = this->write(will_message, will_size);
308+
this->endWill();
309+
310+
return;
311+
}
312+
262313
int MqttClient::beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos)
263314
{
264315
int topicLength = strlen(topic);
@@ -314,7 +365,7 @@ int MqttClient::endWill()
314365
return 1;
315366
}
316367

317-
int MqttClient::subscribe(const char* topic, uint8_t qos)
368+
error_t MqttClient::subscribe(Topic topic, MqttQos qos)
318369
{
319370
int topicLength = strlen(topic);
320371
int remainingLength = topicLength + 5;
@@ -362,12 +413,12 @@ int MqttClient::subscribe(const char* topic, uint8_t qos)
362413
return 0;
363414
}
364415

365-
int MqttClient::subscribe(const String& topic, uint8_t qos)
416+
error_t MqttClient::subscribe(const String& topic, MqttQos qos)
366417
{
367418
return subscribe(topic.c_str(), qos);
368419
}
369420

370-
int MqttClient::unsubscribe(const char* topic)
421+
error_t MqttClient::unsubscribe(Topic topic)
371422
{
372423
int topicLength = strlen(topic);
373424
int remainingLength = topicLength + 4;
@@ -565,16 +616,18 @@ void MqttClient::poll()
565616
} else {
566617
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
567618

568-
if (_onMessage) {
619+
if(_cbk) {
620+
_cbk(_rxMessageTopic.c_str());
621+
} else if (_onMessage) {
569622
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
570623
_onMessage(this,_rxLength);
571624
#else
572625
_onMessage(_rxLength);
573626
#endif
627+
}
574628

575-
if (_rxLength == 0) {
576-
_rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
577-
}
629+
if ((_onMessage || _cbk) && _rxLength == 0) {
630+
_rxState = MQTT_CLIENT_RX_STATE_READ_TYPE;
578631
}
579632
}
580633
}
@@ -592,7 +645,9 @@ void MqttClient::poll()
592645

593646
_rxState = MQTT_CLIENT_RX_STATE_READ_PUBLISH_PAYLOAD;
594647

595-
if (_onMessage) {
648+
if(_cbk) {
649+
_cbk(_rxMessageTopic.c_str());
650+
} else if (_onMessage) {
596651
#ifdef MQTT_CLIENT_STD_FUNCTION_CALLBACK
597652
_onMessage(this,_rxLength);
598653
#else
@@ -647,12 +702,12 @@ void MqttClient::poll()
647702
}
648703
}
649704

650-
int MqttClient::connect(IPAddress ip, uint16_t port)
705+
error_t MqttClient::connect(IPAddress ip, uint16_t port)
651706
{
652707
return connect(ip, NULL, port);
653708
}
654709

655-
int MqttClient::connect(const char *host, uint16_t port)
710+
error_t MqttClient::connect(const char *host, uint16_t port)
656711
{
657712
return connect((uint32_t)0, host, port);
658713
}
@@ -706,7 +761,7 @@ int MqttClient::read()
706761
return b;
707762
}
708763

709-
int MqttClient::read(uint8_t *buf, size_t size)
764+
int MqttClient::read(uint8_t buf[], size_t size)
710765
{
711766
size_t result = 0;
712767

@@ -833,7 +888,7 @@ int MqttClient::subscribeQoS() const
833888
return _subscribeQos;
834889
}
835890

836-
int MqttClient::connect(IPAddress ip, const char* host, uint16_t port)
891+
error_t MqttClient::connect(IPAddress ip, const char* host, uint16_t port)
837892
{
838893
if (clientConnected()) {
839894
_client->stop();
@@ -1041,7 +1096,7 @@ void MqttClient::pubcomp(uint16_t id)
10411096
endPacket();
10421097
}
10431098

1044-
void MqttClient::ping()
1099+
error_t MqttClient::ping()
10451100
{
10461101
uint8_t packetBuffer[2];
10471102

src/MqttClient.h

Lines changed: 46 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
#define _MQTT_CLIENT_H_
2222

2323
#include <Arduino.h>
24-
#include <Client.h>
24+
#include "MqttInterface.h"
2525

2626
#define MQTT_CONNECTION_REFUSED -2
2727
#define MQTT_CONNECTION_TIMEOUT -1
@@ -41,9 +41,9 @@
4141

4242
namespace arduino {
4343

44-
class MqttClient : public Client {
44+
class MqttClient : public MqttClientInterface, public Client {
4545
public:
46-
MqttClient(Client* client);
46+
MqttClient(Client* client=nullptr);
4747
MqttClient(Client& client);
4848
virtual ~MqttClient();
4949

@@ -55,52 +55,69 @@ class MqttClient : public Client {
5555
void onMessage(void(*)(int));
5656
#endif
5757

58+
error_t publish(
59+
Topic t, uint8_t payload[],
60+
size_t size, MqttQos qos = QosDefault,
61+
MqttPublishFlag flags = MqttPublishFlags::None);
62+
63+
error_t ping() override;
64+
65+
void setReceiveCallback(MqttReceiveCallback cbk);
66+
67+
void setWill(
68+
Topic willTopic, const uint8_t* will_message,
69+
size_t will_size, MqttQos qos=QosDefault,
70+
MqttPublishFlag flags = MqttPublishFlags::None) override;
71+
72+
void setClient(arduino::Client*) override;
73+
5874
int parseMessage();
59-
String messageTopic() const;
60-
int messageDup() const;
61-
int messageQoS() const;
62-
int messageRetain() const;
63-
64-
int beginMessage(const char* topic, unsigned long size, bool retain = false, uint8_t qos = 0, bool dup = false);
65-
int beginMessage(const String& topic, unsigned long size, bool retain = false, uint8_t qos = 0, bool dup = false);
66-
int beginMessage(const char* topic, bool retain = false, uint8_t qos = 0, bool dup = false);
67-
int beginMessage(const String& topic, bool retain = false, uint8_t qos = 0, bool dup = false);
75+
String messageTopic() const override;
76+
int messageDup() const override;
77+
uint16_t messageId() const override;
78+
MqttQos messageQoS() const override;
79+
int messageRetain() const override;
80+
81+
int beginMessage(const char* topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
82+
int beginMessage(const String& topic, unsigned long size, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
83+
int beginMessage(const char* topic, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
84+
int beginMessage(const String& topic, bool retain = false, uint8_t qos = QosDefault, bool dup = false);
6885
int endMessage();
6986

70-
int beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos);
71-
int beginWill(const String& topic, unsigned short size, bool retain, uint8_t qos);
72-
int beginWill(const char* topic, bool retain, uint8_t qos);
73-
int beginWill(const String& topic, bool retain, uint8_t qos);
87+
int beginWill(const char* topic, unsigned short size, bool retain, uint8_t qos = QosDefault);
88+
int beginWill(const String& topic, unsigned short size, bool retain, uint8_t qos = QosDefault);
89+
int beginWill(const char* topic, bool retain, uint8_t qos = QosDefault);
90+
int beginWill(const String& topic, bool retain, uint8_t qos = QosDefault);
7491
int endWill();
7592

76-
int subscribe(const char* topic, uint8_t qos = 0);
77-
int subscribe(const String& topic, uint8_t qos = 0);
78-
int unsubscribe(const char* topic);
79-
int unsubscribe(const String& topic);
93+
error_t subscribe(Topic topic, MqttQos qos = QosDefault) override;
94+
error_t subscribe(const String& topic, MqttQos qos = QosDefault);
95+
error_t unsubscribe(Topic topic) override;
96+
error_t unsubscribe(const String& topic);
8097

81-
void poll();
98+
void poll() override;
8299

83100
// from Client
84-
virtual int connect(IPAddress ip, uint16_t port = 1883);
85-
virtual int connect(const char *host, uint16_t port = 1883);
101+
error_t connect(IPAddress ip, uint16_t port = 1883) override;
102+
error_t connect(const char *host, uint16_t port = 1883) override;
86103
#ifdef ESP8266
87104
virtual int connect(const IPAddress& ip, uint16_t port) { return 0; }; /* ESP8266 core defines this pure virtual in Client.h */
88105
#endif
89106
virtual size_t write(uint8_t);
90107
virtual size_t write(const uint8_t *buf, size_t size);
91108
virtual int available();
92109
virtual int read();
93-
virtual int read(uint8_t *buf, size_t size);
110+
virtual int read(uint8_t buf[], size_t size);
94111
virtual int peek();
95112
virtual void flush();
96113
virtual void stop();
97114
virtual uint8_t connected();
98115
virtual operator bool();
99116

100-
void setId(const char* id);
117+
void setId(const char* id) override;
101118
void setId(const String& id);
102119

103-
void setUsernamePassword(const char* username, const char* password);
120+
void setUsernamePassword(const char* username, const char* password) override;
104121
void setUsernamePassword(const String& username, const String& password);
105122

106123
void setCleanSession(bool cleanSession);
@@ -123,8 +140,7 @@ class MqttClient : public Client {
123140
void pubrec(uint16_t id);
124141
void pubrel(uint16_t id);
125142
void pubcomp(uint16_t id);
126-
void ping();
127-
void disconnect();
143+
void disconnect() override;
128144

129145
int beginPacket(uint8_t type, uint8_t flags, size_t length, uint8_t* buffer);
130146
int writeString(const char* s, uint16_t length);
@@ -197,6 +213,8 @@ class MqttClient : public Client {
197213
uint16_t _willBufferIndex;
198214
size_t _willMessageIndex;
199215
uint8_t _willFlags;
216+
217+
MqttReceiveCallback _cbk;
200218
};
201219
}
202220

0 commit comments

Comments
 (0)