Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion examples/mqtt_client_example/src/mqtt_client_example.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ start() ->
%% Start the MQTT client.
%%
Config = #{
url => "mqtt://mqtt.eclipseprojects.io",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replaced with a working alternative

url => "mqtt://broker.hivemq.com",
% username => "some-user", % (optional)
% password => "some-password", % (optional)
% client_id => "some-client", % (optional - defaults to: atomvm-<DEVICE-MAC-ADDRESS>)
connected_handler => fun handle_connected/1
},
{ok, _MQTT} = mqtt_client:start(Config),
Expand Down
25 changes: 20 additions & 5 deletions markdown/mqtt_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ The input parameter to the `start/1` function is an Erlang `map` structure, cont

%% erlang
Config = #{
url => "mqtt://mqtt.eclipseprojects.io",
url => "mqtt://broker.hivemq.com",
connected_handler => fun handle_connected/1,
disconnected_handler => fun handle_disconnected/1,
error_handler => handle_error/2
Expand Down Expand Up @@ -113,7 +113,7 @@ You can publish a message using the `publish/4`
%% erlang
Topic = <<"atomvm/topic0">>,
Message = <<"Hello!">>,
MsgId = mqtt_client:publish(MTQQ, Topic, Message).
MsgId = mqtt_client:publish(MQTT, Topic, Message).

The above function call will publish a message to the specified topic using the MQTT QoS `at_most_once`. Note that messages sent with `at_most_once` QoS are not subject to notification.

Expand Down Expand Up @@ -151,7 +151,7 @@ Subscribe to an MQTT topic by using the `subscribe/3` function. Specify a topic
subscribed_handler = fun handle_subscribed/2,
data_handler = fun handle_data/3
},
ok = mqtt_client:subscribe(MTQQ, Topic, SubscribeOptions).
ok = mqtt_client:subscribe(MQTT, Topic, SubscribeOptions).

The `subscribe/3` function will return `{error, already_subscribed}` if the client application is already subscribed to the specified topic.

Expand All @@ -173,14 +173,14 @@ The `data_handler` will be passed the MQTT client instance, topic on which the m

### Unsubscribing from an MQTT topic

Use the `unscibscribe/3` function to unsubscribe from a topic.
Use the `unsubscribe/3` function to unsubscribe from a topic.

%% erlang
Topic = <<"atomvm/topic0">>,
UnSubscribeOptions = #{
unsubscribed_handler = fun handle_unsubscribed/2
},
ok = mqtt_client:unsubscribe(MTQQ, Topic, UnSubscribeOptions).
ok = mqtt_client:unsubscribe(MQTT, Topic, UnSubscribeOptions).

The `unsubscribe/3` function will return `{error, not_subscribed}` if the client application is not yet subscribed to the specified topic.

Expand All @@ -199,6 +199,21 @@ TODO

#### Username/Password authentication

The username and password can be specified either as parameters to `mqtt_client:start/1` in the configuration map or directly in the broker URL (if supported by the broker).

%% erlang
Config = #{
url => "mqtts://some-broker.io:8883"
username => <<"test">>,
password => <<"milkstout">>,
...
}
% or
Config = #{
url => "mqtts://test:[email protected]:8883",
...
}

#### Connecting via TLS

##### Client TLS Authentication
Expand Down
28 changes: 22 additions & 6 deletions ports/atomvm_mqtt_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ static const char *const unsubscribe_failed_atom = ATOM_STR("\x12", "unsu
static const char *const unsubscribed_atom = ATOM_STR("\xC", "unsubscribed");
static const char *const url_atom = ATOM_STR("\x3", "url");
static const char *const username_atom = ATOM_STR("\x8", "username");
static const char *const client_id_atom = ATOM_STR("\x9", "client_id");

// error codes
static const char *const bad_username_atom = ATOM_STR("\x0C", "bad_username");
Expand Down Expand Up @@ -700,12 +701,11 @@ void atomvm_mqtt_client_init(GlobalContext *global)
esp_log_level_set("MQTT_CLIENT", ESP_LOG_VERBOSE);
}

// NB. Caller assumes ownership of returned string
static char *maybe_get_string(term kv, AtomString key, GlobalContext *global)
static char* maybe_get_string_or_default(term kv, AtomString key, char *default_value, GlobalContext *global)
{
term value_term = interop_kv_get_value(kv, key, global);
if (!term_is_string(value_term) && !term_is_binary(value_term)) {
return NULL;
return default_value;
}

int ok;
Expand All @@ -717,6 +717,12 @@ static char *maybe_get_string(term kv, AtomString key, GlobalContext *global)
return value_str;
}

// NB. Caller assumes ownership of returned string
static char *maybe_get_string(term kv, AtomString key, GlobalContext *global)
{
return maybe_get_string_or_default(kv, key, NULL, global);
}

// NB. Caller assumes ownership of returned string
// static char *get_string_default(term kv, AtomString key, AtomString default_value, GlobalContext *global)
// {
Expand Down Expand Up @@ -785,17 +791,26 @@ Context *atomvm_mqtt_client_create_port(GlobalContext *global, term opts)
UNUSED(port);
char *username_str = maybe_get_string(opts, username_atom, global);
char *password_str = maybe_get_string(opts, password_atom, global);
char *client_id_str = maybe_get_string_or_default(opts, client_id_atom, get_default_client_id(), global);
// todo: implement cert support
// char *cert_str = maybe_get_string(opts, cert_atom, global);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not remove this commented code, left by the original author as a reminder, or at least change it to a TODO: comment to add certificate based authentication.


// Note that char * values passed into this struct are copied into the MQTT state
const char *client_id = get_default_client_id();
esp_mqtt_client_config_t mqtt_cfg = {
#if ESP_IDF_VERSION_MAJOR >= 5
.broker.address.uri = url_str,
.credentials.client_id = client_id
.credentials = {
.username = username_str,
.client_id = client_id_str,
.authentication = {
.password = password_str
}
},
#else
.uri = url_str,
.client_id = client_id,
.username = username_str,
.password = password_str
.client_id = client_id_str,
.user_context = (void *) ctx
#endif
};
Expand All @@ -805,6 +820,7 @@ Context *atomvm_mqtt_client_create_port(GlobalContext *global, term opts)
free(host_str);
free(username_str);
free(password_str);
free(client_id_str);

if (UNLIKELY(IS_NULL_PTR(client))) {
ESP_LOGE(TAG, "Error: Unable to initialize MQTT client.\n");
Expand Down
9 changes: 6 additions & 3 deletions src/mqtt_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@
error_handler => fun((mqtt(), error()) -> any()),
username => binary_or_string(),
password => binary_or_string(),
client_id => binary_or_string(),
trusted_cert => binary_or_string()
client_id => binary_or_string()
}.

-type error_type() :: esp_tls | connection_refused | undefined.
Expand Down Expand Up @@ -494,7 +493,11 @@ init(Config) ->
try
Self = self(),
Port = erlang:open_port({spawn, "atomvm_mqtt_client"}, [
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

username, password, client_id never made it into the port as they were missing from the opts here.

{receiver, Self}, {url, maps:get(url, Config)}
{receiver, Self},
{url, maps:get(url, Config)},
{username, maps:get(username, Config)},
{password, maps:get(password, Config)},
{client_id, maps:get(client_id, Config)}
]),
{ok, #state{
port = Port,
Expand Down