Skip to content

Commit fe85d78

Browse files
committed
Move Matrix event handling to room-specific processes
So a single room being crashy does not block everything
1 parent c26b6d9 commit fe85d78

File tree

5 files changed

+194
-6
lines changed

5 files changed

+194
-6
lines changed

lib/irc_conn/supervisor.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ defmodule M51.IrcConn.Supervisor do
3838
{M51.MatrixClient.Client, {self(), []}},
3939
{M51.MatrixClient.Sender, {self()}},
4040
{M51.MatrixClient.Poller, {self()}},
41+
{M51.MatrixClient.RoomSupervisor, {self()}},
4142
{M51.IrcConn.Handler, {self()}},
4243
{M51.IrcConn.Reader, {self(), sock}}
4344
]
@@ -75,6 +76,11 @@ defmodule M51.IrcConn.Supervisor do
7576
{:via, Registry, {M51.Registry, {sup, :matrix_poller}}}
7677
end
7778

79+
@doc "Returns the pid of the M51.IrcConn.Handler child."
80+
def matrix_room_supervisor(sup) do
81+
{:via, Registry, {M51.Registry, {sup, :matrix_room_supervisor}}}
82+
end
83+
7884
@doc "Returns the pid of the M51.IrcConn.Handler child."
7985
def handler(sup) do
8086
{:via, Registry, {M51.Registry, {sup, :irc_handler}}}

lib/matrix_client/poller.ex

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -108,23 +108,47 @@ defmodule M51.MatrixClient.Poller do
108108
|> Map.get("join", %{})
109109
|> Map.to_list()
110110
|> Enum.map(fn {room_id, event} ->
111-
handle_joined_room(sup_pid, is_backlog, handled_event_ids, room_id, write, event)
111+
M51.MatrixClient.RoomSupervisor.handle_events(
112+
sup_pid,
113+
room_id,
114+
:join,
115+
is_backlog,
116+
handled_event_ids,
117+
write,
118+
event
119+
)
112120
end)
113121

114122
events
115123
|> Map.get("rooms", %{})
116124
|> Map.get("leave", %{})
117125
|> Map.to_list()
118126
|> Enum.map(fn {room_id, event} ->
119-
handle_left_room(sup_pid, is_backlog, handled_event_ids, room_id, write, event)
127+
M51.MatrixClient.RoomSupervisor.handle_events(
128+
sup_pid,
129+
room_id,
130+
:leave,
131+
is_backlog,
132+
handled_event_ids,
133+
write,
134+
event
135+
)
120136
end)
121137

122138
events
123139
|> Map.get("rooms", %{})
124140
|> Map.get("invite", %{})
125141
|> Map.to_list()
126142
|> Enum.map(fn {room_id, event} ->
127-
handle_invited_room(sup_pid, is_backlog, handled_event_ids, room_id, write, event)
143+
M51.MatrixClient.RoomSupervisor.handle_events(
144+
sup_pid,
145+
room_id,
146+
:invite,
147+
is_backlog,
148+
handled_event_ids,
149+
write,
150+
event
151+
)
128152
end)
129153
end
130154

@@ -157,7 +181,7 @@ defmodule M51.MatrixClient.Poller do
157181
end
158182
end
159183

160-
defp handle_joined_room(sup_pid, is_backlog, handled_event_ids, room_id, write, room_event) do
184+
def handle_joined_room(sup_pid, is_backlog, handled_event_ids, room_id, write, room_event) do
161185
state = M51.IrcConn.Supervisor.matrix_state(sup_pid)
162186
irc_state = M51.IrcConn.Supervisor.state(sup_pid)
163187

@@ -1055,13 +1079,13 @@ defmodule M51.MatrixClient.Poller do
10551079
end
10561080
end
10571081

1058-
defp handle_left_room(sup_pid, _is_backlog, _handled_event_ids, _room_id, _write, _event) do
1082+
def handle_left_room(sup_pid, _is_backlog, _handled_event_ids, _room_id, _write, _event) do
10591083
_state = M51.IrcConn.Supervisor.matrix_state(sup_pid)
10601084
_writer = M51.IrcConn.Supervisor.writer(sup_pid)
10611085
# TODO
10621086
end
10631087

1064-
defp handle_invited_room(sup_pid, is_backlog, handled_event_ids, room_id, write, room_event) do
1088+
def handle_invited_room(sup_pid, is_backlog, handled_event_ids, room_id, write, room_event) do
10651089
irc_state = M51.IrcConn.Supervisor.state(sup_pid)
10661090
state = M51.IrcConn.Supervisor.matrix_state(sup_pid)
10671091
nick = M51.IrcConn.State.nick(irc_state)

lib/matrix_client/room_handler.ex

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
##
2+
# Copyright (C) 2022 Valentin Lorentz
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License version 3,
6+
# as published by the Free Software Foundation.
7+
#
8+
# This program is distributed in the hope that it will be useful,
9+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
# GNU Affero General Public License for more details.
12+
#
13+
# You should have received a copy of the GNU Affero General Public License
14+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
15+
###
16+
17+
defmodule M51.MatrixClient.RoomHandler do
18+
@moduledoc """
19+
Receives events from a Matrix room and sends them to IRC.
20+
"""
21+
22+
use GenServer
23+
24+
def start_link(args) do
25+
{sup_pid, room_id} = args
26+
27+
GenServer.start_link(__MODULE__, args,
28+
name: {:via, Registry, {M51.Registry, {sup_pid, :matrix_room_handler, room_id}}}
29+
)
30+
end
31+
32+
@impl true
33+
def init(args) do
34+
{sup_pid, room_id} = args
35+
36+
{:ok, {sup_pid, room_id}}
37+
end
38+
39+
@impl true
40+
def handle_cast({:events, :join, is_backlog, handled_event_ids, write, events}, state) do
41+
{sup_pid, room_id} = state
42+
43+
M51.MatrixClient.Poller.handle_joined_room(
44+
sup_pid,
45+
is_backlog,
46+
handled_event_ids,
47+
room_id,
48+
write,
49+
events
50+
)
51+
52+
{:noreply, state}
53+
end
54+
55+
@impl true
56+
def handle_cast({:events, :leave, is_backlog, handled_event_ids, write, events}, state) do
57+
{sup_pid, room_id} = state
58+
59+
M51.MatrixClient.Poller.handle_left_room(
60+
sup_pid,
61+
is_backlog,
62+
handled_event_ids,
63+
room_id,
64+
write,
65+
events
66+
)
67+
68+
{:noreply, state}
69+
end
70+
71+
@impl true
72+
def handle_cast({:events, :invite, is_backlog, handled_event_ids, write, events}, state) do
73+
{sup_pid, room_id} = state
74+
75+
M51.MatrixClient.Poller.handle_invited_room(
76+
sup_pid,
77+
is_backlog,
78+
handled_event_ids,
79+
room_id,
80+
write,
81+
events
82+
)
83+
84+
{:noreply, state}
85+
end
86+
end
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
##
2+
# Copyright (C) 2022 Valentin Lorentz
3+
#
4+
# This program is free software: you can redistribute it and/or modify
5+
# it under the terms of the GNU Affero General Public License version 3,
6+
# as published by the Free Software Foundation.
7+
#
8+
# This program is distributed in the hope that it will be useful,
9+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
10+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11+
# GNU Affero General Public License for more details.
12+
#
13+
# You should have received a copy of the GNU Affero General Public License
14+
# along with this program. If not, see <https://www.gnu.org/licenses/>.
15+
###
16+
17+
defmodule M51.MatrixClient.RoomSupervisor do
18+
@moduledoc """
19+
Supervises a GenServer for each joined room, which receives events for
20+
the room and sends them to IRC.
21+
"""
22+
use DynamicSupervisor
23+
24+
def start_link(init_arg) do
25+
{sup_pid} = init_arg
26+
room_sup = M51.IrcConn.Supervisor.matrix_room_supervisor(sup_pid)
27+
DynamicSupervisor.start_link(__MODULE__, init_arg, name: room_sup)
28+
end
29+
30+
@impl true
31+
def init(init_arg) do
32+
{sup_pid} = init_arg
33+
34+
ret = DynamicSupervisor.init(strategy: :one_for_one)
35+
36+
Registry.register(M51.Registry, {sup_pid, :matrix_room_supervisor}, nil)
37+
38+
ret
39+
end
40+
41+
def start_or_get_room_handler(sup_pid, room_id) do
42+
room_sup = M51.IrcConn.Supervisor.matrix_room_supervisor(sup_pid)
43+
44+
case Registry.lookup(M51.Registry, {sup_pid, :matrix_room_handler, room_id}) do
45+
[] ->
46+
{:ok, new_pid} =
47+
DynamicSupervisor.start_child(
48+
room_sup,
49+
{M51.MatrixClient.RoomHandler, {sup_pid, room_id}}
50+
)
51+
52+
new_pid
53+
54+
[{existing_pid, _}] ->
55+
existing_pid
56+
end
57+
end
58+
59+
def handle_events(sup_pid, room_id, type, is_backlog, handled_event_ids, write, events) do
60+
# TODO: fetch handled_event_ids in the server instead of message-passing it
61+
# TODO: define write/1 in the server instead of message-passing it
62+
63+
room_handler_pid = M51.MatrixClient.RoomSupervisor.start_or_get_room_handler(sup_pid, room_id)
64+
65+
GenServer.cast(
66+
room_handler_pid,
67+
{:events, type, is_backlog, handled_event_ids, write, events}
68+
)
69+
end
70+
end

test/matrix_client/poller_test.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ defmodule M51.MatrixClient.PollerTest do
3535
start_supervised!({MockIrcConnWriter, {self()}})
3636
|> Process.register(MockIrcConnWriter)
3737

38+
start_supervised!({M51.MatrixClient.RoomSupervisor, {self()}})
39+
3840
M51.IrcConn.State.set_nick(:process_ircconn_state, "mynick:example.com")
3941

4042
:ok

0 commit comments

Comments
 (0)