Skip to content

Commit 2c723b0

Browse files
committed
feat: Add observe and observe_deep callback-based event handlers
Add callback function-based event handler registration functions to shared type modules: - Array, Map, Text, XmlElement, XmlFragment, and XmlText - observe: Subscribe to events with a callback function - observe_deep: Subscribe to deep events with a callback function This provides an alternative to the existing Subscription-based approach, allowing simpler event handling patterns for user applications.
1 parent e8b7e4d commit 2c723b0

File tree

17 files changed

+671
-33
lines changed

17 files changed

+671
-33
lines changed

lib/doc.ex

Lines changed: 120 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -54,38 +54,52 @@ defmodule Yex.Doc do
5454
"""
5555
defmacro run_in_worker_process(doc, do: block) do
5656
quote do
57-
case unquote(doc).worker_pid do
58-
pid when pid == self() ->
59-
unquote(block)
60-
61-
nil ->
62-
raise RuntimeError, "Document has no worker process assigned"
63-
64-
worker_pid ->
65-
wrapped_fun = fn ->
66-
try do
67-
unquote(block)
68-
rescue
69-
e ->
70-
{Yex.Doc, :reraise, e, __STACKTRACE__}
71-
end
72-
end
57+
Yex.Doc.run_in_worker_process_fn(unquote(doc), fn ->
58+
unquote(block)
59+
end)
60+
end
61+
end
62+
63+
@spec run_in_worker_process_fn(
64+
atom() | %{:worker_pid => any(), optional(any()) => any()},
65+
any()
66+
) :: any()
67+
def run_in_worker_process_fn(doc, fun) do
68+
case doc.worker_pid do
69+
pid when pid == self() ->
70+
result = fun.()
71+
handle_on_update_handler()
72+
result
7373

74-
# When a Doc or SharedType operation is performed from a process different from the
75-
# process that created the Doc, a message like {Yex.Doc, :run, fun} is sent to the
76-
# creator process via GenServer.call and the processing is delegated to that process.
77-
# If you encounter a GenServer.call timeout, this delegation mechanism may be the cause.
78-
case GenServer.call(
79-
worker_pid,
80-
{Yex.Doc, :run, wrapped_fun}
81-
) do
82-
{Yex.Doc, :reraise, e, stacktrace} ->
83-
reraise e, stacktrace
84-
85-
result ->
86-
result
74+
nil ->
75+
raise RuntimeError, "Document has no worker process assigned"
76+
77+
worker_pid ->
78+
wrapped_fun = fn ->
79+
try do
80+
result = fun.()
81+
handle_on_update_handler()
82+
result
83+
rescue
84+
e ->
85+
{Yex.Doc, :reraise, e, __STACKTRACE__}
8786
end
88-
end
87+
end
88+
89+
# When a Doc or SharedType operation is performed from a process different from the
90+
# process that created the Doc, a message like {Yex.Doc, :run, fun} is sent to the
91+
# creator process via GenServer.call and the processing is delegated to that process.
92+
# If you encounter a GenServer.call timeout, this delegation mechanism may be the cause.
93+
case GenServer.call(
94+
worker_pid,
95+
{Yex.Doc, :run, wrapped_fun}
96+
) do
97+
{Yex.Doc, :reraise, e, stacktrace} ->
98+
reraise e, stacktrace
99+
100+
result ->
101+
result
102+
end
89103
end
90104
end
91105

@@ -285,6 +299,82 @@ defmodule Yex.Doc do
285299
end
286300
end
287301

302+
def on_subdocs(%__MODULE__{} = doc, handler) do
303+
result =
304+
run_in_worker_process(doc,
305+
do: Yex.Nif.doc_monitor_subdocs(doc, self(), {Yex.CallbackHandler, handler})
306+
)
307+
308+
case result do
309+
{:ok, sub} ->
310+
{:ok, Yex.Subscription.register(sub)}
311+
312+
error ->
313+
error
314+
end
315+
end
316+
317+
@spec on_update(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) :: any()
318+
def on_update(%__MODULE__{} = doc, handler) do
319+
on_update_v1(doc, handler)
320+
end
321+
322+
@spec on_update_v1(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) ::
323+
any()
324+
def on_update_v1(%__MODULE__{} = doc, handler) do
325+
result =
326+
run_in_worker_process(doc,
327+
do: Yex.Nif.doc_monitor_update_v1(doc, self(), {Yex.CallbackHandler, handler})
328+
)
329+
330+
case result do
331+
{:ok, sub} ->
332+
{:ok, Yex.Subscription.register(sub)}
333+
334+
error ->
335+
error
336+
end
337+
end
338+
339+
@spec on_update_v2(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) ::
340+
any()
341+
def on_update_v2(%__MODULE__{} = doc, handler) do
342+
result =
343+
run_in_worker_process(doc,
344+
do: Yex.Nif.doc_monitor_update_v2(doc, self(), {Yex.CallbackHandler, handler})
345+
)
346+
347+
case result do
348+
{:ok, sub} ->
349+
{:ok, Yex.Subscription.register(sub)}
350+
351+
error ->
352+
error
353+
end
354+
end
355+
356+
defp handle_on_update_handler() do
357+
receive do
358+
{_, arg1, {Yex.CallbackHandler, handler}} ->
359+
handler.(arg1)
360+
handle_on_update_handler()
361+
362+
{:subdocs, event, origin, {Yex.CallbackHandler, handler}} ->
363+
handler.(event, origin)
364+
handle_on_update_handler()
365+
366+
{_, arg1, arg2, {Yex.CallbackHandler, handler}} ->
367+
handler.(arg1, arg2)
368+
handle_on_update_handler()
369+
370+
{_, _ref, event, origin, {Yex.ObserveCallbackHandler, handler}} ->
371+
handler.(event, origin)
372+
handle_on_update_handler()
373+
after
374+
0 -> :ok
375+
end
376+
end
377+
288378
defp cur_txn(%__MODULE__{reference: ref}) do
289379
Process.get(ref, nil)
290380
end

lib/shared_type/array.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,28 @@ defmodule Yex.Array do
388388
Enum.member?(to_list(array), val)
389389
end
390390

391+
@spec observe(
392+
Yex.Array.t(),
393+
handler :: (update :: Yex.ArrayEvent.t(), origin :: term() -> nil)
394+
) :: reference()
395+
def observe(%__MODULE__{doc: doc} = array, handler) do
396+
Yex.SharedType.observe(array,
397+
metadata: {Yex.ObserveCallbackHandler, handler},
398+
notify_pid: doc.worker_pid
399+
)
400+
end
401+
402+
@spec observe_deep(
403+
Yex.Array.t(),
404+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
405+
) :: reference()
406+
def observe_deep(%__MODULE__{doc: doc} = array, handler) do
407+
Yex.SharedType.observe_deep(array,
408+
metadata: {Yex.ObserveCallbackHandler, handler},
409+
notify_pid: doc.worker_pid
410+
)
411+
end
412+
391413
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
392414
Process.get(doc_ref, nil)
393415
end

lib/shared_type/event.ex

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,20 @@ defmodule Yex.XmlTextEvent do
109109
delta: Yex.Text.delta()
110110
}
111111
end
112+
113+
defmodule Yex.WeakEvent do
114+
@moduledoc """
115+
116+
Event when Weak type changes
117+
118+
@see Yex.SharedType.observe/1
119+
@see Yex.SharedType.observe_deep/1
120+
"""
121+
defstruct [
122+
:path
123+
]
124+
125+
@type t :: %__MODULE__{
126+
path: list(number() | String.t())
127+
}
128+
end

lib/shared_type/map.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -325,6 +325,28 @@ defmodule Yex.Map do
325325
)
326326
end
327327

328+
@spec observe(
329+
t,
330+
handler :: (update :: Yex.MapEvent.t(), origin :: term() -> nil)
331+
) :: reference()
332+
def observe(%__MODULE__{doc: doc} = map, handler) do
333+
Yex.SharedType.observe(map,
334+
metadata: {Yex.ObserveCallbackHandler, handler},
335+
notify_pid: doc.worker_pid
336+
)
337+
end
338+
339+
@spec observe_deep(
340+
t,
341+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
342+
) :: reference()
343+
def observe_deep(%__MODULE__{doc: doc} = map, handler) do
344+
Yex.SharedType.observe_deep(map,
345+
metadata: {Yex.ObserveCallbackHandler, handler},
346+
notify_pid: doc.worker_pid
347+
)
348+
end
349+
328350
@doc false
329351
# Gets the current transaction reference from the process dictionary for the given document
330352
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do

lib/shared_type/shared_type.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ defmodule Yex.SharedType do
3939
@spec observe(t, keyword()) :: reference()
4040
def observe(%{doc: doc} = shared_type, opt \\ []) do
4141
ref = make_ref()
42-
notify_pid = self()
42+
notify_pid = Keyword.get(opt, :notify_pid, self())
4343

4444
sub =
4545
Doc.run_in_worker_process(doc,
@@ -87,7 +87,7 @@ defmodule Yex.SharedType do
8787
@spec observe_deep(t, keyword()) :: reference()
8888
def observe_deep(%{doc: doc} = shared_type, opt \\ []) do
8989
ref = make_ref()
90-
notify_pid = self()
90+
notify_pid = Keyword.get(opt, :notify_pid, self())
9191

9292
sub =
9393
Doc.run_in_worker_process(doc,

lib/shared_type/text.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,28 @@ defmodule Yex.Text do
202202
)
203203
end
204204

205+
@spec observe(
206+
t,
207+
handler :: (update :: Yex.TextEvent.t(), origin :: term() -> nil)
208+
) :: reference()
209+
def observe(%__MODULE__{doc: doc} = text, handler) do
210+
Yex.SharedType.observe(text,
211+
metadata: {Yex.ObserveCallbackHandler, handler},
212+
notify_pid: doc.worker_pid
213+
)
214+
end
215+
216+
@spec observe_deep(
217+
t,
218+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
219+
) :: reference()
220+
def observe_deep(%__MODULE__{doc: doc} = text, handler) do
221+
Yex.SharedType.observe_deep(text,
222+
metadata: {Yex.ObserveCallbackHandler, handler},
223+
notify_pid: doc.worker_pid
224+
)
225+
end
226+
205227
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
206228
Process.get(doc_ref, nil)
207229
end

lib/shared_type/xml_element.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -351,6 +351,28 @@ defmodule Yex.XmlElement do
351351
)
352352
end
353353

354+
@spec observe(
355+
t,
356+
handler :: (update :: Yex.XmlEvent.t(), origin :: term() -> nil)
357+
) :: reference()
358+
def observe(%__MODULE__{doc: doc} = xml_element, handler) do
359+
Yex.SharedType.observe(xml_element,
360+
metadata: {Yex.ObserveCallbackHandler, handler},
361+
notify_pid: doc.worker_pid
362+
)
363+
end
364+
365+
@spec observe_deep(
366+
t,
367+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
368+
) :: reference()
369+
def observe_deep(%__MODULE__{doc: doc} = xml_element, handler) do
370+
Yex.SharedType.observe_deep(xml_element,
371+
metadata: {Yex.ObserveCallbackHandler, handler},
372+
notify_pid: doc.worker_pid
373+
)
374+
end
375+
354376
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
355377
Process.get(doc_ref, nil)
356378
end

lib/shared_type/xml_fragment.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,28 @@ defmodule Yex.XmlFragment do
280280
)
281281
end
282282

283+
@spec observe(
284+
t,
285+
handler :: (update :: Yex.XmlEvent.t(), origin :: term() -> nil)
286+
) :: reference()
287+
def observe(%__MODULE__{doc: doc} = xml_fragment, handler) do
288+
Yex.SharedType.observe(xml_fragment,
289+
metadata: {Yex.ObserveCallbackHandler, handler},
290+
notify_pid: doc.worker_pid
291+
)
292+
end
293+
294+
@spec observe_deep(
295+
t,
296+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
297+
) :: reference()
298+
def observe_deep(%__MODULE__{doc: doc} = xml_fragment, handler) do
299+
Yex.SharedType.observe_deep(xml_fragment,
300+
metadata: {Yex.ObserveCallbackHandler, handler},
301+
notify_pid: doc.worker_pid
302+
)
303+
end
304+
283305
@doc false
284306
# Gets the current transaction reference from the process dictionary for the given document
285307
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do

lib/shared_type/xml_text.ex

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,28 @@ defmodule Yex.XmlText do
139139
)
140140
end
141141

142+
@spec observe(
143+
t,
144+
handler :: (update :: Yex.XmlTextEvent.t(), origin :: term() -> nil)
145+
) :: reference()
146+
def observe(%__MODULE__{doc: doc} = xml_text, handler) do
147+
Yex.SharedType.observe(xml_text,
148+
metadata: {Yex.ObserveCallbackHandler, handler},
149+
notify_pid: doc.worker_pid
150+
)
151+
end
152+
153+
@spec observe_deep(
154+
t,
155+
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
156+
) :: reference()
157+
def observe_deep(%__MODULE__{doc: doc} = xml_text, handler) do
158+
Yex.SharedType.observe_deep(xml_text,
159+
metadata: {Yex.ObserveCallbackHandler, handler},
160+
notify_pid: doc.worker_pid
161+
)
162+
end
163+
142164
@doc false
143165
# Gets the current transaction reference from the process dictionary for the given document
144166
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do

lib/y_ex.ex

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ defmodule Yex do
1515
any_type
1616
| Yex.PrelimType.t()
1717

18+
@type event_type ::
19+
Yex.XmlTextEvent.t()
20+
| Yex.XmlEvent.t()
21+
| Yex.TextEvent.t()
22+
| Yex.MapEvent.t()
23+
| Yex.ArrayEvent.t()
24+
| Yex.WeakEvent.t()
1825
@doc """
1926
Computes the state vector and encodes it into an Binary.
2027
A state vector describes the state of the local client.

0 commit comments

Comments
 (0)