Skip to content

Commit 3d0959a

Browse files
committed
rtpengine: provide failover on errors returned from engine
Feature sponsored by Five9 https://www.five9.com
1 parent e2b8eef commit 3d0959a

File tree

2 files changed

+207
-26
lines changed

2 files changed

+207
-26
lines changed

modules/rtpengine/doc/rtpengine_admin.xml

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,21 @@
6363
If the set was selected using setid_avp, the avp needs to be
6464
set only once before rtpengine_offer() or rtpengine_manage() call.
6565
</para>
66+
<para id="param_failover" xreflabel="Failover">
67+
The module is able to failover to a new node within a set, if a chosen
68+
one has communication issues. Moreover, it will also failover if the node
69+
returns one of the following errors:
70+
<itemizedlist>
71+
<listitem><para>
72+
Parallel session limit reached
73+
</para></listitem>
74+
<listitem><para>
75+
Ran out of ports
76+
</para></listitem>
77+
</itemizedlist>
78+
You can use the <xref linkend="param_extra_failover_error"/> parameter
79+
to extend the above list.
80+
</para>
6681
</section>
6782

6883
<section id="dependencies" xreflabel="Dependencies">
@@ -805,6 +820,36 @@ rtpengine_offer("... codec-mask-PCMA codec-strip-opus transcode-opus ...");
805820
</programlisting>
806821
</example>
807822

823+
<section id="param_extra_failover_error" xreflabel="extra_failover_error">
824+
<title><varname>extra_failover_error</varname> (string)</title>
825+
<para>
826+
Contains a POSIX extended regular expression (matched case-insensitively) that can be
827+
used to match an error received from an RTPEngine node. If matched,
828+
the module tries to use a new node to handle the affected command.
829+
</para>
830+
<para>
831+
This parameter can be used to extend the list
832+
(see <xref linkend="param_failover"/>) of errors the module
833+
implicitly fails over on.
834+
</para>
835+
<para>
836+
<emphasis>Note</emphasis> each declaration will define a single
837+
expression/matching rule. If you want to define multiple rules, you
838+
need to define the parameter multiple times.
839+
</para>
840+
<para>
841+
Default value is empty, no extra errors are being used.
842+
</para>
843+
<example>
844+
<title>Set <varname>extra_failover_error</varname> parameter</title>
845+
<programlisting format="linespecific">
846+
...
847+
modparam("rtpengine", "extra_failover_error", "Parallel session limit reached")
848+
...
849+
</programlisting>
850+
</example>
851+
</section>
852+
808853
</section>
809854
<section id="func_rtpengine_answer" xreflabel="rtpengine_answer()">
810855
<title>

modules/rtpengine/rtpengine.c

Lines changed: 162 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
#include <stdlib.h>
4747
#include <string.h>
4848
#include <unistd.h>
49+
#include <regex.h>
4950

5051
#include "../../str.h"
5152
#include "../../flags.h"
@@ -221,6 +222,11 @@ typedef struct rtpe_set_link {
221222
} v;
222223
} rtpe_set_link_t;
223224

225+
/*
 * Singly-linked list entry naming one RTPEngine node that must be skipped
 * by node selection (consulted through rtpe_is_ignore_node()).
 */
struct rtpe_ignore_node {
	struct rtpe_node *node;        /* the node to be skipped */
	struct rtpe_ignore_node *next; /* next entry; NULL terminates the list */
};
229+
224230
static const char *command_strings[] = {
225231
[OP_OFFER] = "offer",
226232
[OP_ANSWER] = "answer",
@@ -321,7 +327,7 @@ static int fixup_set_id(void ** param);
321327
static int fixup_free_set_id(void ** param);
322328
static int set_rtpengine_set_f(struct sip_msg * msg, rtpe_set_link_t *set_param);
323329
static struct rtpe_set * select_rtpe_set(int id_set);
324-
static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *);
330+
static struct rtpe_node *select_rtpe_node(str, struct rtpe_set *, struct rtpe_ignore_node *);
325331
static struct rtpe_node *lookup_rtpe_node(struct rtpe_set * rtpe_list, str *rtpe_url);
326332
static void free_rtpe_set(int);
327333
static void free_rtpe_node(struct rtpe_set *, str *);
@@ -335,6 +341,7 @@ static int update_rtpengines(int);
335341
static int _add_rtpengine_from_database(void);
336342
static int rtpengine_set_store(modparam_t type, void * val);
337343
static int rtpengine_set_notify(modparam_t type, void * val);
344+
static int rtpengine_extra_failover(modparam_t type, void * val);
338345
static int rtpengine_add_rtpengine_set( char * rtp_proxies, int set_id);
339346

340347
static int mod_init(void);
@@ -699,6 +706,8 @@ static const param_export_t params[] = {
699706
{"db_table", STR_PARAM, &db_table.s },
700707
{"socket_column", STR_PARAM, &db_rtpe_sock_col.s },
701708
{"set_column", STR_PARAM, &db_rtpe_set_col.s },
709+
{"extra_failover_error", STR_PARAM|USE_FUNC_PARAM,
710+
(void *)rtpengine_extra_failover},
702711
{"notification_sock", STR_PARAM|USE_FUNC_PARAM,
703712
(void *)rtpengine_set_notify},
704713
{"ping_enabled", INT_PARAM, &rtpengine_ping_enabled },
@@ -770,6 +779,11 @@ struct module_exports exports = {
770779
0 /* reload confirm function */
771780
};
772781

782+
/*
 * Built-in error reasons that always make the module fail over to another
 * node; rtpe_check_ignore_node() compares the engine's error against each
 * entry before trying the user-supplied regular expressions.
 */
static char *rtpe_default_failover_errors[] = {
	"Parallel session limit reached",
	"Ran out of ports",
};
786+
773787
static void rtpe_stats_free(struct rtpe_stats *stats)
774788
{
775789
if (stats->json.s)
@@ -968,7 +982,6 @@ static int rtpengine_set_notify(modparam_t type, void * val)
968982
return 0;
969983
}
970984

971-
972985
static int add_rtpengine_socks(struct rtpe_set * rtpe_list,
973986
char * rtpengine){
974987
/* Make rtp proxies list. */
@@ -2469,6 +2482,97 @@ static struct rtpe_node *get_rtpe_node(str *node, struct rtpe_set *set)
24692482
return NULL;
24702483
}
24712484

2485+
static int rtpe_add_ignore_node(struct rtpe_ignore_node **list, struct rtpe_node *node)
2486+
{
2487+
struct rtpe_ignore_node *new = pkg_malloc(sizeof *new);
2488+
if (!new)
2489+
return 0;
2490+
2491+
new->node = node;
2492+
new->next = *list;
2493+
*list = new;
2494+
LM_INFO("temporary ignoring %.*s node for this attempt\n", node->rn_url.len, node->rn_url.s);
2495+
return 1;
2496+
}
2497+
2498+
static int rtpe_is_ignore_node(struct rtpe_ignore_node *list, struct rtpe_node *node)
2499+
{
2500+
struct rtpe_ignore_node *it;
2501+
for (it = list; it; it = it->next)
2502+
if (it->node == node)
2503+
return 1;
2504+
return 0;
2505+
}
2506+
2507+
static void rtpe_free_ignore_node(struct rtpe_ignore_node *list)
2508+
{
2509+
struct rtpe_ignore_node *next, *it;
2510+
for (it = list; it; it = next) {
2511+
next = it->next;
2512+
pkg_free(it);
2513+
}
2514+
}
2515+
2516+
/* list of the compiled patterns registered by rtpengine_extra_failover() */
OSIPS_LIST_HEAD(rtpe_failover_errors);
/* one compiled failover pattern, linked into rtpe_failover_errors */
struct rtpe_failover_regex {
	regex_t re;            /* pattern compiled with regcomp() */
	struct list_head list; /* linkage into rtpe_failover_errors */
};
2521+
2522+
/*
 * Handler of the "extra_failover_error" module parameter.
 *
 * Compiles `val` (a NUL-terminated pattern) as an extended, case-insensitive
 * regular expression and links it into rtpe_failover_errors, so that
 * rtpe_check_ignore_node() can match engine errors against it.
 *
 * `type` is not used. A NULL or empty value is silently accepted.
 *
 * Returns 0 on success (or empty value), -1 if memory could not be
 * allocated or the pattern does not compile.
 */
static int rtpengine_extra_failover(modparam_t type, void * val)
{
	struct rtpe_failover_regex *re;
	char *pattern = (char *)val;
	char errbuf[128];
	int rc;

	if (pattern == NULL || *pattern == '\0')
		return 0;

	re = pkg_malloc(sizeof(*re));
	if (!re) {
		LM_ERR("no more memory for regular expression!\n");
		return -1;
	}
	memset(re, 0, sizeof(*re));

	rc = regcomp(&re->re, pattern, (REG_EXTENDED|REG_ICASE|REG_NEWLINE));
	if (rc != 0) {
		/* tell the admin why the pattern was rejected, not only that it was */
		regerror(rc, &re->re, errbuf, sizeof(errbuf));
		LM_ERR("could not compile regex [%s]: %s\n", pattern, errbuf);
		pkg_free(re);
		return -1;
	}

	list_add(&re->list, &rtpe_failover_errors);
	return 0;
}
2545+
2546+
/*
 * Decide whether the error reason returned by a node should trigger a
 * failover to another node.
 *
 * The reason is first compared against the built-in
 * rtpe_default_failover_errors table, then, if any were configured, against
 * the regular expressions stored in rtpe_failover_errors.
 *
 * Returns 1 if the error matches, 0 otherwise. 0 is also returned when the
 * NUL-terminated copy needed by regexec() cannot be allocated.
 */
static int rtpe_check_ignore_node(str *error)
{
	int idx, matched = 0;
	int defaults_no = sizeof(rtpe_default_failover_errors) /
		sizeof(rtpe_default_failover_errors[0]);
	str error_nt;
	regmatch_t pmatch;
	struct list_head *pos;
	struct rtpe_failover_regex *rule;

	/* built-in reasons first */
	for (idx = 0; idx < defaults_no; idx++)
		if (str_casematch_nt(error, rtpe_default_failover_errors[idx]))
			return 1;

	/* nothing else to try if no extra pattern was configured */
	if (list_empty(&rtpe_failover_errors))
		return 0;

	/* regexec() works on the NUL-terminated copy */
	if (pkg_nt_str_dup(&error_nt, error) < 0) {
		LM_ERR("could not duplicate error!\n");
		return 0;
	}

	list_for_each(pos, &rtpe_failover_errors) {
		rule = list_entry(pos, struct rtpe_failover_regex, list);
		if (regexec(&rule->re, error_nt.s, 1, &pmatch, 0) == 0) {
			matched = 1;
			break;
		}
	}

	pkg_free(error_nt.s);
	return matched;
}
24722576

24732577
static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_msg *msg,
24742578
enum rtpe_operation op, str *flags_str, str *body_in, pv_spec_t *spvar,
@@ -2478,15 +2582,17 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
24782582
bencode_item_t *item, *resp;
24792583
str viabranch, error;
24802584
int ret, flags_exist = 0, callid_exist = 0, from_tag_exist = 0, to_tag_exist = 0;
2481-
struct rtpe_node *node;
2585+
struct rtpe_node *node, *failed_node;
24822586
char *cp, *err = NULL;
24832587
pv_value_t val;
24842588
str flags_nt = {0,0};
2589+
struct rtpe_ignore_node *ignore_list = NULL;
24852590

24862591
/*** get & init basic stuff needed ***/
24872592

24882593
memset(&ng_flags, 0, sizeof(ng_flags));
24892594
error.len = 0;
2595+
error.s = "";
24902596

24912597
if (!extra_dict) {
24922598
if (bencode_buffer_init(bencbuf)) {
@@ -2658,24 +2764,69 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
26582764
if (!set && (set=rtpe_ctx_set_get())==NULL )
26592765
set = *default_rtpe_set;
26602766

2767+
failed_node = NULL;
2768+
26612769
RTPE_START_READ();
26622770
do {
26632771
if (snode && snode->s) {
26642772
if ((node = get_rtpe_node(snode, set)) == NULL && op == OP_OFFER)
2665-
node = select_rtpe_node(ng_flags.call_id, set);
2773+
node = select_rtpe_node(ng_flags.call_id, set, ignore_list);
26662774
snode = NULL;
26672775
} else {
2668-
node = select_rtpe_node(ng_flags.call_id, set);
2776+
node = select_rtpe_node(ng_flags.call_id, set, ignore_list);
26692777
}
26702778
if (!node) {
2671-
err = "no available proxies";
2779+
if (!err && !error.len)
2780+
err = "no available proxies";
26722781
RTPE_STOP_READ();
26732782
goto error;
26742783
}
26752784

26762785
cp = send_rtpe_command(node, ng_flags.dict, &ret);
2786+
if (cp) {
2787+
/*** process reply ***/
2788+
resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
2789+
if (resp) {
2790+
if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
2791+
if (!bencode_dictionary_get_str(resp, "error-reason", &error)) {
2792+
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
2793+
error.s = "";
2794+
error.len = 0;
2795+
} else {
2796+
LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
2797+
}
2798+
if (rtpe_check_ignore_node(&error)) {
2799+
cp = NULL;
2800+
if (!rtpe_add_ignore_node(&ignore_list, node))
2801+
LM_ERR("could not add node to ignore list!\n");
2802+
else
2803+
continue; /* one more loop */
2804+
} else {
2805+
break; /* break the loop and exit with error */
2806+
}
2807+
}
2808+
} else {
2809+
LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
2810+
err = "failed to decode bencoded reply";
2811+
cp = NULL;
2812+
}
2813+
}
2814+
if (!cp) {
2815+
if (node == failed_node) {
2816+
/* this is the 2nd error this server generated at this round,
2817+
* so we should ignore it and try another one */
2818+
if (!rtpe_add_ignore_node(&ignore_list, node)) {
2819+
LM_ERR("could not add node to ignore list!\n");
2820+
goto error;
2821+
}
2822+
failed_node = NULL;
2823+
} else {
2824+
failed_node = node;
2825+
}
2826+
}
26772827
} while (cp == NULL);
26782828
RTPE_STOP_READ();
2829+
rtpe_free_ignore_node(ignore_list);
26792830
LM_DBG("proxy reply: %.*s\n", ret, cp);
26802831

26812832
/* store the value of the selected node */
@@ -2687,28 +2838,13 @@ static bencode_item_t *rtpe_function_call(bencode_buffer_t *bencbuf, struct sip_
26872838
LM_ERR("setting rtpengine pvar failed\n");
26882839
}
26892840

2690-
/*** process reply ***/
2691-
2692-
resp = bencode_decode_expect(bencbuf, cp, ret, BENCODE_DICTIONARY);
2693-
if (!resp) {
2694-
LM_ERR("failed to decode bencoded reply from proxy: %.*s\n", ret, cp);
2695-
err = "failed to decode bencoded reply";
2696-
goto error;
2697-
}
2698-
if (!bencode_dictionary_get_strcmp(resp, "result", "error")) {
2699-
if (!bencode_dictionary_get_str(resp, "error-reason", &error))
2700-
LM_ERR("proxy return error but didn't give an error reason: %.*s\n", ret, cp);
2701-
else
2702-
LM_ERR("proxy replied with error: %.*s\n", error.len, error.s);
2703-
goto error;
2704-
}
2705-
27062841
if (flags_nt.s)
27072842
pkg_free(flags_nt.s);
27082843

27092844
return resp;
27102845

27112846
error:
2847+
rtpe_free_ignore_node(ignore_list);
27122848
if (flags_nt.s)
27132849
pkg_free(flags_nt.s);
27142850
if (err) {
@@ -3063,7 +3199,7 @@ static struct rtpe_set * select_rtpe_set(int id_set )
30633199
* too expensive here.
30643200
*/
30653201
static struct rtpe_node *
3066-
select_rtpe_node(str callid, struct rtpe_set *set)
3202+
select_rtpe_node(str callid, struct rtpe_set *set, struct rtpe_ignore_node *ignore_list)
30673203
{
30683204
unsigned sum, weight_sum;
30693205
struct rtpe_node* node;
@@ -3083,7 +3219,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
30833219
/* Most popular case: 1 proxy, nothing to calculate */
30843220
if (set->rtpe_node_count == 1) {
30853221
node = set->rn_first;
3086-
if (node->rn_disabled)
3222+
if (node->rn_disabled || rtpe_is_ignore_node(ignore_list, node))
30873223
return NULL;
30883224
return node;
30893225
}
@@ -3099,7 +3235,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
30993235
found = 0;
31003236
for (node=set->rn_first; node!=NULL; node=node->rn_next) {
31013237
constant_weight_sum += node->rn_weight;
3102-
if (!node->rn_disabled) {
3238+
if (!node->rn_disabled && !rtpe_is_ignore_node(ignore_list, node)) {
31033239
weight_sum += node->rn_weight;
31043240
found = 1;
31053241
}
@@ -3115,7 +3251,7 @@ select_rtpe_node(str callid, struct rtpe_set *set)
31153251
was_forced = 0;
31163252
for (node=set->rn_first; node!=NULL;) {
31173253
if (sumcut < (int)node->rn_weight) {
3118-
if (!node->rn_disabled)
3254+
if (!node->rn_disabled && !rtpe_is_ignore_node(ignore_list, node))
31193255
return node;
31203256
if (was_forced == 0) {
31213257
/* appropriate proxy is disabled : redistribute on enabled ones */

0 commit comments

Comments
 (0)