Skip to content

Commit 74ce0bd

Browse files
author
Thong Phan
committed
samples: tflite-micro: micro_speech: add OpenAMP transport (Linux/Zephyr)
This commit adds OpenAMP Remote Processor Messaging (RPMsg) support to enable communication between Linux userspace and the Zephyr micro_speech application running on the remote core. Implementation details: - Endpoint name: "audio_pcm" - Buffer flow: hold/release mechanism for efficient memory management - Queue depth: 16 messages with 4-byte alignment (K_MSGQ_DEFINE) - Notify path: mailbox/IPM for inter-processor interrupts - Linux userspace maps to this endpoint via /dev/ttyRPMSG* device Signed-off-by: Thong Phan <[email protected]>
1 parent 7b828d7 commit 74ce0bd

File tree

5 files changed

+362
-0
lines changed

5 files changed

+362
-0
lines changed

samples/modules/tflite-micro/micro_speech/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ file(
1717
app_sources
1818
src/*
1919
src/inference/*
20+
src/transport/*
2021
)
2122

2223
target_include_directories(

samples/modules/tflite-micro/micro_speech/src/inference/model_runner.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ LOG_MODULE_REGISTER(model_runner);
2929
#include "audio_preprocessor_int8_model.hpp"
3030
#include "micro_speech_quantized_model.hpp"
3131

32+
#include "transport/rpmsg_transport.h"
33+
3234
#include <zephyr/kernel.h>
3335
#include <algorithm>
3436
#include <stdio.h>

samples/modules/tflite-micro/micro_speech/src/main_functions.cpp

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
LOG_MODULE_REGISTER(micro_speech_openamp);
2323

2424
#include "inference/model_runner.hpp"
25+
#include "transport/rpmsg_transport.h"
2526

2627
#include <zephyr/kernel.h>
2728
#include <zephyr/device.h>
@@ -40,20 +41,93 @@ LOG_MODULE_REGISTER(micro_speech_openamp);
4041
#define BUFFER_SIZE_SAMPLES (SAMPLES_PER_SECOND)
4142
#define BUFFER_SIZE_BYTES (BUFFER_SIZE_SAMPLES * SAMPLE_SIZE_BYTES)
4243
/* Double buffering for receiving and processing */
44+
static int16_t buffer_a[BUFFER_SIZE_SAMPLES];
4345
static int16_t buffer_b[BUFFER_SIZE_SAMPLES];
46+
static int16_t *write_buffer = buffer_a;
4447
static int16_t *processing_buffer = buffer_b;
4548
/* Threading & Synchronization */
4649
K_THREAD_STACK_DEFINE(thread_receive_stack, APP_RECEIVE_TASK_STACK_SIZE);
4750
K_THREAD_STACK_DEFINE(thread_audio_processing_stack, APP_AUDIO_PROCESSING_TASK_STACK_SIZE);
51+
static struct k_thread thread_receive_data;
4852
static struct k_thread thread_audio_processing_data;
4953
static K_SEM_DEFINE(processing_buffer_ready_sem, 0, 1);
5054
static K_SEM_DEFINE(ml_complete_sem, 1, 1); /* Starts at 1 to allow the first file */
5155
static K_MUTEX_DEFINE(buffer_access_mutex);
5256
/* Shared State (protected by buffer_access_mutex) */
57+
static uint16_t g_samples_in_write_buffer = 0;
5358
static uint16_t g_samples_in_processing_buffer = 0;
59+
static bool g_eof_received = false;
5460

5561
extern "C" {
5662

63+
void app_receive_data_thread(void *arg1, void *arg2, void *arg3)
64+
{
65+
ARG_UNUSED(arg1);
66+
ARG_UNUSED(arg2);
67+
ARG_UNUSED(arg3);
68+
69+
int ret;
70+
71+
LOG_INF("Receiving data thread started");
72+
73+
k_sem_take(&data_tty_ready_sem, K_FOREVER);
74+
ret = rpmsg_create_ept(&tty_ept, rpdev, "audio_pcm", RPMSG_ADDR_ANY, RPMSG_ADDR_ANY, rpmsg_recv_tty_callback, NULL);
75+
if (ret != 0) {
76+
LOG_ERR("Could not create RPMsg endpoint");
77+
return;
78+
}
79+
80+
while (1) {
81+
/* Wait for previous buffer to be fully processed */
82+
k_sem_take(&ml_complete_sem, K_FOREVER);
83+
84+
while(1) {
85+
struct rpmsg_rcv_msg rx_msg;
86+
/* Wait for messages */
87+
k_msgq_get(&tty_msgq, &rx_msg, K_FOREVER);
88+
89+
bool should_break = false;
90+
91+
if (rx_msg.len == 3 && memcmp(rx_msg.data, "EOF", 3) == 0) { /* EOF */
92+
k_mutex_lock(&buffer_access_mutex, K_FOREVER);
93+
g_eof_received = true;
94+
/* Swap and process any leftover data */
95+
if (g_samples_in_write_buffer > 0) {
96+
int16_t *temp = write_buffer;
97+
write_buffer = processing_buffer;
98+
processing_buffer = temp;
99+
g_samples_in_processing_buffer = g_samples_in_write_buffer;
100+
g_samples_in_write_buffer = 0;
101+
k_sem_give(&processing_buffer_ready_sem);
102+
} else { /* Still signal to trigger final processing if needed */
103+
k_sem_give(&processing_buffer_ready_sem);
104+
}
105+
k_mutex_unlock(&buffer_access_mutex);
106+
should_break = true;
107+
} else { /* Receive data */
108+
k_mutex_lock(&buffer_access_mutex, K_FOREVER);
109+
size_t bytes_to_copy = MIN(rx_msg.len, (BUFFER_SIZE_SAMPLES - g_samples_in_write_buffer) * SAMPLE_SIZE_BYTES);
110+
memcpy((uint8_t*)write_buffer + g_samples_in_write_buffer * SAMPLE_SIZE_BYTES, rx_msg.data, bytes_to_copy);
111+
g_samples_in_write_buffer += bytes_to_copy / SAMPLE_SIZE_BYTES;
112+
/* Buffer is full. Swap buffer */
113+
if (g_samples_in_write_buffer >= BUFFER_SIZE_SAMPLES) {
114+
int16_t *temp = write_buffer;
115+
write_buffer = processing_buffer;
116+
processing_buffer = temp;
117+
g_samples_in_processing_buffer = g_samples_in_write_buffer;
118+
g_samples_in_write_buffer = 0;
119+
k_sem_give(&processing_buffer_ready_sem);
120+
}
121+
k_mutex_unlock(&buffer_access_mutex);
122+
}
123+
rpmsg_release_rx_buffer(&tty_ept, rx_msg.data);
124+
if (should_break) {
125+
break; /* Wait for ml_complete_sem */
126+
}
127+
}
128+
}
129+
}
130+
57131
void app_audio_processing_thread(void *arg1, void *arg2, void *arg3)
58132
{
59133
ARG_UNUSED(arg1);
@@ -68,6 +142,7 @@ void app_audio_processing_thread(void *arg1, void *arg2, void *arg3)
68142

69143
k_mutex_lock(&buffer_access_mutex, K_FOREVER);
70144
size_t samples_to_process = g_samples_in_processing_buffer;
145+
bool is_eof = g_eof_received;
71146
g_samples_in_processing_buffer = 0; /* Mark buffer as claimed */
72147
k_mutex_unlock(&buffer_access_mutex);
73148

@@ -81,6 +156,15 @@ void app_audio_processing_thread(void *arg1, void *arg2, void *arg3)
81156
/* Clear the buffer after processing */
82157
memset(processing_buffer, 0, BUFFER_SIZE_BYTES);
83158
}
159+
160+
if (is_eof) { /* EOF processing */
161+
k_mutex_lock(&buffer_access_mutex, K_FOREVER);
162+
g_eof_received = false;
163+
k_mutex_unlock(&buffer_access_mutex);
164+
165+
LOG_DBG("Buffer processing complete. Ready for next buffer.");
166+
k_sem_give(&ml_complete_sem);
167+
}
84168
k_sem_give(&ml_complete_sem);
85169
}
86170
}
@@ -92,6 +176,13 @@ void setup() {
92176
/* Initialize model runner */
93177
model_runner_init();
94178

179+
/* Initialize RPMsg transport */
180+
rpmsg_transport_start();
181+
182+
/* Create data receiving thread (higher priority) */
183+
k_thread_create(&thread_receive_data, thread_receive_stack, APP_RECEIVE_TASK_STACK_SIZE,
184+
app_receive_data_thread, NULL, NULL, NULL, 5, 0, K_NO_WAIT);
185+
95186
/* Create audio processing thread (lower priority) */
96187
k_thread_create(&thread_audio_processing_data, thread_audio_processing_stack, APP_AUDIO_PROCESSING_TASK_STACK_SIZE,
97188
app_audio_processing_thread, NULL, NULL, NULL, 6, 0, K_NO_WAIT);
Lines changed: 229 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,229 @@
1+
/*
2+
* Copyright (c) 2020, STMICROELECTRONICS
3+
* Author: Thong Phan <[email protected]>
4+
*
5+
* Heavily based on pwm_mcux_ftm.c, which is:
6+
* Copyright (c) 2017, NXP
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*/
10+
11+
#include "rpmsg_transport.h"
12+
13+
#include <zephyr/logging/log.h>
14+
LOG_MODULE_REGISTER(rpmsg_transport);
15+
#include <zephyr/device.h>
16+
#include <zephyr/drivers/ipm.h>
17+
18+
#include <metal/sys.h>
19+
#include <metal/io.h>
20+
#include <resource_table.h>
21+
#include <addr_translation.h>
22+
23+
#if !DT_HAS_CHOSEN(zephyr_ipc_shm)
24+
#error "Sample requires definition of shared memory for rpmsg"
25+
#endif
26+
27+
/* Constants from device tree */
28+
#define SHM_NODE DT_CHOSEN(zephyr_ipc_shm)
29+
#define SHM_START_ADDR DT_REG_ADDR(SHM_NODE)
30+
#define SHM_SIZE DT_REG_SIZE(SHM_NODE)
31+
/* Stack size for the management thread */
32+
#define APP_TASK_STACK_SIZE (1024)
33+
34+
#if CONFIG_IPM_MAX_DATA_SIZE > 0
35+
#define IPM_SEND(dev, w, id, d, s) ipm_send(dev, w, id, d, s)
36+
#else
37+
#define IPM_SEND(dev, w, id, d, s) ipm_send(dev, w, id, NULL, 0)
38+
#endif
39+
/* Thread definitions */
40+
K_THREAD_STACK_DEFINE(thread_mng_stack, APP_TASK_STACK_SIZE);
41+
static struct k_thread thread_mng_data;
42+
/* OpenAMP components */
43+
static const struct device *const ipm_handle = DEVICE_DT_GET(DT_CHOSEN(zephyr_ipc));
44+
static metal_phys_addr_t shm_physmap = SHM_START_ADDR;
45+
static metal_phys_addr_t rsc_tab_physmap;
46+
static struct metal_io_region shm_io_data;
47+
static struct metal_io_region rsc_io_data;
48+
static struct metal_io_region *shm_io = &shm_io_data;
49+
static struct metal_io_region *rsc_io = &rsc_io_data;
50+
static struct rpmsg_virtio_device rvdev;
51+
static void *rsc_table;
52+
/* Public variables */
53+
struct rpmsg_device *rpdev;
54+
struct rpmsg_endpoint tty_ept;
55+
K_MSGQ_DEFINE(tty_msgq, sizeof(struct rpmsg_rcv_msg), 16, 4);
56+
/* 16 messages * 20ms per frames covers 320ms of data */
57+
K_SEM_DEFINE(data_tty_ready_sem, 0, 1);
58+
/* Internal semaphore, hidden from other files */
59+
static K_SEM_DEFINE(data_sem, 0, 1);
60+
61+
static void platform_ipm_callback(const struct device *dev, void *context, uint32_t id,
62+
volatile void *data)
63+
64+
{
65+
LOG_DBG("%s: msg received from mb %d", __func__, id);
66+
k_sem_give(&data_sem);
67+
}
68+
69+
int rpmsg_recv_tty_callback(struct rpmsg_endpoint *ept, void *data, size_t len, uint32_t src,
70+
void *priv)
71+
{
72+
struct rpmsg_rcv_msg msg = {.data = data, .len = len};
73+
74+
rpmsg_hold_rx_buffer(ept, data);
75+
76+
if (k_msgq_put(&tty_msgq, &msg, K_NO_WAIT) != 0) {
77+
LOG_WRN("tty_msgq full, dropping frame len %zu\n", len);
78+
rpmsg_release_rx_buffer(ept, data);
79+
}
80+
return RPMSG_SUCCESS;
81+
}
82+
83+
static void receive_message(unsigned char **msg, unsigned int *len)
84+
{
85+
if (k_sem_take(&data_sem, K_FOREVER) == 0) {
86+
rproc_virtio_notified(rvdev.vdev, VRING1_ID);
87+
}
88+
}
89+
90+
static void new_service_cb(struct rpmsg_device *rdev, const char *name, uint32_t src)
91+
{
92+
LOG_ERR("%s: unexpected ns service receive for name %s", __func__, name);
93+
}
94+
95+
static int mailbox_notify(void *priv, uint32_t id)
96+
{
97+
ARG_UNUSED(priv);
98+
LOG_DBG("%s: msg received", __func__);
99+
IPM_SEND(ipm_handle, 0, id, &id, 4);
100+
return 0;
101+
}
102+
103+
static int platform_init(void)
104+
{
105+
int rsc_size;
106+
struct metal_init_params metal_params = METAL_INIT_DEFAULTS;
107+
int status;
108+
109+
status = metal_init(&metal_params);
110+
if (status) {
111+
LOG_ERR("metal_init: failed: %d", status);
112+
return -1;
113+
}
114+
115+
metal_io_init(shm_io, (void *)SHM_START_ADDR, &shm_physmap, SHM_SIZE, -1, 0,
116+
addr_translation_get_ops(shm_physmap));
117+
118+
rsc_table_get(&rsc_table, &rsc_size);
119+
rsc_tab_physmap = (uintptr_t)rsc_table;
120+
metal_io_init(rsc_io, rsc_table, &rsc_tab_physmap, rsc_size, -1, 0, NULL);
121+
122+
if (!device_is_ready(ipm_handle)) {
123+
LOG_ERR("IPM device is not ready");
124+
return -1;
125+
}
126+
127+
ipm_register_callback(ipm_handle, platform_ipm_callback, NULL);
128+
129+
status = ipm_set_enabled(ipm_handle, 1);
130+
if (status) {
131+
LOG_ERR("ipm_set_enabled failed");
132+
return -1;
133+
}
134+
135+
return 0;
136+
}
137+
138+
static void cleanup_system(void)
139+
{
140+
ipm_set_enabled(ipm_handle, 0);
141+
rpmsg_deinit_vdev(&rvdev);
142+
metal_finish();
143+
}
144+
145+
static struct rpmsg_device *platform_create_rpmsg_vdev(rpmsg_ns_bind_cb ns_cb)
146+
{
147+
struct fw_rsc_vdev_vring *vring_rsc;
148+
struct virtio_device *vdev;
149+
int ret;
150+
151+
vdev = rproc_virtio_create_vdev(VIRTIO_DEV_DEVICE, VDEV_ID, rsc_table_to_vdev(rsc_table),
152+
rsc_io, NULL, mailbox_notify, NULL);
153+
if (!vdev) {
154+
LOG_ERR("failed to create vdev");
155+
return NULL;
156+
}
157+
158+
rproc_virtio_wait_remote_ready(vdev);
159+
160+
vring_rsc = rsc_table_get_vring0(rsc_table);
161+
ret = rproc_virtio_init_vring(vdev, 0, vring_rsc->notifyid, (void *)vring_rsc->da, rsc_io,
162+
vring_rsc->num, vring_rsc->align);
163+
if (ret) {
164+
LOG_ERR("failed to init vring 0");
165+
goto failed;
166+
}
167+
168+
vring_rsc = rsc_table_get_vring1(rsc_table);
169+
ret = rproc_virtio_init_vring(vdev, 1, vring_rsc->notifyid, (void *)vring_rsc->da, rsc_io,
170+
vring_rsc->num, vring_rsc->align);
171+
if (ret) {
172+
LOG_ERR("failed to init vring 1");
173+
goto failed;
174+
}
175+
176+
ret = rpmsg_init_vdev(&rvdev, vdev, ns_cb, shm_io, NULL);
177+
if (ret) {
178+
LOG_ERR("failed rpmsg_init_vdev");
179+
goto failed;
180+
}
181+
182+
return rpmsg_virtio_get_rpmsg_device(&rvdev);
183+
184+
failed:
185+
rproc_virtio_remove_vdev(vdev);
186+
return NULL;
187+
}
188+
189+
static void rpmsg_mng_task(void *arg1, void *arg2, void *arg3)
190+
{
191+
ARG_UNUSED(arg1);
192+
ARG_UNUSED(arg2);
193+
ARG_UNUSED(arg3);
194+
195+
unsigned char *msg;
196+
unsigned int len;
197+
int ret = 0;
198+
199+
LOG_INF("Microspeech sample with OpenAMP started");
200+
201+
ret = platform_init();
202+
if (ret) {
203+
LOG_ERR("Failed to initialize platform");
204+
goto task_end;
205+
}
206+
207+
rpdev = platform_create_rpmsg_vdev(new_service_cb);
208+
if (!rpdev) {
209+
LOG_ERR("Failed to create rpmsg virtio device");
210+
goto task_end;
211+
}
212+
213+
k_sem_give(&data_tty_ready_sem);
214+
215+
while (1) {
216+
receive_message(&msg, &len);
217+
}
218+
219+
task_end:
220+
cleanup_system();
221+
LOG_INF("Microspeech sample with OpenAMP ended");
222+
}
223+
224+
/* Public Function */
225+
void rpmsg_transport_start(void)
226+
{
227+
k_thread_create(&thread_mng_data, thread_mng_stack, APP_TASK_STACK_SIZE, rpmsg_mng_task,
228+
NULL, NULL, NULL, K_PRIO_COOP(8), 0, K_NO_WAIT);
229+
}

0 commit comments

Comments
 (0)