11% %
2- % % Copyright (c) 2021 dushin.net
2+ % % Copyright (c) 2021-2023 dushin.net
33% % All rights reserved.
44% %
55% % Licensed under the Apache License, Version 2.0 (the "License");
@@ -33,16 +33,8 @@ start() ->
3333 {ok , _MQTT } = mqtt_client :start (Config ),
3434 io :format (" MQTT started.~n " ),
3535
36- loop_forever ( ).
36+ timer : sleep ( infinity ).
3737
38- loop_forever () ->
39- receive
40- halt -> halt
41- end .
42-
43- % %
44- % % connected callback. This function will be called
45- % %
4638handle_connected (MQTT ) ->
4739 Config = mqtt_client :get_config (MQTT ),
4840 Topic = <<" atomvm/qos0" >>,
@@ -60,15 +52,10 @@ handle_subscribed(MQTT, Topic) ->
6052
6153handle_data (_MQTT , Topic , Data ) ->
6254 io :format (" Received data on topic ~p : ~p ~n " , [Topic , Data ]),
63- % io:format("Pending publishes: ~p~n", [mqtt_client:get_pending_publishes(MQTT)]),
64- % io:format("Pending subscriptions: ~p~n", [mqtt_client:get_pending_subscriptions(MQTT)]),
65- % io:format("Pending unsubscriptions: ~p~n", [mqtt_client:get_pending_unsubscriptions(MQTT)]),
66- io :format (" process count: ~p~n " , [erlang :system_info (process_count )]),
67- io :format (" Free heap on handle_data: ~p~n " , [erlang :system_info (esp32_free_heap_size )]),
6855 ok .
6956
7057start_network (StaConfig ) ->
71- case network_fsm :wait_for_sta (StaConfig ) of
58+ case network :wait_for_sta (StaConfig ) of
7259 {ok , {Address , Netmask , Gateway }} ->
7360 io :format (
7461 " Acquired IP address: ~s Netmask: ~s Gateway: ~s~n " ,
@@ -81,8 +68,27 @@ start_network(StaConfig) ->
8168
8269publish_loop (MQTT , Topic , Seq ) ->
8370 io :format (" Publishing data on topic ~p~n " , [Topic ]),
84- _ = mqtt_client :publish (MQTT , Topic , list_to_binary (" echo" ++ integer_to_list (Seq ))),
85- timer :sleep (5000 ),
86- io :format (" process count: ~p~n " , [erlang :system_info (process_count )]),
87- io :format (" Free heap after publish: ~p~n " , [erlang :system_info (esp32_free_heap_size )]),
71+ try
72+ Self = self (),
73+ HandlePublished = fun (MQTT2 , Topic2 , MsgId ) ->
74+ Self ! published ,
75+ handle_published (MQTT2 , Topic2 , MsgId )
76+ end ,
77+ PublishOptions = #{qos => at_least_once , published_handler => HandlePublished },
78+ Msg = list_to_binary (" echo" ++ integer_to_list (Seq )),
79+ _ = mqtt_client :publish (MQTT , Topic , Msg , PublishOptions ),
80+ receive
81+ published ->
82+ ok
83+ after 10000 ->
84+ io :format (" Timed out waiting for publish ack~n " )
85+ end
86+ catch
87+ C :E :S ->
88+ io :format (" Error in publish: ~p :~p~p~n " , [C , E , S ])
89+ end ,
90+ timer :sleep (1000 ),
8891 publish_loop (MQTT , Topic , Seq + 1 ).
92+
93+ handle_published (MQTT , Topic , MsgId ) ->
94+ io :format (" MQTT ~p published to topic ~p msg_id=~p~n " , [MQTT , Topic , MsgId ]).
0 commit comments