Skip to content

Commit e046335

Browse files
committed
mqueue: add support for fetching bulk items
1 parent 326b2b3 commit e046335

File tree

4 files changed

+75
-16
lines changed

4 files changed

+75
-16
lines changed

modules/mqueue/doc/mqueue_admin.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,16 @@ opensips-cli -x mq_get_size xyz
277277
</section>
278278
<section id="mi_mq_fetch" xreflabel="mq_fetch">
279279
<title>mq_fetch</title>
280-
<para>Fetch a key-value pair from a memory queue.</para>
280+
<para>Fetch one (or up to limit) key-value pair from a memory queue.</para>
281281
<para>Parameters:</para>
282282
<itemizedlist>
283283
<listitem>
284284
<emphasis>name</emphasis> - the name of memory queue
285285
</listitem>
286+
<listitem>
287+
<emphasis>limit</emphasis> (optional) - if used, an array
288+
with up to <emphasis>limit</emphasis> records are being returned.
289+
</listitem>
286290
</itemizedlist>
287291
<example>
288292
<title><function>mq_fetch</function> usage</title>

modules/mqueue/mqueue_api.c

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,24 @@ mq_pv_t *mq_pv_get(str *name)
225225
return NULL;
226226
}
227227

228+
/**
229+
*
230+
*/
231+
mq_item_t *mq_head_fetch_item(mq_head_t *mh)
232+
{
233+
mq_item_t *ret;
234+
235+
if(mh->ifirst == NULL)
236+
return NULL;
237+
238+
ret = mh->ifirst;
239+
mh->ifirst = mh->ifirst->next;
240+
if(mh->ifirst == NULL)
241+
mh->ilast = NULL;
242+
mh->csize--;
243+
return ret;
244+
}
245+
228246
/**
229247
*
230248
*/
@@ -245,21 +263,10 @@ int mq_head_fetch(str *name)
245263
return -1;
246264
lock_get(&mh->lock);
247265

248-
if(mh->ifirst == NULL) {
249-
/* empty queue */
250-
lock_release(&mh->lock);
251-
return -2;
252-
}
253-
254-
mp->item = mh->ifirst;
255-
mh->ifirst = mh->ifirst->next;
256-
if(mh->ifirst == NULL) {
257-
mh->ilast = NULL;
258-
}
259-
mh->csize--;
266+
mp->item = mq_head_fetch_item(mh);
260267

261268
lock_release(&mh->lock);
262-
return 0;
269+
return (mp->item?0:-2);
263270
}
264271

265272
/**

modules/mqueue/mqueue_api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ int mq_head_defined(void);
7373
void mq_destroy(void);
7474
int mq_head_add(str *name, int msize, int addmode);
7575
int mq_head_fetch(str *name);
76+
mq_item_t *mq_head_fetch_item(mq_head_t *mh);
7677
void mq_pv_free(str *name);
7778
int mq_item_add(str *qname, str *key, str *val);
7879
mq_head_t *mq_head_get(str *name);

modules/mqueue/mqueue_mod.c

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ mi_response_t *mi_get_size(const mi_params_t *params,
5454
struct mi_handler *async_hdl);
5555
mi_response_t *mi_fetch(const mi_params_t *params,
5656
struct mi_handler *async_hdl);
57+
mi_response_t *mi_fetch_bulk(const mi_params_t *params,
58+
struct mi_handler *async_hdl);
5759

5860
static pv_export_t mod_pvs[] = {
5961
{ {"mqk", sizeof("mqk") - 1}, 1090, pv_get_mqk, 0,
@@ -97,8 +99,8 @@ static const stat_export_t mod_stats[] = {
9799
};
98100

99101
#define MQH1 "Params: none ; Get the size of all memory queues."
100-
#define MQH2 "Params: [mqueue] ; Get the size of a memory queue."
101-
#define MQH3 "Params: [mqueue] ; Fetch a key-value pair from a memory queue."
102+
#define MQH2 "Params: mqueue; Get the size of a memory queue."
103+
#define MQH3 "Params: mqueue [limit]; Fetch one (or a max limit of) key-value pair from a memory queue."
102104

103105
static const mi_export_t mi_cmds[] = {
104106
{"mq_get_sizes", MQH1, 0, 0, {
@@ -111,6 +113,7 @@ static const mi_export_t mi_cmds[] = {
111113
},
112114
{"mq_fetch", MQH3, 0, 0, {
113115
{mi_fetch, {"name", 0}},
116+
{mi_fetch_bulk,{"name", "limit", 0}},
114117
{EMPTY_MI_RECIPE}}
115118
},
116119
{EMPTY_MI_EXPORT}
@@ -422,3 +425,47 @@ mi_response_t *mi_fetch(const mi_params_t *params,
422425
return NULL;
423426
}
424427

428+
mi_response_t *mi_fetch_bulk(const mi_params_t *params,
429+
struct mi_handler *async_hdl)
430+
{
431+
mi_response_t *resp;
432+
mi_item_t *resp_obj, *mq_item;
433+
str mqueue_name;
434+
int limit;
435+
mq_head_t *mh;
436+
mq_item_t *item = NULL;
437+
438+
if (get_mi_string_param(params, "name", &mqueue_name.s, &mqueue_name.len) < 0)
439+
return init_mi_param_error();
440+
441+
if (get_mi_int_param(params, "limit", &limit) < 0 || limit < 1)
442+
return init_mi_param_error();
443+
444+
mh = mq_head_get(&mqueue_name);
445+
if (!mh)
446+
return init_mi_error(404, MI_SSTR("No such queue"));
447+
448+
resp = init_mi_result_array(&resp_obj);
449+
if (!resp)
450+
return NULL;
451+
452+
lock_get(&mh->lock);
453+
do {
454+
item = mq_head_fetch_item(mh);
455+
if (!item)
456+
break;
457+
mq_item = add_mi_object(resp_obj, NULL, 0);
458+
if (add_mi_string_fmt(mq_item, MI_SSTR("key"), item->key.s, item->key.len) < 0)
459+
break;
460+
if (add_mi_string_fmt(mq_item, MI_SSTR("value"), item->val.s, item->val.len) < 0)
461+
break;
462+
shm_free(item);
463+
item = 0;
464+
} while (--limit > 0);
465+
lock_release(&mh->lock);
466+
467+
if (item)
468+
shm_free(item);
469+
470+
return resp;
471+
}

0 commit comments

Comments
 (0)