Skip to content

Commit 94173b8

Browse files
committed
clusterer: Enhance cluster_broadcast_req() to include self
... via an optional new parameter (default: false).
1 parent 147072a commit 94173b8

File tree

6 files changed

+54
-17
lines changed

6 files changed

+54
-17
lines changed

modules/clusterer/api.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ enum clusterer_event {
7474

7575
enum cl_node_match_op {
7676
NODE_CMP_ANY,
77+
NODE_CMP_ALL, /* same as ANY, but additionally includes current node */
7778
NODE_CMP_EQ_SIP_ADDR,
7879
NODE_CMP_NEQ_SIP_ADDR
7980
};

modules/clusterer/clusterer.c

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ extern int clusterer_enable_rerouting;
6060

6161
int dispatch_jobs = 1;
6262

63+
void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id);
64+
6365
str cap_sr_details_str[] = {
6466
str_init("not synced"),
6567
str_init("sync pending"),
@@ -392,12 +394,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet,
392394
}
393395
lock_release(cl->current_node->lock);
394396

395-
node = get_node_by_id(cl, dst_node_id);
396-
if (!node) {
397-
LM_ERR("Node id [%d] not found in cluster\n", dst_node_id);
398-
if (!locked)
399-
lock_stop_read(cl_list_lock);
400-
return CLUSTERER_SEND_ERR;
397+
if (dst_node_id == cl->current_node->node_id) {
398+
node = cl->current_node;
399+
} else {
400+
node = get_node_by_id(cl, dst_node_id);
401+
if (!node) {
402+
LM_ERR("Node id [%d] not found in cluster\n", dst_node_id);
403+
if (!locked)
404+
lock_stop_read(cl_list_lock);
405+
return CLUSTERER_SEND_ERR;
406+
}
401407
}
402408

403409
lock_get(node->lock);
@@ -422,7 +428,16 @@ enum clusterer_send_ret clusterer_send_msg(bin_packet_t *packet,
422428
}
423429
}
424430

425-
rc = msg_send_retry(packet, node, 0, &ev_actions_required);
431+
if (node == cl->current_node && packet->type == CLUSTERER_GENERIC_MSG) {
432+
bin_remove_int_buffer_end(packet, 1);
433+
bin_push_int(packet, node->node_id);
434+
bin_get_capability(packet, &capability);
435+
packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE;
436+
handle_cl_gen_msg(packet, cluster_id, node->node_id);
437+
rc = 0;
438+
} else {
439+
rc = msg_send_retry(packet, node, 0, &ev_actions_required);
440+
}
426441

427442
bin_remove_int_buffer_end(packet, 3);
428443

@@ -510,6 +525,17 @@ clusterer_bcast_msg(bin_packet_t *packet, int dst_cid,
510525
sent = 1;
511526
}
512527

528+
if (match_op == NODE_CMP_ALL && packet->type == CLUSTERER_GENERIC_MSG) {
529+
LM_DBG("broadcasting gen to self (cl: %d, node: %d)\n",
530+
dst_cid, dst_cl->current_node->node_id);
531+
bin_remove_int_buffer_end(packet, 1);
532+
bin_push_int(packet, dst_cl->current_node->node_id);
533+
bin_get_capability(packet, &capability);
534+
packet->front_pointer = capability.s + capability.len + CMD_FIELD_SIZE;
535+
536+
handle_cl_gen_msg(packet, dst_cid, dst_cl->current_node->node_id);
537+
}
538+
513539
bin_remove_int_buffer_end(packet, 3);
514540

515541
if (ev_actions_required)
@@ -615,7 +641,8 @@ enum clusterer_send_ret send_gen_msg(int cluster_id, int dst_id, str *gen_msg,
615641
return rc;
616642
}
617643

618-
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag)
644+
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg,
645+
str *exchg_tag, int all)
619646
{
620647
bin_packet_t packet;
621648
int rc;
@@ -626,7 +653,8 @@ enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_t
626653
return CLUSTERER_SEND_ERR;
627654
}
628655

629-
rc = clusterer_bcast_msg(&packet, cluster_id, NODE_CMP_ANY, 0);
656+
rc = clusterer_bcast_msg(&packet, cluster_id,
657+
all ? NODE_CMP_ALL : NODE_CMP_ANY, 0);
630658

631659
bin_free_packet(&packet);
632660

@@ -940,7 +968,7 @@ static void handle_internal_msg(bin_packet_t *received, int packet_type,
940968
}
941969
}
942970

943-
static void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id)
971+
void handle_cl_gen_msg(bin_packet_t *packet, int cluster_id, int source_id)
944972
{
945973
int req_like;
946974
str rcv_msg, rcv_tag;

modules/clusterer/clusterer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ unsigned long clusterer_get_num_nodes(int state);
191191

192192
enum clusterer_send_ret send_gen_msg(int cluster_id, int node_id, str *gen_msg,
193193
str *exchg_tag, int req_like);
194-
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag);
194+
enum clusterer_send_ret bcast_gen_msg(int cluster_id, str *gen_msg, str *exchg_tag, int all);
195195
enum clusterer_send_ret send_mi_cmd(int cluster_id, int dst_id, str cmd_name,
196196
mi_item_t *cmd_params_arr, int no_params);
197197
enum clusterer_send_ret bcast_remove_node(int cluster_id, int target_node);

modules/clusterer/clusterer_mod.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ static void heartbeats_timer_handler(unsigned int ticks, void *param);
100100
static void heartbeats_utimer_handler(utime_t ticks, void *param);
101101

102102
int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
103-
pv_spec_t *param_tag);
103+
pv_spec_t *param_tag, int *all);
104104
int cmd_send_req(struct sip_msg *msg, int *cluster_id, int *node_id,
105105
str *gen_msg, pv_spec_t *param_tag);
106106
int cmd_send_rpl(struct sip_msg *msg, int *cluster_id, int *node_id,
@@ -118,7 +118,8 @@ static const cmd_export_t cmds[] = {
118118
{"cluster_broadcast_req", (cmd_function)cmd_broadcast_req, {
119119
{CMD_PARAM_INT,0,0},
120120
{CMD_PARAM_STR,0,0},
121-
{CMD_PARAM_VAR|CMD_PARAM_OPT,0,0}, {0,0,0}},
121+
{CMD_PARAM_VAR|CMD_PARAM_OPT,0,0},
122+
{CMD_PARAM_INT|CMD_PARAM_OPT,0,0}, {0,0,0}},
122123
REQUEST_ROUTE | FAILURE_ROUTE | ONREPLY_ROUTE | LOCAL_ROUTE | BRANCH_ROUTE | EVENT_ROUTE},
123124
{"cluster_send_req", (cmd_function)cmd_send_req, {
124125
{CMD_PARAM_INT,0,0},
@@ -1267,7 +1268,7 @@ static inline void generate_msg_tag(pv_value_t *tag_val, int cluster_id)
12671268
}
12681269

12691270
int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
1270-
pv_spec_t *param_tag)
1271+
pv_spec_t *param_tag, int *all)
12711272
{
12721273
pv_value_t tag_val;
12731274
int rc;
@@ -1280,7 +1281,7 @@ int cmd_broadcast_req(struct sip_msg *msg, int *cluster_id, str *gen_msg,
12801281
return -1;
12811282
}
12821283

1283-
rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs);
1284+
rc = bcast_gen_msg(*cluster_id, gen_msg, &tag_val.rs, (all && *all));
12841285
switch (rc) {
12851286
case 0:
12861287
return 1;

modules/clusterer/doc/clusterer_admin.xml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -792,11 +792,16 @@ event_route[E_CLUSTERER_REQ_RECEIVED] {
792792

793793
<section id="func_cluster_broadcast_req" xreflabel="cluster_broadcast_req()">
794794
<title>
795-
<function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag])</function>
795+
<function moreinfo="none">cluster_broadcast_req(cluster_id, msg, [tag], [include_self])</function>
796796
</title>
797797
<para>
798798
This function has a similar behaviour to the <function moreinfo="none">cluster_send_req()</function> function with the exception that the message is sent to all the nodes in the specified cluster.
799799
</para>
800+
<itemizedlist>
801+
<listitem>
802+
<para><emphasis>include_self</emphasis> (bool, optional, default: <emphasis>false</emphasis>) - raise the event for current node as well, but without actually sending a packet (both req and rpl).</para>
803+
</listitem>
804+
</itemizedlist>
800805
<para>
801806
The function can return the following values:
802807
<itemizedlist>
@@ -824,7 +829,8 @@ event_route[E_CLUSTERER_REQ_RECEIVED] {
824829
<title>cluster_broadcast_req() usage</title>
825830
<programlisting format="linespecific">
826831
...
827-
cluster_broadcast_req($var(cl_id), $var(share_data));
832+
# also raise the event for current node
833+
cluster_broadcast_req($var(cl_id), $var(share_data), , true);
828834
...
829835
</programlisting>
830836
</example>

modules/clusterer/node_info.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,7 @@ int match_node(const node_info_t *a, const node_info_t *b,
10141014
{
10151015
switch (match_op) {
10161016
case NODE_CMP_ANY:
1017+
case NODE_CMP_ALL:
10171018
break;
10181019
case NODE_CMP_EQ_SIP_ADDR:
10191020
lock_get(a->lock);

0 commit comments

Comments
 (0)