Skip to content

Commit 7404792

Browse files
committed
fix(modem): WIP for multiple conn
1 parent 1cf0041 commit 7404792

File tree

4 files changed

+104
-28
lines changed

4 files changed

+104
-28
lines changed

components/esp_modem/examples/modem_tcp_client/main/generate/sock_dce.cpp

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ constexpr auto const *TAG = "sock_dce";
1919
std::vector<DCE*> DCE::dce_list{};
2020
bool DCE::network_init = false;
2121
int Responder::s_link_id = 0;
22+
SemaphoreHandle_t Responder::s_dte_mutex{};
2223

2324
// Constructor - add this DCE instance to the static list
2425
DCE::DCE(std::shared_ptr<esp_modem::DTE> dte_arg, const esp_modem_dce_config *config)
@@ -82,13 +83,22 @@ bool DCE::perform_sock()
8283

8384
void DCE::perform_at(uint8_t *data, size_t len)
8485
{
85-
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_VERBOSE);
86+
std::string_view resp_sv((char *)data, len);
87+
at.check_urc(state, resp_sv);
88+
if (state == status::IDLE) {
89+
return;
90+
}
91+
ESP_LOG_BUFFER_HEXDUMP(TAG, data, len, ESP_LOG_INFO);
8692
switch (at.process_data(state, data, len)) {
8793
case Responder::ret::OK:
94+
ESP_LOGW(TAG, "GIVE data %d", at.link_id);
95+
xSemaphoreGive(at.s_dte_mutex);
8896
state = status::IDLE;
8997
signal.set(IDLE);
9098
return;
9199
case Responder::ret::FAIL:
100+
ESP_LOGW(TAG, "GIVE data %d", at.link_id);
101+
xSemaphoreGive(at.s_dte_mutex);
92102
state = status::FAILED;
93103
signal.set(IDLE);
94104
return;
@@ -103,10 +113,14 @@ void DCE::perform_at(uint8_t *data, size_t len)
103113
std::string_view response((char *)data, len);
104114
switch (at.check_async_replies(state, response)) {
105115
case Responder::ret::OK:
116+
ESP_LOGW(TAG, "GIVE command %d", at.link_id);
117+
xSemaphoreGive(at.s_dte_mutex);
106118
state = status::IDLE;
107119
signal.set(IDLE);
108120
return;
109121
case Responder::ret::FAIL:
122+
ESP_LOGW(TAG, "GIVE command %d", at.link_id);
123+
xSemaphoreGive(at.s_dte_mutex);
110124
state = status::FAILED;
111125
signal.set(IDLE);
112126
return;
@@ -152,6 +166,9 @@ bool DCE::at_to_sock()
152166
close_sock();
153167
return false;
154168
}
169+
ESP_LOGI(TAG, "TAKE RECV %d", at.link_id);
170+
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
171+
ESP_LOGE(TAG, "TAKE RECV %d", at.link_id);
155172
state = status::RECEIVING;
156173
at.start_receiving(at.get_buf_len());
157174
return true;
@@ -160,8 +177,8 @@ bool DCE::at_to_sock()
160177
bool DCE::sock_to_at()
161178
{
162179
ESP_LOGD(TAG, "socket read: data available");
163-
if (!signal.wait(IDLE, 1000)) {
164-
ESP_LOGE(TAG, "Failed to get idle");
180+
if (!signal.wait(IDLE, 5000)) {
181+
ESP_LOGE(TAG, "Failed to get idle 2");
165182
close_sock();
166183
return false;
167184
}
@@ -170,6 +187,9 @@ bool DCE::sock_to_at()
170187
close_sock();
171188
return false;
172189
}
190+
ESP_LOGI(TAG, "TAKE SEND %d", at.link_id);
191+
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
192+
ESP_LOGE(TAG, "TAKE SEND %d", at.link_id);
173193
state = status::SENDING;
174194
int len = ::recv(sock, at.get_buf(), at.get_buf_len(), 0);
175195
if (len < 0) {
@@ -247,12 +267,15 @@ bool DCE::connect(std::string host, int port)
247267
{
248268
data_ready_fd = eventfd(0, EFD_SUPPORT_ISR);
249269
assert(data_ready_fd > 0);
250-
dte->on_read(nullptr);
251-
tcp_close();
252-
dte->on_read([this](uint8_t *data, size_t len) {
253-
read_callback(data, len);
254-
return esp_modem::command_result::TIMEOUT;
255-
});
270+
// dte->on_read(nullptr);
271+
// tcp_close();
272+
// dte->on_read([](uint8_t *data, size_t len) {
273+
// read_callback(data, len);
274+
// return esp_modem::command_result::TIMEOUT;
275+
// });
276+
ESP_LOGI(TAG, "TAKE CONNECT %d", at.link_id);
277+
xSemaphoreTake(at.s_dte_mutex, portMAX_DELAY);
278+
ESP_LOGE(TAG, "TAKE CONNECT %d", at.link_id);
256279
if (!at.start_connecting(host, port)) {
257280
ESP_LOGE(TAG, "Unable to start connecting");
258281
dte->on_read(nullptr);
@@ -268,6 +291,8 @@ bool DCE::init()
268291
return true;
269292
}
270293
network_init = true;
294+
Responder::s_dte_mutex = xSemaphoreCreateBinary();
295+
xSemaphoreGive(at.s_dte_mutex);
271296
esp_vfs_eventfd_config_t config = ESP_VFS_EVENTD_CONFIG_DEFAULT();
272297
esp_vfs_eventfd_register(&config);
273298

@@ -311,6 +336,10 @@ bool DCE::init()
311336
esp_modem::Task::Delay(5000);
312337
}
313338
ESP_LOGI(TAG, "Got IP %s", ip_addr.c_str());
339+
dte->on_read([](uint8_t *data, size_t len) {
340+
read_callback(data, len);
341+
return esp_modem::command_result::TIMEOUT;
342+
});
314343
return true;
315344
}
316345

components/esp_modem/examples/modem_tcp_client/main/generate/sock_dce.hpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ class Responder {
3434
sock(s), data_ready_fd(ready_fd), dte(dte_arg) {}
3535
ret process_data(status state, uint8_t *data, size_t len);
3636
ret check_async_replies(status state, std::string_view &response);
37+
ret check_urc(status state, std::string_view &response);
3738

3839
void start_sending(size_t len);
3940
void start_receiving(size_t len);
@@ -64,6 +65,7 @@ class Responder {
6465
}
6566

6667
int link_id{s_link_id++};
68+
static SemaphoreHandle_t s_dte_mutex;
6769
private:
6870
static int s_link_id;
6971
static constexpr size_t buffer_size = 512;
@@ -72,6 +74,7 @@ class Responder {
7274
{
7375
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
7476
::send(sock, data, len, 0);
77+
printf("sending %d\n", len);
7578
#else
7679
::memcpy(&buffer[actual_read], data, len);
7780
actual_read += len;
@@ -144,6 +147,7 @@ esp_modem::return_type name(ESP_MODEM_COMMAND_PARAMS(__VA_ARGS__));
144147
return 0;
145148
}
146149
at.clear_offsets();
150+
// TODO: MUTEX
147151
state = status::RECEIVING;
148152
uint64_t data;
149153
read(data_ready_fd, &data, sizeof(data));
@@ -166,6 +170,7 @@ esp_modem::return_type name(ESP_MODEM_COMMAND_PARAMS(__VA_ARGS__));
166170
if (!wait_to_idle(timeout_ms)) {
167171
return -1;
168172
}
173+
// TODO: MUTEX
169174
state = status::SENDING;
170175
memcpy(at.get_buf(), buffer, len_to_send);
171176
ESP_LOG_BUFFER_HEXDUMP("dce", at.get_buf(), len, ESP_LOG_VERBOSE);

components/esp_modem/examples/modem_tcp_client/main/modem_client.cpp

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323
#include "tcp_transport_mbedtls.h"
2424
#include "tcp_transport_at.h"
2525

26-
#define BROKER_URL "test.mosquitto.org"
26+
// #define BROKER_URL "test.mosquitto.org"
27+
#define BROKER_URL "192.168.0.39"
2728
#define BROKER_PORT 1883
2829

2930

3031
static const char *TAG = "modem_client";
3132
static EventGroupHandle_t event_group = NULL;
3233
static const int CONNECT_BIT = BIT0;
3334
static const int GOT_DATA_BIT = BIT2;
35+
static const int DCE0_DONE = BIT3;
36+
static const int DCE1_DONE = BIT4;
3437

3538
static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_t event_id, void *event_data)
3639
{
@@ -75,9 +78,10 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
7578

7679
// sock_dce::DCE::dce_list{};
7780

81+
static void perform(void* ctx);
82+
7883
extern "C" void app_main(void)
7984
{
80-
8185
/* Init and register system/core components */
8286
ESP_ERROR_CHECK(esp_netif_init());
8387
ESP_ERROR_CHECK(esp_event_loop_create_default());
@@ -111,12 +115,34 @@ extern "C" void app_main(void)
111115
return;
112116
}
113117

118+
xTaskCreate(perform, "perform", 4096, dce.get(), 4, nullptr);
119+
120+
// vTaskDelay(pdMS_TO_TICKS(15000));
121+
/* create another DCE to serve a new connection */
122+
auto dce1 = sock_dce::create(&dce_config, dte);
123+
if (!dce1->init()) {
124+
ESP_LOGE(TAG, "Failed to setup network");
125+
return;
126+
}
127+
xTaskCreate(perform, "perform", 4096, dce1.get(), 4, nullptr);
128+
129+
xEventGroupWaitBits(event_group, DCE0_DONE | DCE1_DONE, pdFALSE, pdTRUE, portMAX_DELAY);
130+
}
131+
132+
static void perform(void* ctx)
133+
{
134+
auto dce = static_cast<sock_dce::DCE*>(ctx);
135+
char mqtt_client_id[] = "MQTT_CLIENT_0";
136+
static int counter = 0;
137+
const int id = counter++;
138+
mqtt_client_id[12] += id; // assumes different client id per each thread
114139
esp_mqtt_client_config_t mqtt_config = {};
115-
mqtt_config.broker.address.port = BROKER_PORT;
140+
mqtt_config.broker.address.port = BROKER_PORT + id;
116141
mqtt_config.session.message_retransmit_timeout = 10000;
142+
mqtt_config.credentials.client_id = mqtt_client_id;
117143
#ifndef CONFIG_EXAMPLE_CUSTOM_TCP_TRANSPORT
118144
mqtt_config.broker.address.uri = "mqtt://127.0.0.1";
119-
dce->start_listening(BROKER_PORT);
145+
dce->start_listening(BROKER_PORT + id);
120146
#else
121147
mqtt_config.broker.address.uri = "mqtt://" BROKER_URL;
122148
esp_transport_handle_t at = esp_transport_at_init(dce.get());
@@ -146,8 +172,6 @@ extern "C" void app_main(void)
146172
ESP_LOGI(TAG, "Network reinitialized, retrying");
147173
}
148174
}
149-
#else
150-
vTaskDelay(portMAX_DELAY);
151175
#endif
152-
176+
xEventGroupSetBits(event_group, id ? DCE0_DONE : DCE1_DONE);
153177
}

components/esp_modem/examples/modem_tcp_client/main/sock_commands_espat.cpp

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,11 +67,18 @@ command_result net_open(CommandableIf *t)
6767
ESP_LOGI(TAG, "Multiple connections mode enabled");
6868

6969
// Set passive receive mode (1) for better control
70-
ret = set_rx_mode(t, 1);
71-
if (ret != command_result::OK) {
72-
ESP_LOGE(TAG, "Failed to set preferred Rx mode");
73-
return ret;
70+
for (int i = 0; i < 2; i++) {
71+
std::string cmd = "AT+CIPRECVTYPE=" + std::to_string(i) + ",1\r\n";
72+
dce_commands::generic_command(t, cmd, "OK", "ERROR", 1000);
7473
}
74+
// std::string cmd = "AT+CIPRECVTYPE=" + std::to_string(link_id) + "," + std::to_string(mode) + "\r\n";
75+
// return dce_commands::generic_command(t, cmd, "OK", "ERROR", 1000);
76+
//
77+
// ret = set_rx_mode(t, 1);
78+
// if (ret != command_result::OK) {
79+
// ESP_LOGE(TAG, "Failed to set preferred Rx mode");
80+
// return ret;
81+
// }
7582
return command_result::OK;
7683
}
7784

@@ -111,6 +118,7 @@ command_result net_close(CommandableIf *t)
111118

112119
command_result tcp_close(CommandableIf *t)
113120
{
121+
return command_result::OK;
114122
ESP_LOGV(TAG, "%s", __func__);
115123
// Use link ID 0 for closing connection
116124
const int link_id = 0;
@@ -192,6 +200,12 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
192200
auto *recv_data = (char *)data;
193201

194202
if (data_to_recv == 0) {
203+
// const std::string_view error_str = "ERROR";
204+
// const std::string_view data_sv(recv_data, len);
205+
// if (data_sv.find(error_str) == std::string_view::npos) {
206+
// // no data,
207+
// return ret::OK;
208+
// }
195209
const std::string_view head = "+CIPRECVDATA:";
196210

197211
// Find the response header
@@ -200,7 +214,7 @@ Responder::ret Responder::recv(uint8_t *data, size_t len)
200214
});
201215

202216
if (head_pos == recv_data + len) {
203-
return ret::FAIL;
217+
return ret::IN_PROGRESS;
204218
}
205219

206220
// Find the end of the length field
@@ -316,6 +330,17 @@ Responder::ret Responder::connect(std::string_view response)
316330
}
317331
return ret::IN_PROGRESS;
318332
}
333+
Responder::ret Responder::check_urc(status state, std::string_view &response)
334+
{
335+
// Handle data notifications - in multiple connections mode, format is +IPD,<link ID>,<len>
336+
std::string expected_urc = "+IPD," + std::to_string(link_id);
337+
if (response.find(expected_urc) != std::string::npos) {
338+
uint64_t data_ready = 1;
339+
write(data_ready_fd, &data_ready, sizeof(data_ready));
340+
ESP_LOGD(TAG, "Data available notification");
341+
}
342+
return ret::IN_PROGRESS;
343+
}
319344

320345
Responder::ret Responder::check_async_replies(status state, std::string_view &response)
321346
{
@@ -336,13 +361,6 @@ Responder::ret Responder::check_async_replies(status state, std::string_view &re
336361
return ret::FAIL;
337362
}
338363

339-
// Handle data notifications - in multiple connections mode, format is +IPD,<link ID>,<len>
340-
if (response.find("+IPD,") != std::string::npos) {
341-
uint64_t data_ready = 1;
342-
write(data_ready_fd, &data_ready, sizeof(data_ready));
343-
ESP_LOGD(TAG, "Data available notification");
344-
}
345-
346364
if (state == status::SENDING) {
347365
return send(response);
348366
} else if (state == status::CONNECTING) {

0 commit comments

Comments
 (0)