11/*
2- Copyright (c) 2010-2019 Roger Light <roger@atchoo.org>
2+ Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>
33
44All rights reserved. This program and the accompanying materials
5- are made available under the terms of the Eclipse Public License v1 .0
5+ are made available under the terms of the Eclipse Public License 2 .0
66and Eclipse Distribution License v1.0 which accompany this distribution.
7-
7+
88The Eclipse Public License is available at
9- http ://www.eclipse.org/legal/epl-v10.html
9+ https ://www.eclipse.org/legal/epl-2.0/
1010and the Eclipse Distribution License is available at
1111 http://www.eclipse.org/org/documents/edl-v10.php.
12-
12+
13+ SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
14+
1315Contributors:
1416 Roger Light - initial implementation and documentation.
1517*/
@@ -22,25 +24,88 @@ and the Eclipse Distribution License is available at
2224#include "mosquitto_internal.h"
2325#include "memory_mosq.h"
2426#include "messages_mosq.h"
25- #include "mqtt3_protocol .h"
27+ #include "mqtt_protocol .h"
2628#include "net_mosq.h"
29+ #include "packet_mosq.h"
2730#include "send_mosq.h"
2831#include "util_mosq.h"
2932
3033
3134int mosquitto_publish (struct mosquitto * mosq , int * mid , const char * topic , int payloadlen , const void * payload , int qos , bool retain )
35+ {
36+ return mosquitto_publish_v5 (mosq , mid , topic , payloadlen , payload , qos , retain , NULL );
37+ }
38+
39+ int mosquitto_publish_v5 (struct mosquitto * mosq , int * mid , const char * topic , int payloadlen , const void * payload , int qos , bool retain , const mosquitto_property * properties )
3240{
3341 struct mosquitto_message_all * message ;
3442 uint16_t local_mid ;
35- int queue_status ;
43+ const mosquitto_property * p ;
44+ const mosquitto_property * outgoing_properties = NULL ;
45+ mosquitto_property * properties_copy = NULL ;
46+ mosquitto_property local_property ;
47+ bool have_topic_alias ;
48+ int rc ;
49+ size_t tlen = 0 ;
50+ uint32_t remaining_length ;
51+
52+ if (!mosq || qos < 0 || qos > 2 ) return MOSQ_ERR_INVAL ;
53+ if (mosq -> protocol != mosq_p_mqtt5 && properties ) return MOSQ_ERR_NOT_SUPPORTED ;
54+ if (qos > mosq -> max_qos ) return MOSQ_ERR_QOS_NOT_SUPPORTED ;
55+
56+ if (!mosq -> retain_available ){
57+ retain = false;
58+ }
3659
37- if (!mosq || !topic || qos < 0 || qos > 2 ) return MOSQ_ERR_INVAL ;
38- if (STREMPTY (topic )) return MOSQ_ERR_INVAL ;
39- if (mosquitto_validate_utf8 (topic , strlen (topic ))) return MOSQ_ERR_MALFORMED_UTF8 ;
40- if (payloadlen < 0 || payloadlen > MQTT_MAX_PAYLOAD ) return MOSQ_ERR_PAYLOAD_SIZE ;
60+ if (properties ){
61+ if (properties -> client_generated ){
62+ outgoing_properties = properties ;
63+ }else {
64+ memcpy (& local_property , properties , sizeof (mosquitto_property ));
65+ local_property .client_generated = true;
66+ local_property .next = NULL ;
67+ outgoing_properties = & local_property ;
68+ }
69+ rc = mosquitto_property_check_all (CMD_PUBLISH , outgoing_properties );
70+ if (rc ) return rc ;
71+ }
72+
73+ if (!topic || STREMPTY (topic )){
74+ if (topic ) topic = NULL ;
75+
76+ if (mosq -> protocol == mosq_p_mqtt5 ){
77+ p = outgoing_properties ;
78+ have_topic_alias = false;
79+ while (p ){
80+ if (p -> identifier == MQTT_PROP_TOPIC_ALIAS ){
81+ have_topic_alias = true;
82+ break ;
83+ }
84+ p = p -> next ;
85+ }
86+ if (have_topic_alias == false){
87+ return MOSQ_ERR_INVAL ;
88+ }
89+ }else {
90+ return MOSQ_ERR_INVAL ;
91+ }
92+ }else {
93+ tlen = strlen (topic );
94+ if (mosquitto_validate_utf8 (topic , (int )tlen )) return MOSQ_ERR_MALFORMED_UTF8 ;
95+ if (payloadlen < 0 || payloadlen > (int )MQTT_MAX_PAYLOAD ) return MOSQ_ERR_PAYLOAD_SIZE ;
96+ if (mosquitto_pub_topic_check (topic ) != MOSQ_ERR_SUCCESS ){
97+ return MOSQ_ERR_INVAL ;
98+ }
99+ }
41100
42- if (mosquitto_pub_topic_check (topic ) != MOSQ_ERR_SUCCESS ){
43- return MOSQ_ERR_INVAL ;
101+ if (mosq -> maximum_packet_size > 0 ){
102+ remaining_length = 1 + 2 + (uint32_t )tlen + (uint32_t )payloadlen + property__get_length_all (outgoing_properties );
103+ if (qos > 0 ){
104+ remaining_length ++ ;
105+ }
106+ if (packet__check_oversize (mosq , remaining_length )){
107+ return MOSQ_ERR_OVERSIZE_PACKET ;
108+ }
44109 }
45110
46111 local_mid = mosquitto__mid_generate (mosq );
@@ -49,74 +114,167 @@ int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int p
49114 }
50115
51116 if (qos == 0 ){
52- return send__publish (mosq , local_mid , topic , payloadlen , payload , qos , retain , false);
117+ return send__publish (mosq , local_mid , topic , ( uint32_t ) payloadlen , payload , ( uint8_t ) qos , retain , false, outgoing_properties , NULL , 0 );
53118 }else {
119+ if (outgoing_properties ){
120+ rc = mosquitto_property_copy_all (& properties_copy , outgoing_properties );
121+ if (rc ) return rc ;
122+ }
54123 message = mosquitto__calloc (1 , sizeof (struct mosquitto_message_all ));
55- if (!message ) return MOSQ_ERR_NOMEM ;
124+ if (!message ){
125+ mosquitto_property_free_all (& properties_copy );
126+ return MOSQ_ERR_NOMEM ;
127+ }
56128
57129 message -> next = NULL ;
58130 message -> timestamp = mosquitto_time ();
59131 message -> msg .mid = local_mid ;
60- message -> msg .topic = mosquitto__strdup (topic );
61- if (!message -> msg .topic ){
62- message__cleanup (& message );
63- return MOSQ_ERR_NOMEM ;
132+ if (topic ){
133+ message -> msg .topic = mosquitto__strdup (topic );
134+ if (!message -> msg .topic ){
135+ message__cleanup (& message );
136+ mosquitto_property_free_all (& properties_copy );
137+ return MOSQ_ERR_NOMEM ;
138+ }
64139 }
65140 if (payloadlen ){
66141 message -> msg .payloadlen = payloadlen ;
67- message -> msg .payload = mosquitto__malloc (payloadlen * sizeof (uint8_t ));
142+ message -> msg .payload = mosquitto__malloc (( unsigned int ) payloadlen * sizeof (uint8_t ));
68143 if (!message -> msg .payload ){
69144 message__cleanup (& message );
145+ mosquitto_property_free_all (& properties_copy );
70146 return MOSQ_ERR_NOMEM ;
71147 }
72- memcpy (message -> msg .payload , payload , payloadlen * sizeof (uint8_t ));
148+ memcpy (message -> msg .payload , payload , ( uint32_t ) payloadlen * sizeof (uint8_t ));
73149 }else {
74150 message -> msg .payloadlen = 0 ;
75151 message -> msg .payload = NULL ;
76152 }
77- message -> msg .qos = qos ;
153+ message -> msg .qos = ( uint8_t ) qos ;
78154 message -> msg .retain = retain ;
79155 message -> dup = false;
156+ message -> properties = properties_copy ;
80157
81- pthread_mutex_lock (& mosq -> out_message_mutex );
82- queue_status = message__queue (mosq , message , mosq_md_out );
83- if (queue_status == 0 ){
84- if (qos == 1 ){
85- message -> state = mosq_ms_wait_for_puback ;
86- }else if (qos == 2 ){
87- message -> state = mosq_ms_wait_for_pubrec ;
88- }
89- pthread_mutex_unlock (& mosq -> out_message_mutex );
90- return send__publish (mosq , message -> msg .mid , message -> msg .topic , message -> msg .payloadlen , message -> msg .payload , message -> msg .qos , message -> msg .retain , message -> dup );
91- }else {
92- message -> state = mosq_ms_invalid ;
93- pthread_mutex_unlock (& mosq -> out_message_mutex );
94- return MOSQ_ERR_SUCCESS ;
95- }
158+ pthread_mutex_lock (& mosq -> msgs_out .mutex );
159+ message -> state = mosq_ms_invalid ;
160+ rc = message__queue (mosq , message , mosq_md_out );
161+ pthread_mutex_unlock (& mosq -> msgs_out .mutex );
162+ return rc ;
96163 }
97164}
98165
99166
100167int mosquitto_subscribe (struct mosquitto * mosq , int * mid , const char * sub , int qos )
101168{
102- if (!mosq ) return MOSQ_ERR_INVAL ;
169+ return mosquitto_subscribe_multiple (mosq , mid , 1 , (char * const * const )& sub , qos , 0 , NULL );
170+ }
171+
172+
173+ int mosquitto_subscribe_v5 (struct mosquitto * mosq , int * mid , const char * sub , int qos , int options , const mosquitto_property * properties )
174+ {
175+ return mosquitto_subscribe_multiple (mosq , mid , 1 , (char * const * const )& sub , qos , options , properties );
176+ }
177+
178+
179+ int mosquitto_subscribe_multiple (struct mosquitto * mosq , int * mid , int sub_count , char * const * const sub , int qos , int options , const mosquitto_property * properties )
180+ {
181+ const mosquitto_property * outgoing_properties = NULL ;
182+ mosquitto_property local_property ;
183+ int i ;
184+ int rc ;
185+ uint32_t remaining_length = 0 ;
186+ int slen ;
187+
188+ if (!mosq || !sub_count || !sub ) return MOSQ_ERR_INVAL ;
189+ if (mosq -> protocol != mosq_p_mqtt5 && properties ) return MOSQ_ERR_NOT_SUPPORTED ;
190+ if (qos < 0 || qos > 2 ) return MOSQ_ERR_INVAL ;
191+ if ((options & 0x30 ) == 0x30 || (options & 0xC0 ) != 0 ) return MOSQ_ERR_INVAL ;
103192 if (mosq -> sock == INVALID_SOCKET ) return MOSQ_ERR_NO_CONN ;
104193
105- if (mosquitto_sub_topic_check (sub )) return MOSQ_ERR_INVAL ;
106- if (mosquitto_validate_utf8 (sub , strlen (sub ))) return MOSQ_ERR_MALFORMED_UTF8 ;
194+ if (properties ){
195+ if (properties -> client_generated ){
196+ outgoing_properties = properties ;
197+ }else {
198+ memcpy (& local_property , properties , sizeof (mosquitto_property ));
199+ local_property .client_generated = true;
200+ local_property .next = NULL ;
201+ outgoing_properties = & local_property ;
202+ }
203+ rc = mosquitto_property_check_all (CMD_SUBSCRIBE , outgoing_properties );
204+ if (rc ) return rc ;
205+ }
206+
207+ for (i = 0 ; i < sub_count ; i ++ ){
208+ if (mosquitto_sub_topic_check (sub [i ])) return MOSQ_ERR_INVAL ;
209+ slen = (int )strlen (sub [i ]);
210+ if (mosquitto_validate_utf8 (sub [i ], slen )) return MOSQ_ERR_MALFORMED_UTF8 ;
211+ remaining_length += 2 + (uint32_t )slen + 1 ;
212+ }
107213
108- return send__subscribe (mosq , mid , sub , qos );
214+ if (mosq -> maximum_packet_size > 0 ){
215+ remaining_length += 2 + property__get_length_all (outgoing_properties );
216+ if (packet__check_oversize (mosq , remaining_length )){
217+ return MOSQ_ERR_OVERSIZE_PACKET ;
218+ }
219+ }
220+ if (mosq -> protocol == mosq_p_mqtt311 || mosq -> protocol == mosq_p_mqtt31 ){
221+ options = 0 ;
222+ }
223+
224+ return send__subscribe (mosq , mid , sub_count , sub , qos |options , outgoing_properties );
109225}
110226
111227
112228int mosquitto_unsubscribe (struct mosquitto * mosq , int * mid , const char * sub )
113229{
230+ return mosquitto_unsubscribe_multiple (mosq , mid , 1 , (char * const * const )& sub , NULL );
231+ }
232+
233+ int mosquitto_unsubscribe_v5 (struct mosquitto * mosq , int * mid , const char * sub , const mosquitto_property * properties )
234+ {
235+ return mosquitto_unsubscribe_multiple (mosq , mid , 1 , (char * const * const )& sub , properties );
236+ }
237+
238+ int mosquitto_unsubscribe_multiple (struct mosquitto * mosq , int * mid , int sub_count , char * const * const sub , const mosquitto_property * properties )
239+ {
240+ const mosquitto_property * outgoing_properties = NULL ;
241+ mosquitto_property local_property ;
242+ int rc ;
243+ int i ;
244+ uint32_t remaining_length = 0 ;
245+ int slen ;
246+
114247 if (!mosq ) return MOSQ_ERR_INVAL ;
248+ if (mosq -> protocol != mosq_p_mqtt5 && properties ) return MOSQ_ERR_NOT_SUPPORTED ;
115249 if (mosq -> sock == INVALID_SOCKET ) return MOSQ_ERR_NO_CONN ;
116250
117- if (mosquitto_sub_topic_check (sub )) return MOSQ_ERR_INVAL ;
118- if (mosquitto_validate_utf8 (sub , strlen (sub ))) return MOSQ_ERR_MALFORMED_UTF8 ;
251+ if (properties ){
252+ if (properties -> client_generated ){
253+ outgoing_properties = properties ;
254+ }else {
255+ memcpy (& local_property , properties , sizeof (mosquitto_property ));
256+ local_property .client_generated = true;
257+ local_property .next = NULL ;
258+ outgoing_properties = & local_property ;
259+ }
260+ rc = mosquitto_property_check_all (CMD_UNSUBSCRIBE , outgoing_properties );
261+ if (rc ) return rc ;
262+ }
263+
264+ for (i = 0 ; i < sub_count ; i ++ ){
265+ if (mosquitto_sub_topic_check (sub [i ])) return MOSQ_ERR_INVAL ;
266+ slen = (int )strlen (sub [i ]);
267+ if (mosquitto_validate_utf8 (sub [i ], slen )) return MOSQ_ERR_MALFORMED_UTF8 ;
268+ remaining_length += 2U + (uint32_t )slen ;
269+ }
270+
271+ if (mosq -> maximum_packet_size > 0 ){
272+ remaining_length += 2U + property__get_length_all (outgoing_properties );
273+ if (packet__check_oversize (mosq , remaining_length )){
274+ return MOSQ_ERR_OVERSIZE_PACKET ;
275+ }
276+ }
119277
120- return send__unsubscribe (mosq , mid , sub );
278+ return send__unsubscribe (mosq , mid , sub_count , sub , outgoing_properties );
121279}
122280
0 commit comments