Skip to content

Commit 5783a22

Browse files
Merge branch 'update_mqtt_pre_refactor' into 'master'
Update esp-mqtt submodule to 6af4446a Closes IDFGH-11179, IDFGH-14022, IDFGH-14489, and IDFGH-14616 See merge request espressif/esp-idf!38893
2 parents 1390440 + 412e7c4 commit 5783a22

File tree

5 files changed

+98
-45
lines changed

5 files changed

+98
-45
lines changed

tools/test_apps/protocols/mqtt/publish_connect_test/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
cmake_minimum_required(VERSION 3.16)
44

55
include($ENV{IDF_PATH}/tools/cmake/project.cmake)
6+
idf_build_set_property(MINIMAL_BUILD ON)
67

78
project(mqtt_publish_connect_test)
89

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
idf_component_register(SRCS "publish_test.c" "connect_test.c" "publish_connect_test.c"
2-
INCLUDE_DIRS ".")
2+
INCLUDE_DIRS "."
3+
REQUIRES mqtt nvs_flash console esp_netif)
34
target_compile_options(${COMPONENT_LIB} PRIVATE "-Wno-format")

tools/test_apps/protocols/mqtt/publish_connect_test/main/publish_test.c

Lines changed: 38 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -67,31 +67,44 @@ static void mqtt_event_handler(void *handler_args, esp_event_base_t base, int32_
6767
ESP_LOGI(TAG, "MQTT_EVENT_DATA");
6868
ESP_LOGI(TAG, "TOPIC=%.*s", event->topic_len, event->topic);
6969
ESP_LOGI(TAG, "ID=%d, total_len=%d, data_len=%d, current_data_offset=%d", event->msg_id, event->total_data_len, event->data_len, event->current_data_offset);
70-
if (event->topic) {
71-
actual_len = event->data_len;
72-
msg_id = event->msg_id;
73-
} else {
74-
actual_len += event->data_len;
75-
// check consistency with msg_id across multiple data events for single msg
76-
if (msg_id != event->msg_id) {
77-
ESP_LOGI(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
78-
abort();
79-
}
80-
}
81-
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
82-
if (actual_len == event->total_data_len) {
83-
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
84-
memset(test_data->received_data, 0, test_data->expected_size);
85-
test_data->nr_of_msg_received ++;
86-
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
87-
ESP_LOGI(TAG, "Correct pattern received exactly x times");
88-
ESP_LOGI(TAG, "Test finished correctly!");
89-
}
90-
} else {
91-
ESP_LOGE(TAG, "FAILED!");
92-
abort();
93-
}
94-
}
70+
if (event->current_data_offset == 0) {
71+
actual_len = event->data_len;
72+
msg_id = event->msg_id;
73+
if (event->total_data_len != test_data->expected_size) {
74+
ESP_LOGE(TAG, "Incorrect message size: %d != %d", event->total_data_len, test_data->expected_size);
75+
abort();
76+
}
77+
} else {
78+
actual_len += event->data_len;
79+
// check consistency with msg_id across multiple data events for single msg
80+
if (msg_id != event->msg_id) {
81+
ESP_LOGE(TAG, "Wrong msg_id in chunked message %d != %d", msg_id, event->msg_id);
82+
abort();
83+
}
84+
}
85+
if (event->current_data_offset + event->data_len > test_data->expected_size) {
86+
ESP_LOGE(TAG, "Buffer overflow detected: offset %d + data_len %d > buffer size %d", event->current_data_offset, event->data_len, test_data->expected_size);
87+
abort();
88+
}
89+
if (memcmp(test_data->expected + event->current_data_offset, event->data, event->data_len) != 0) {
90+
ESP_LOGE(TAG, "Data mismatch at offset %d: \n expected %.*s, \n got %.*s", event->current_data_offset, event->data_len, test_data->expected + event->current_data_offset, event->data_len, event->data);
91+
abort();
92+
}
93+
94+
memcpy(test_data->received_data + event->current_data_offset, event->data, event->data_len);
95+
if (actual_len == event->total_data_len) {
96+
if (0 == memcmp(test_data->received_data, test_data->expected, test_data->expected_size)) {
97+
memset(test_data->received_data, 0, test_data->expected_size);
98+
test_data->nr_of_msg_received++;
99+
if (test_data->nr_of_msg_received == test_data->nr_of_msg_expected) {
100+
ESP_LOGI(TAG, "Correct pattern received exactly x times");
101+
ESP_LOGI(TAG, "Test finished correctly!");
102+
}
103+
} else {
104+
ESP_LOGE(TAG, "FAILED!");
105+
abort();
106+
}
107+
}
95108
break;
96109
case MQTT_EVENT_ERROR:
97110
ESP_LOGE(TAG, "MQTT_EVENT_ERROR");

tools/test_apps/protocols/mqtt/publish_connect_test/pytest_mqtt_publish_app.py

Lines changed: 56 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88
import re
99
import ssl
1010
import string
11+
import time
1112
from itertools import count
1213
from itertools import product
1314
from threading import Event
1415
from threading import Lock
1516
from typing import Any
1617
from typing import Dict
1718
from typing import List
18-
from typing import no_type_check
1919
from typing import Tuple
20+
from typing import no_type_check
2021

2122
import paho.mqtt.client as mqtt
2223
import pexpect
@@ -61,7 +62,7 @@ def on_connect(self, mqttc: Any, obj: Any, flags: Any, rc: int) -> None:
6162
def on_connect_fail(self, mqttc: Any, obj: Any) -> None:
6263
logging.error('Connect failed')
6364

64-
def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
65+
def on_message(self, mqttc: mqtt.Client, obj: Any, msg: mqtt.MQTTMessage) -> None:
6566
payload = msg.payload.decode('utf-8')
6667
if payload == self.expected_data:
6768
self.received += 1
@@ -70,8 +71,9 @@ def on_message(self, mqttc: Any, userdata: Any, msg: mqtt.MQTTMessage) -> None:
7071
else:
7172
differences = len(list(filter(lambda data: data[0] != data[1], zip(payload, self.expected_data))))
7273
logging.error(
73-
f'Payload differ in {differences} positions from expected data. received size: {len(payload)} expected size:'
74-
f'{len(self.expected_data)}'
74+
f'Payload on topic "{msg.topic}" (QoS {msg.qos}) differs in {differences} positions '
75+
'from expected data. '
76+
f'Received size: {len(payload)}, expected size: {len(self.expected_data)}.'
7577
)
7678
logging.info(f'Repetitions: {payload.count(self.config["pattern"])}')
7779
logging.info(f'Pattern: {self.config["pattern"]}')
@@ -85,6 +87,7 @@ def __enter__(self) -> Any:
8587
qos = self.config['qos']
8688
broker_host = self.config['broker_host_' + self.config['transport']]
8789
broker_port = self.config['broker_port_' + self.config['transport']]
90+
connect_timeout_seconds = self.config.get('client_connect_timeout', 30)
8891

8992
try:
9093
self.print_details('Connecting...')
@@ -93,14 +96,17 @@ def __enter__(self) -> Any:
9396
self.tls_insecure_set(True)
9497
self.event_client_connected.clear()
9598
self.loop_start()
96-
self.connect(broker_host, broker_port, 60)
99+
self.connect(broker_host, broker_port, 60) # paho's keepalive
97100
except Exception:
98101
self.print_details(f'ENV_TEST_FAILURE: Unexpected error while connecting to broker {broker_host}')
99102
raise
100103
self.print_details(f'Connecting py-client to broker {broker_host}:{broker_port}...')
101104

102-
if not self.event_client_connected.wait(timeout=30):
103-
raise ValueError(f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host}')
105+
if not self.event_client_connected.wait(timeout=connect_timeout_seconds):
106+
raise ValueError(
107+
f'ENV_TEST_FAILURE: Test script cannot connect to broker: {broker_host} '
108+
f'within {connect_timeout_seconds}s'
109+
)
104110
self.event_client_got_all.clear()
105111
result, self.subscribe_mid = self.subscribe(self.config['subscribe_topic'], qos)
106112
assert result == 0
@@ -148,7 +154,11 @@ def get_config_from_dut(dut, config_option):
148154
publish_cfg['pattern'] = ''.join(
149155
random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits) for _ in range(DEFAULT_MSG_SIZE)
150156
)
157+
publish_cfg['client_connect_timeout'] = 30
158+
publish_cfg['dut_subscribe_timeout'] = 60
159+
publish_cfg['publish_ack_timeout'] = 60
151160
publish_cfg['test_timeout'] = get_timeout(test_case)
161+
152162
unique_topic = ''.join(
153163
random.choice(string.ascii_uppercase + string.ascii_lowercase) for _ in range(DEFAULT_MSG_SIZE)
154164
)
@@ -159,9 +169,10 @@ def get_config_from_dut(dut, config_option):
159169

160170

161171
@contextlib.contextmanager
162-
def connected_and_subscribed(dut: Dut) -> Any:
172+
def connected_and_subscribed(dut: Dut, config: Dict[str, Any]) -> Any:
163173
dut.write('start')
164-
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=60)
174+
dut_subscribe_timeout = config.get('dut_subscribe_timeout', 60)
175+
dut.expect(re.compile(rb'MQTT_EVENT_SUBSCRIBED'), timeout=dut_subscribe_timeout)
165176
yield
166177
dut.write('stop')
167178

@@ -177,6 +188,7 @@ def get_scenarios() -> List[Dict[str, int]]:
177188
continue
178189
break
179190
if not scenarios: # No message sizes present in the env - set defaults
191+
logging.info('Using predefined cases')
180192
scenarios = [
181193
{'msg_len': 0, 'nr_of_msgs': 5}, # zero-sized messages
182194
{'msg_len': 2, 'nr_of_msgs': 5}, # short messages
@@ -201,13 +213,15 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
201213
logging.info(
202214
f'Starting Publish test: transport:{config["transport"]}, qos:{config["qos"]},'
203215
f'nr_of_msgs:{config["scenario"]["nr_of_msgs"]},'
204-
f' msg_size:{config["scenario"]["msg_len"] * DEFAULT_MSG_SIZE}, enqueue:{config["enqueue"]}'
216+
f' msg_size:{config["scenario"]["msg_len"]}, enqueue:{config["enqueue"]}'
205217
)
206218
dut.write(
207-
f'publish_setup {config["transport"]} {config["publish_topic"]} {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
219+
f'publish_setup {config["transport"]} {config["publish_topic"]}'
220+
f' {config["subscribe_topic"]} {config["pattern"]} {config["scenario"]["msg_len"]}'
208221
)
209-
with MqttPublisher(config) as publisher, connected_and_subscribed(dut):
210-
assert publisher.event_client_subscribed.wait(timeout=config['test_timeout']), 'Runner failed to subscribe'
222+
with MqttPublisher(config) as publisher, connected_and_subscribed(dut, config):
223+
py_client_subscribe_timeout = config.get('py_client_subscribe_timeout', config['test_timeout'])
224+
assert publisher.event_client_subscribed.wait(timeout=py_client_subscribe_timeout), 'Runner failed to subscribe'
211225
msgs_published: List[mqtt.MQTTMessageInfo] = []
212226
dut.write(f'publish {config["scenario"]["nr_of_msgs"]} {config["qos"]} {config["enqueue"]}')
213227
assert publisher.event_client_got_all.wait(timeout=config['test_timeout']), (
@@ -222,11 +236,33 @@ def run_publish_test_case(dut: Dut, config: Any) -> None:
222236
msg = publisher.publish(topic=config['publish_topic'], payload=payload, qos=config['qos'])
223237
if config['qos'] > 0:
224238
msgs_published.append(msg)
225-
logging.info(f'Published: {len(msgs_published)}')
226-
while msgs_published:
227-
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
228-
229-
logging.info('All messages from runner published')
239+
logging.info(f'Published: {len(msgs_published)} messages from script with QoS > 0 needing ACK.')
240+
241+
if msgs_published:
242+
publish_ack_timeout_seconds = config.get('publish_ack_timeout', 60) # Default 60s, make configurable
243+
ack_wait_start_time = time.time()
244+
initial_unacked_count = len(msgs_published)
245+
logging.info(f'Waiting {initial_unacked_count} publish ack with timeout {publish_ack_timeout_seconds}s...')
246+
247+
while msgs_published:
248+
if time.time() - ack_wait_start_time > publish_ack_timeout_seconds:
249+
unacked_mids = [msg.mid for msg in msgs_published if msg.mid is not None and not msg.is_published()]
250+
logging.error(
251+
f'Timeout waiting for publish acknowledgements. '
252+
f'{len(unacked_mids)} of {initial_unacked_count} messages remain unacknowledged. '
253+
f'Unacked MIDs: {unacked_mids}'
254+
)
255+
# This will likely cause the test to fail at a later assertion,
256+
# or you could raise an explicit error here.
257+
# e.g. raise Exception('Timeout waiting for publish acknowledgements')
258+
break
259+
msgs_published = [msg for msg in msgs_published if not msg.is_published()]
260+
if msgs_published: # Avoid busy-looping if list is not empty
261+
time.sleep(0.1) # Brief pause
262+
if not msgs_published:
263+
logging.info('All script-published QoS > 0 messages acknowledged by broker.')
264+
265+
logging.info('All messages from runner published (or timed out waiting for ACK).')
230266

231267
try:
232268
dut.expect(re.compile(rb'Correct pattern received exactly x times'), timeout=config['test_timeout'])
@@ -262,6 +298,7 @@ def make_cases(transport: Any, scenarios: List[Dict[str, int]]) -> List[Tuple[st
262298
@pytest.mark.parametrize('test_case', test_cases)
263299
@pytest.mark.parametrize('config', ['default'], indirect=True)
264300
@idf_parametrize('target', ['esp32'], indirect=['target'])
301+
@pytest.mark.flaky(reruns=1, reruns_delay=1)
265302
def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
266303
publish_cfg = get_configurations(dut, test_case)
267304
dut.expect(re.compile(rb'mqtt>'), timeout=30)
@@ -273,6 +310,7 @@ def test_mqtt_publish(dut: Dut, test_case: Any) -> None:
273310
@pytest.mark.nightly_run
274311
@pytest.mark.parametrize('test_case', stress_test_cases)
275312
@pytest.mark.parametrize('config', ['default'], indirect=True)
313+
@pytest.mark.flaky(reruns=1, reruns_delay=1)
276314
@idf_parametrize('target', ['esp32'], indirect=['target'])
277315
def test_mqtt_publish_stress(dut: Dut, test_case: Any) -> None:
278316
publish_cfg = get_configurations(dut, test_case)

0 commit comments

Comments
 (0)