Skip to content

Commit d3ff32a

Browse files
committed
Avoid messaging handled_event_ids and write/1
1 parent fe85d78 commit d3ff32a

File tree

4 files changed

+66
-82
lines changed

4 files changed

+66
-82
lines changed

lib/matrix_client/poller.ex

Lines changed: 11 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -33,18 +33,17 @@ defmodule M51.MatrixClient.Poller do
3333

3434
# If we are being restarted, pick up from where the last process stopped.
3535
since = M51.MatrixClient.State.poll_since_marker(state)
36-
handled_event_ids = M51.MatrixClient.State.handled_events(state)
3736

3837
if M51.IrcConn.State.registered(irc_state) do
39-
loop_poll(sup_pid, since, handled_event_ids)
38+
loop_poll(sup_pid, since)
4039
else
4140
receive do
42-
:start_polling -> loop_poll(sup_pid, since, handled_event_ids)
41+
:start_polling -> loop_poll(sup_pid, since)
4342
end
4443
end
4544
end
4645

47-
def loop_poll(sup_pid, since, handled_event_ids \\ MapSet.new()) do
46+
def loop_poll(sup_pid, since) do
4847
client = M51.IrcConn.Supervisor.matrix_client(sup_pid)
4948
state = M51.IrcConn.Supervisor.matrix_state(sup_pid)
5049

@@ -56,14 +55,13 @@ defmodule M51.MatrixClient.Poller do
5655
end
5756

5857
raw_client ->
59-
since = poll_one(sup_pid, since, handled_event_ids, raw_client)
58+
since = poll_one(sup_pid, since, raw_client)
6059
M51.MatrixClient.State.update_poll_since_marker(state, since)
61-
# do not pass handled_event_ids, no longer needed
6260
loop_poll(sup_pid, since)
6361
end
6462
end
6563

66-
defp poll_one(sup_pid, since, handled_event_ids, raw_client) do
64+
defp poll_one(sup_pid, since, raw_client) do
6765
query = %{
6866
# Completely arbitrary value. Just make sure it's lower than recv_timeout below
6967
"timeout" => "600000"
@@ -79,30 +77,19 @@ defmodule M51.MatrixClient.Poller do
7977

8078
case M51.Matrix.RawClient.get(raw_client, path, [], options) do
8179
{:ok, events} ->
82-
handle_events(sup_pid, is_backlog, events, handled_event_ids)
80+
handle_events(sup_pid, is_backlog, events)
8381
events["next_batch"]
8482

8583
{:error, code, _} when code >= 500 and code < 600 ->
8684
# server error, try again
87-
poll_one(sup_pid, since, handled_event_ids, raw_client)
85+
poll_one(sup_pid, since, raw_client)
8886
end
8987
end
9088

9189
@doc """
9290
Internal method that dispatches event; public only so it can be unit-tested.
9391
"""
94-
def handle_events(sup_pid, is_backlog, events, handled_event_ids \\ MapSet.new()) do
95-
irc_state = M51.IrcConn.Supervisor.state(sup_pid)
96-
capabilities = M51.IrcConn.State.capabilities(irc_state)
97-
writer = M51.IrcConn.Supervisor.writer(sup_pid)
98-
99-
write = fn cmd ->
100-
M51.IrcConn.Writer.write_command(
101-
writer,
102-
M51.Irc.Command.downgrade(cmd, capabilities)
103-
)
104-
end
105-
92+
def handle_events(sup_pid, is_backlog, events) do
10693
events
10794
|> Map.get("rooms", %{})
10895
|> Map.get("join", %{})
@@ -113,8 +100,6 @@ defmodule M51.MatrixClient.Poller do
113100
room_id,
114101
:join,
115102
is_backlog,
116-
handled_event_ids,
117-
write,
118103
event
119104
)
120105
end)
@@ -129,8 +114,6 @@ defmodule M51.MatrixClient.Poller do
129114
room_id,
130115
:leave,
131116
is_backlog,
132-
handled_event_ids,
133-
write,
134117
event
135118
)
136119
end)
@@ -145,8 +128,6 @@ defmodule M51.MatrixClient.Poller do
145128
room_id,
146129
:invite,
147130
is_backlog,
148-
handled_event_ids,
149-
write,
150131
event
151132
)
152133
end)
@@ -204,7 +185,7 @@ defmodule M51.MatrixClient.Poller do
204185
handle_event(sup_pid, room_id, sender, is_backlog, write, event)
205186
# Don't mark it handled right now, there is still some processing to
206187
# do below.
207-
# M51.MatrixClient.State.mark_handled_event(state, event_id)
188+
# M51.MatrixClient.State.mark_handled_event(state, room_id, event_id)
208189
end
209190
end)
210191

@@ -244,7 +225,7 @@ defmodule M51.MatrixClient.Poller do
244225

245226
handle_event(sup_pid, room_id, sender, is_backlog, write, event)
246227

247-
M51.MatrixClient.State.mark_handled_event(state, event_id)
228+
M51.MatrixClient.State.mark_handled_event(state, room_id, event_id)
248229
end
249230
end)
250231
end
@@ -1121,7 +1102,7 @@ defmodule M51.MatrixClient.Poller do
11211102
nil
11221103
end
11231104

1124-
M51.MatrixClient.State.mark_handled_event(state, event_id)
1105+
M51.MatrixClient.State.mark_handled_event(state, room_id, event_id)
11251106
end
11261107
end)
11271108
end

lib/matrix_client/room_handler.ex

Lines changed: 41 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -37,49 +37,52 @@ defmodule M51.MatrixClient.RoomHandler do
3737
end
3838

3939
@impl true
40-
def handle_cast({:events, :join, is_backlog, handled_event_ids, write, events}, state) do
40+
def handle_cast({:events, room_type, is_backlog, events}, state) do
4141
{sup_pid, room_id} = state
42+
state_pid = M51.IrcConn.Supervisor.matrix_state(sup_pid)
43+
irc_state = M51.IrcConn.Supervisor.state(sup_pid)
44+
capabilities = M51.IrcConn.State.capabilities(irc_state)
45+
writer = M51.IrcConn.Supervisor.writer(sup_pid)
46+
handled_event_ids = M51.MatrixClient.State.handled_events(state_pid, room_id)
4247

43-
M51.MatrixClient.Poller.handle_joined_room(
44-
sup_pid,
45-
is_backlog,
46-
handled_event_ids,
47-
room_id,
48-
write,
49-
events
50-
)
51-
52-
{:noreply, state}
53-
end
54-
55-
@impl true
56-
def handle_cast({:events, :leave, is_backlog, handled_event_ids, write, events}, state) do
57-
{sup_pid, room_id} = state
48+
write = fn cmd ->
49+
M51.IrcConn.Writer.write_command(
50+
writer,
51+
M51.Irc.Command.downgrade(cmd, capabilities)
52+
)
53+
end
5854

59-
M51.MatrixClient.Poller.handle_left_room(
60-
sup_pid,
61-
is_backlog,
62-
handled_event_ids,
63-
room_id,
64-
write,
65-
events
66-
)
55+
case room_type do
56+
:join ->
57+
M51.MatrixClient.Poller.handle_joined_room(
58+
sup_pid,
59+
is_backlog,
60+
handled_event_ids,
61+
room_id,
62+
write,
63+
events
64+
)
6765

68-
{:noreply, state}
69-
end
66+
:leave ->
67+
M51.MatrixClient.Poller.handle_left_room(
68+
sup_pid,
69+
is_backlog,
70+
handled_event_ids,
71+
room_id,
72+
write,
73+
events
74+
)
7075

71-
@impl true
72-
def handle_cast({:events, :invite, is_backlog, handled_event_ids, write, events}, state) do
73-
{sup_pid, room_id} = state
74-
75-
M51.MatrixClient.Poller.handle_invited_room(
76-
sup_pid,
77-
is_backlog,
78-
handled_event_ids,
79-
room_id,
80-
write,
81-
events
82-
)
76+
:invite ->
77+
M51.MatrixClient.Poller.handle_invited_room(
78+
sup_pid,
79+
is_backlog,
80+
handled_event_ids,
81+
room_id,
82+
write,
83+
events
84+
)
85+
end
8386

8487
{:noreply, state}
8588
end

lib/matrix_client/room_supervisor.ex

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,9 @@ defmodule M51.MatrixClient.RoomSupervisor do
5656
end
5757
end
5858

59-
def handle_events(sup_pid, room_id, type, is_backlog, handled_event_ids, write, events) do
60-
# TODO: fetch handled_event_ids in the server instead of message-passing it
61-
# TODO: define write/1 in the server instead of message-passing it
62-
59+
def handle_events(sup_pid, room_id, type, is_backlog, events) do
6360
room_handler_pid = M51.MatrixClient.RoomSupervisor.start_or_get_room_handler(sup_pid, room_id)
6461

65-
GenServer.cast(
66-
room_handler_pid,
67-
{:events, type, is_backlog, handled_event_ids, write, events}
68-
)
62+
GenServer.cast(room_handler_pid, {:events, type, is_backlog, events})
6963
end
7064
end

lib/matrix_client/state.ex

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,8 @@ defmodule M51.MatrixClient.State do
2626
# events handled since the last update to :poll_since (the poller updates
2727
# this set as it handles events in a batch; then updates :poll_since
2828
# an resets this set when it is done with a batch).
29-
handled_events: MapSet.new(),
29+
# Stored as a Map from room ids to the set of event ids.
30+
handled_events: Map.new(),
3031
# %{channel name => list of callbacks to run when a room
3132
# with that channel name is completely synced }
3233
channel_sync_callbacks: Map.new()
@@ -263,23 +264,28 @@ defmodule M51.MatrixClient.State do
263264
Agent.get(pid, fn state -> state.poll_since end)
264265
end
265266

266-
def handled_events(pid) do
267-
Agent.get(pid, fn state -> state.handled_events end)
267+
def handled_events(pid, room_id) do
268+
Agent.get(pid, fn state -> Map.get(state.handled_events, room_id) || MapSet.new() end)
268269
end
269270

270271
@doc """
271272
Updates the 'since' marker, and resets the 'handled_events' set.
272273
"""
273274
def update_poll_since_marker(pid, new_since_marker) do
274275
Agent.update(pid, fn state ->
275-
%{state | poll_since: new_since_marker, handled_events: MapSet.new()}
276+
%{state | poll_since: new_since_marker, handled_events: Map.new()}
276277
end)
277278
end
278279

279-
def mark_handled_event(pid, event_id) do
280+
def mark_handled_event(pid, room_id, event_id) do
280281
if event_id != nil do
281282
Agent.update(pid, fn state ->
282-
%{state | handled_events: MapSet.put(state.handled_events, event_id)}
283+
handled_events =
284+
Map.update(state.handled_events, room_id, nil, fn event_ids ->
285+
MapSet.put(event_ids || MapSet.new(), event_id)
286+
end)
287+
288+
%{state | handled_events: handled_events}
283289
end)
284290
end
285291
end

0 commit comments

Comments
 (0)