Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 120 additions & 30 deletions lib/doc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,52 @@ defmodule Yex.Doc do
"""
defmacro run_in_worker_process(doc, do: block) do
quote do
case unquote(doc).worker_pid do
pid when pid == self() ->
unquote(block)

nil ->
raise RuntimeError, "Document has no worker process assigned"

worker_pid ->
wrapped_fun = fn ->
try do
unquote(block)
rescue
e ->
{Yex.Doc, :reraise, e, __STACKTRACE__}
end
end
Yex.Doc.run_in_worker_process_fn(unquote(doc), fn ->
unquote(block)
end)
end
end

@spec run_in_worker_process_fn(
atom() | %{:worker_pid => any(), optional(any()) => any()},
any()
) :: any()
def run_in_worker_process_fn(doc, fun) do
case doc.worker_pid do
pid when pid == self() ->
result = fun.()
handle_on_update_handler()
result

# When a Doc or SharedType operation is performed from a process different from the
# process that created the Doc, a message like {Yex.Doc, :run, fun} is sent to the
# creator process via GenServer.call and the processing is delegated to that process.
# If you encounter a GenServer.call timeout, this delegation mechanism may be the cause.
case GenServer.call(
worker_pid,
{Yex.Doc, :run, wrapped_fun}
) do
{Yex.Doc, :reraise, e, stacktrace} ->
reraise e, stacktrace

result ->
result
nil ->
raise RuntimeError, "Document has no worker process assigned"

worker_pid ->
wrapped_fun = fn ->
try do
result = fun.()
handle_on_update_handler()
result
rescue
e ->
{Yex.Doc, :reraise, e, __STACKTRACE__}
end
end
end

# When a Doc or SharedType operation is performed from a process different from the
# process that created the Doc, a message like {Yex.Doc, :run, fun} is sent to the
# creator process via GenServer.call and the processing is delegated to that process.
# If you encounter a GenServer.call timeout, this delegation mechanism may be the cause.
case GenServer.call(
worker_pid,
{Yex.Doc, :run, wrapped_fun}
) do
{Yex.Doc, :reraise, e, stacktrace} ->
reraise e, stacktrace

result ->
result
end
end
end

Expand Down Expand Up @@ -285,6 +299,82 @@ defmodule Yex.Doc do
end
end

def on_subdocs(%__MODULE__{} = doc, handler) do
result =
run_in_worker_process(doc,
do: Yex.Nif.doc_monitor_subdocs(doc, self(), {Yex.CallbackHandler, handler})
)

case result do
{:ok, sub} ->
{:ok, Yex.Subscription.register(sub)}

error ->
error
end
end

@spec on_update(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) :: any()
def on_update(%__MODULE__{} = doc, handler) do
on_update_v1(doc, handler)
end

@spec on_update_v1(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) ::
any()
def on_update_v1(%__MODULE__{} = doc, handler) do
result =
run_in_worker_process(doc,
do: Yex.Nif.doc_monitor_update_v1(doc, self(), {Yex.CallbackHandler, handler})
)

case result do
{:ok, sub} ->
{:ok, Yex.Subscription.register(sub)}

error ->
error
end
end

@spec on_update_v2(Yex.Doc.t(), handler :: (update :: binary(), origin :: term() -> nil)) ::
any()
def on_update_v2(%__MODULE__{} = doc, handler) do
result =
run_in_worker_process(doc,
do: Yex.Nif.doc_monitor_update_v2(doc, self(), {Yex.CallbackHandler, handler})
)

case result do
{:ok, sub} ->
{:ok, Yex.Subscription.register(sub)}

error ->
error
end
end

defp handle_on_update_handler() do
receive do
{_, arg1, {Yex.CallbackHandler, handler}} ->
handler.(arg1)
handle_on_update_handler()
Comment on lines +358 to +360
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Remove incorrect receive clause - no 3-tuple messages are sent.

This receive clause expects a 3-tuple {_, arg1, {Yex.CallbackHandler, handler}} and calls the handler with only one argument. However:

  1. The NIF code sends 4-tuples for doc-level callbacks (subdocs, update_v1, update_v2)
  2. All handler specs (lines 317, 322, 339) define handlers that take two arguments: (update, origin)
  3. There is no code path that sends a 3-tuple message

This clause will never match and would call handlers incorrectly if it did. The other clauses (lines 362-364, 366-368, 370-372) correctly handle all expected message formats.

🔎 Proposed fix
 defp handle_on_update_handler() do
   receive do
-    {_, arg1, {Yex.CallbackHandler, handler}} ->
-      handler.(arg1)
-      handle_on_update_handler()
-
     {:subdocs, event, origin, {Yex.CallbackHandler, handler}} ->
       handler.(event, origin)
       handle_on_update_handler()
🤖 Prompt for AI Agents
In lib/doc.ex around lines 358 to 360, remove the incorrect receive clause that
matches a 3-tuple `{_, arg1, {Yex.CallbackHandler, handler}}`; it never occurs
(NIF sends 4-tuples) and would call handlers with a single arg contrary to
handler specs expecting two args. Delete that branch so only the existing
clauses that match 4-tuples and invoke handlers with (update, origin) remain;
ensure no other code references or relies on the removed pattern.


{:subdocs, event, origin, {Yex.CallbackHandler, handler}} ->
handler.(event, origin)
handle_on_update_handler()

{_, arg1, arg2, {Yex.CallbackHandler, handler}} ->
handler.(arg1, arg2)
handle_on_update_handler()

{_, _ref, event, origin, {Yex.ObserveCallbackHandler, handler}} ->
handler.(event, origin)
handle_on_update_handler()
after
0 -> :ok
end
end

defp cur_txn(%__MODULE__{reference: ref}) do
Process.get(ref, nil)
end
Expand Down
22 changes: 22 additions & 0 deletions lib/shared_type/array.ex
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,28 @@ defmodule Yex.Array do
Enum.member?(to_list(array), val)
end

@spec observe(
Yex.Array.t(),
handler :: (update :: Yex.ArrayEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = array, handler) do
Yex.SharedType.observe(array,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
Yex.Array.t(),
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = array, handler) do
Yex.SharedType.observe_deep(array,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Process.get(doc_ref, nil)
end
Expand Down
17 changes: 17 additions & 0 deletions lib/shared_type/event.ex
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,20 @@ defmodule Yex.XmlTextEvent do
delta: Yex.Text.delta()
}
end

defmodule Yex.WeakEvent do
@moduledoc """

Event when Weak type changes

@see Yex.SharedType.observe/1
@see Yex.SharedType.observe_deep/1
"""
defstruct [
:path
]

@type t :: %__MODULE__{
path: list(number() | String.t())
}
end
22 changes: 22 additions & 0 deletions lib/shared_type/map.ex
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,28 @@ defmodule Yex.Map do
)
end

@spec observe(
t,
handler :: (update :: Yex.MapEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = map, handler) do
Yex.SharedType.observe(map,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
t,
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = map, handler) do
Yex.SharedType.observe_deep(map,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@doc false
# Gets the current transaction reference from the process dictionary for the given document
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Expand Down
4 changes: 2 additions & 2 deletions lib/shared_type/shared_type.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule Yex.SharedType do
@spec observe(t, keyword()) :: reference()
def observe(%{doc: doc} = shared_type, opt \\ []) do
ref = make_ref()
notify_pid = self()
notify_pid = Keyword.get(opt, :notify_pid, self())

sub =
Doc.run_in_worker_process(doc,
Expand Down Expand Up @@ -87,7 +87,7 @@ defmodule Yex.SharedType do
@spec observe_deep(t, keyword()) :: reference()
def observe_deep(%{doc: doc} = shared_type, opt \\ []) do
ref = make_ref()
notify_pid = self()
notify_pid = Keyword.get(opt, :notify_pid, self())

sub =
Doc.run_in_worker_process(doc,
Expand Down
22 changes: 22 additions & 0 deletions lib/shared_type/text.ex
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,28 @@ defmodule Yex.Text do
)
end

@spec observe(
t,
handler :: (update :: Yex.TextEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = text, handler) do
Yex.SharedType.observe(text,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
t,
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = text, handler) do
Yex.SharedType.observe_deep(text,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Process.get(doc_ref, nil)
end
Expand Down
22 changes: 22 additions & 0 deletions lib/shared_type/xml_element.ex
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,28 @@ defmodule Yex.XmlElement do
)
end

@spec observe(
t,
handler :: (update :: Yex.XmlEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = xml_element, handler) do
Yex.SharedType.observe(xml_element,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
t,
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = xml_element, handler) do
Yex.SharedType.observe_deep(xml_element,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Process.get(doc_ref, nil)
end
Expand Down
22 changes: 22 additions & 0 deletions lib/shared_type/xml_fragment.ex
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,28 @@ defmodule Yex.XmlFragment do
)
end

@spec observe(
t,
handler :: (update :: Yex.XmlEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = xml_fragment, handler) do
Yex.SharedType.observe(xml_fragment,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
t,
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = xml_fragment, handler) do
Yex.SharedType.observe_deep(xml_fragment,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@doc false
# Gets the current transaction reference from the process dictionary for the given document
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Expand Down
22 changes: 22 additions & 0 deletions lib/shared_type/xml_text.ex
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,28 @@ defmodule Yex.XmlText do
)
end

@spec observe(
t,
handler :: (update :: Yex.XmlTextEvent.t(), origin :: term() -> nil)
) :: reference()
def observe(%__MODULE__{doc: doc} = xml_text, handler) do
Yex.SharedType.observe(xml_text,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@spec observe_deep(
t,
handler :: (update :: list(Yex.event_type()), origin :: term() -> nil)
) :: reference()
def observe_deep(%__MODULE__{doc: doc} = xml_text, handler) do
Yex.SharedType.observe_deep(xml_text,
metadata: {Yex.ObserveCallbackHandler, handler},
notify_pid: doc.worker_pid
)
end

@doc false
# Gets the current transaction reference from the process dictionary for the given document
defp cur_txn(%{doc: %Yex.Doc{reference: doc_ref}}) do
Expand Down
7 changes: 7 additions & 0 deletions lib/y_ex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ defmodule Yex do
any_type
| Yex.PrelimType.t()

@type event_type ::
Yex.XmlTextEvent.t()
| Yex.XmlEvent.t()
| Yex.TextEvent.t()
| Yex.MapEvent.t()
| Yex.ArrayEvent.t()
| Yex.WeakEvent.t()
@doc """
Computes the state vector and encodes it into an Binary.
A state vector describes the state of the local client.
Expand Down
Loading
Loading