Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 39 additions & 2 deletions include/zephyr/sys/p4wq.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ struct k_p4wq_work;
*/
typedef void (*k_p4wq_handler_t)(struct k_p4wq_work *work);

/**
* Optional P4 Queue done callback.
* Will be called after the memory structure is not used anymore by the p4wq.
* If it is not used it must be set to NULL.
*/
typedef void (*k_p4wq_done_handler_t)(struct k_p4wq_work *work);

/**
* @brief P4 Queue Work Item
*
Expand Down Expand Up @@ -74,6 +81,11 @@ struct k_p4wq {

/* K_P4WQ_* flags above */
uint32_t flags;

/* done handler which is called every time after work was successfully executed
* and k_p4wq_work is not needed by p4wq anymore
*/
k_p4wq_done_handler_t done_handler;
};

struct k_p4wq_initparam {
Expand All @@ -83,6 +95,7 @@ struct k_p4wq_initparam {
struct k_thread *threads;
struct z_thread_stack_element *stacks;
uint32_t flags;
k_p4wq_done_handler_t done_handler;
};

/**
Expand All @@ -95,8 +108,9 @@ struct k_p4wq_initparam {
* @param name Symbol name of the struct k_p4wq that will be defined
* @param n_threads Number of threads in the work queue pool
* @param stack_sz Requested stack size of each thread, in bytes
* @param dn_handler Function pointer to handler of type k_p4wq_done_handler_t
*/
#define K_P4WQ_DEFINE(name, n_threads, stack_sz) \
#define K_P4WQ_DEFINE_WITH_DONE_HANDLER(name, n_threads, stack_sz, dn_handler) \
static K_THREAD_STACK_ARRAY_DEFINE(_p4stacks_##name, \
n_threads, stack_sz); \
static struct k_thread _p4threads_##name[n_threads]; \
Expand All @@ -109,8 +123,19 @@ struct k_p4wq_initparam {
.stacks = &(_p4stacks_##name[0][0]), \
.queue = &name, \
.flags = 0, \
.done_handler = dn_handler, \
}

/**
* @brief Statically initialize a P4 Work Queue
*
* Same like K_P4WQ_DEFINE_WITH_DONE_HANDLER but without an
* optional handler which is called everytime when work is executed
* and not used anymore by the p4wq
*/
#define K_P4WQ_DEFINE(name, n_threads, stack_sz) \
K_P4WQ_DEFINE_WITH_DONE_HANDLER(name, n_threads, stack_sz, NULL)

/**
* @brief Statically initialize an array of P4 Work Queues
*
Expand All @@ -122,8 +147,9 @@ struct k_p4wq_initparam {
* @param n_threads Number of threads and work queues
* @param stack_sz Requested stack size of each thread, in bytes
* @param flg Flags
* @param dn_handler Function pointer to handler of type k_p4wq_done_handler_t
*/
#define K_P4WQ_ARRAY_DEFINE(name, n_threads, stack_sz, flg) \
#define K_P4WQ_ARRAY_DEFINE_WITH_DONE_HANDLER(name, n_threads, stack_sz, flg, dn_handler) \
static K_THREAD_STACK_ARRAY_DEFINE(_p4stacks_##name, \
n_threads, stack_sz); \
static struct k_thread _p4threads_##name[n_threads]; \
Expand All @@ -136,8 +162,19 @@ struct k_p4wq_initparam {
.stacks = &(_p4stacks_##name[0][0]), \
.queue = name, \
.flags = K_P4WQ_QUEUE_PER_THREAD | flg, \
.done_handler = dn_handler, \
}

/**
* @brief Statically initialize an array of P4 Work Queues
*
* Same like K_P4WQ_ARRAY_DEFINE_WITH_DONE_HANDLER but without an
* optional handler which is called everytime when work is executed
* and not used anymore by the p4wq
*/
#define K_P4WQ_ARRAY_DEFINE(name, n_threads, stack_sz, flg) \
K_P4WQ_ARRAY_DEFINE_WITH_DONE_HANDLER(name, n_threads, stack_sz, flg, NULL)

/**
* @brief Initialize P4 Queue
*
Expand Down
19 changes: 17 additions & 2 deletions lib/os/p4wq.c
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,14 @@ static FUNC_NORETURN void p4wq_loop(void *p0, void *p1, void *p2)
if (!thread_was_requeued(_current)) {
sys_dlist_remove(&w->dlnode);
w->thread = NULL;
k_sem_give(&w->done_sem);

if (queue->done_handler) {
k_spin_unlock(&queue->lock, k);
queue->done_handler(w);
k = k_spin_lock(&queue->lock);
} else {
k_sem_give(&w->done_sem);
}
}
} else {
z_pend_curr(&queue->lock, k, &queue->waitq, K_FOREVER);
Expand Down Expand Up @@ -152,6 +159,7 @@ static int static_init(void)

if (!i || (pp->flags & K_P4WQ_QUEUE_PER_THREAD)) {
k_p4wq_init(q);
q->done_handler = pp->done_handler;
}

q->flags = pp->flags;
Expand Down Expand Up @@ -299,7 +307,14 @@ bool k_p4wq_cancel(struct k_p4wq *queue, struct k_p4wq_work *item)

if (ret) {
rb_remove(&queue->queue, &item->rbnode);
k_sem_give(&item->done_sem);

if (queue->done_handler) {
k_spin_unlock(&queue->lock, k);
queue->done_handler(item);
k = k_spin_lock(&queue->lock);
} else {
k_sem_give(&item->done_sem);
}
}

k_spin_unlock(&queue->lock, k);
Expand Down
19 changes: 13 additions & 6 deletions subsys/rtio/rtio_workq.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,24 @@
#define RTIO_WORKQ_PRIO_HIGH RTIO_WORKQ_PRIO_MED - 1
#define RTIO_WORKQ_PRIO_LOW RTIO_WORKQ_PRIO_MED + 1

K_P4WQ_DEFINE(rtio_workq,
CONFIG_RTIO_WORKQ_THREADS_POOL,
CONFIG_RTIO_WORKQ_STACK_SIZE);

K_MEM_SLAB_DEFINE_STATIC(rtio_work_items_slab,
sizeof(struct rtio_work_req),
CONFIG_RTIO_WORKQ_POOL_ITEMS,
4);

static void rtio_work_req_done_handler(struct k_p4wq_work *work)
{
struct rtio_work_req *req = CONTAINER_OF(work,
struct rtio_work_req,
work);
k_mem_slab_free(&rtio_work_items_slab, req);
}

K_P4WQ_DEFINE_WITH_DONE_HANDLER(rtio_workq,
CONFIG_RTIO_WORKQ_THREADS_POOL,
CONFIG_RTIO_WORKQ_STACK_SIZE,
rtio_work_req_done_handler);

static void rtio_work_handler(struct k_p4wq_work *work)
{
struct rtio_work_req *req = CONTAINER_OF(work,
Expand All @@ -28,8 +37,6 @@ static void rtio_work_handler(struct k_p4wq_work *work)
struct rtio_iodev_sqe *iodev_sqe = req->iodev_sqe;

req->handler(iodev_sqe);

k_mem_slab_free(&rtio_work_items_slab, req);
}

struct rtio_work_req *rtio_work_req_alloc(void)
Expand Down
Loading