Skip to content

Commit fd644c5

Browse files
author
Emile Joubert
committed
Extend member structure to prevent pid collisions
1 parent 28f1cff commit fd644c5

File tree

1 file changed

+22
-14
lines changed

1 file changed

+22
-14
lines changed

src/gm.erl

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -515,8 +515,8 @@ group_members(Server) ->
515515
init([GroupName, Module, Args]) ->
516516
{MegaSecs, Secs, MicroSecs} = now(),
517517
random:seed(MegaSecs, Secs, MicroSecs),
518+
Self = {rabbit_guid:guid(), self()},
518519
gen_server2:cast(self(), join),
519-
Self = self(),
520520
{ok, #state { self = Self,
521521
left = {Self, undefined},
522522
right = {Self, undefined},
@@ -541,7 +541,8 @@ handle_call({confirmed_broadcast, Msg}, _From,
541541
right = {Self, undefined},
542542
module = Module,
543543
callback_args = Args }) ->
544-
handle_callback_result({Module:handle_msg(Args, Self, Msg), ok, State});
544+
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
545+
ok, State});
545546

546547
handle_call({confirmed_broadcast, Msg}, From, State) ->
547548
internal_broadcast(Msg, From, State);
@@ -604,7 +605,8 @@ handle_cast({broadcast, Msg},
604605
right = {Self, undefined},
605606
module = Module,
606607
callback_args = Args }) ->
607-
handle_callback_result({Module:handle_msg(Args, Self, Msg), State});
608+
handle_callback_result({Module:handle_msg(Args, get_pid(Self), Msg),
609+
State});
608610

609611
handle_cast({broadcast, Msg}, State) ->
610612
internal_broadcast(Msg, none, State);
@@ -623,7 +625,7 @@ handle_cast(join, State = #state { self = Self,
623625
State1 = check_neighbours(State #state { view = View,
624626
members_state = MembersState }),
625627
handle_callback_result(
626-
{Module:joined(Args, all_known_members(View)), State1});
628+
{Module:joined(Args, get_pids(all_known_members(View))), State1});
627629

628630
handle_cast(leave, State) ->
629631
{stop, normal, State}.
@@ -817,7 +819,7 @@ internal_broadcast(Msg, From, State = #state { self = Self,
817819
confirms = Confirms,
818820
callback_args = Args,
819821
broadcast_buffer = Buffer }) ->
820-
Result = Module:handle_msg(Args, Self, Msg),
822+
Result = Module:handle_msg(Args, get_pid(Self), Msg),
821823
Buffer1 = [{PubCount, Msg} | Buffer],
822824
Confirms1 = case From of
823825
none -> Confirms;
@@ -979,7 +981,7 @@ join_group(Self, GroupName, #gm_group { members = Members } = Group) ->
979981
end,
980982
try
981983
case gen_server2:call(
982-
Left, {add_on_right, Self}, infinity) of
984+
get_pid(Left), {add_on_right, Self}, infinity) of
983985
{ok, Group1} -> group_to_view(Group1);
984986
not_ready -> join_group(Self, GroupName)
985987
end
@@ -1114,24 +1116,25 @@ can_erase_view_member(_Self, _Id, _LA, _LP) -> false.
11141116
ensure_neighbour(_Ver, Self, {Self, undefined}, Self) ->
11151117
{Self, undefined};
11161118
ensure_neighbour(Ver, Self, {Self, undefined}, RealNeighbour) ->
1117-
ok = gen_server2:cast(RealNeighbour, {?TAG, Ver, check_neighbours}),
1119+
ok = gen_server2:cast(get_pid(RealNeighbour),
1120+
{?TAG, Ver, check_neighbours}),
11181121
{RealNeighbour, maybe_monitor(RealNeighbour, Self)};
11191122
ensure_neighbour(_Ver, _Self, {RealNeighbour, MRef}, RealNeighbour) ->
11201123
{RealNeighbour, MRef};
11211124
ensure_neighbour(Ver, Self, {RealNeighbour, MRef}, Neighbour) ->
11221125
true = erlang:demonitor(MRef),
11231126
Msg = {?TAG, Ver, check_neighbours},
1124-
ok = gen_server2:cast(RealNeighbour, Msg),
1127+
ok = gen_server2:cast(get_pid(RealNeighbour), Msg),
11251128
ok = case Neighbour of
11261129
Self -> ok;
1127-
_ -> gen_server2:cast(Neighbour, Msg)
1130+
_ -> gen_server2:cast(get_pid(Neighbour), Msg)
11281131
end,
11291132
{Neighbour, maybe_monitor(Neighbour, Self)}.
11301133

11311134
maybe_monitor(Self, Self) ->
11321135
undefined;
11331136
maybe_monitor(Other, _Self) ->
1134-
erlang:monitor(process, Other).
1137+
erlang:monitor(process, get_pid(Other)).
11351138

11361139
check_neighbours(State = #state { self = Self,
11371140
left = Left,
@@ -1238,6 +1241,9 @@ prepare_members_state(MembersState) ->
12381241
build_members_state(MembersStateList) ->
12391242
?DICT:from_list(MembersStateList).
12401243

1244+
get_pid({_Guid, Pid}) -> Pid.
1245+
1246+
get_pids(Ids) -> [Pid || {_Guid, Pid} <- Ids].
12411247

12421248
%% ---------------------------------------------------------------------------
12431249
%% Activity assembly
@@ -1262,13 +1268,13 @@ maybe_send_activity(Activity, #state { self = Self,
12621268
send_right(Right, View, {activity, Self, Activity}).
12631269

12641270
send_right(Right, View, Msg) ->
1265-
ok = gen_server2:cast(Right, {?TAG, view_version(View), Msg}).
1271+
ok = gen_server2:cast(get_pid(Right), {?TAG, view_version(View), Msg}).
12661272

12671273
callback(Args, Module, Activity) ->
12681274
lists:foldl(
12691275
fun ({Id, Pubs, _Acks}, ok) ->
12701276
lists:foldl(fun ({_PubNum, Pub}, ok) ->
1271-
Module:handle_msg(Args, Id, Pub);
1277+
Module:handle_msg(Args, get_pid(Id), Pub);
12721278
(_, Error) ->
12731279
Error
12741280
end, ok, Pubs);
@@ -1283,7 +1289,8 @@ callback_view_changed(Args, Module, OldView, NewView) ->
12831289
Deaths = OldMembers -- NewMembers,
12841290
case {Births, Deaths} of
12851291
{[], []} -> ok;
1286-
_ -> Module:members_changed(Args, Births, Deaths)
1292+
_ -> Module:members_changed(Args, get_pids(Births),
1293+
get_pids(Deaths))
12871294
end.
12881295

12891296
handle_callback_result({Result, State}) ->
@@ -1333,7 +1340,8 @@ maybe_confirm(_Self, _Id, Confirms, _PubNums) ->
13331340
Confirms.
13341341

13351342
purge_confirms(Confirms) ->
1336-
[gen_server2:reply(From, ok) || {_PubNum, From} <- queue:to_list(Confirms)],
1343+
[gen_server2:reply(From, ok) ||
1344+
{_PubNum, From} <- queue:to_list(Confirms)],
13371345
queue:new().
13381346

13391347

0 commit comments

Comments
 (0)