Skip to content

Commit 051cc53

Browse files
committed
Add subscription ordinal support
1 parent c6ded5a commit 051cc53

File tree

4 files changed

+101
-58
lines changed

4 files changed

+101
-58
lines changed

lib/absinthe/phoenix/channel.ex

Lines changed: 62 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ defmodule Absinthe.Phoenix.Channel do
3737
socket =
3838
socket
3939
|> assign(:absinthe, absinthe_config)
40-
|> assign(:async_procs, [])
4140
{:ok, socket}
4241
end
4342

@@ -90,31 +89,56 @@ defmodule Absinthe.Phoenix.Channel do
9089
{:reply, {:ok, %{subscriptionId: doc_id}}, socket}
9190
end
9291

92+
def handle_info(
93+
%Phoenix.Socket.Broadcast{payload: %{result: %{ordinal: ordinal}}} = msg,
94+
socket
95+
) when not is_nil(ordinal) do
96+
absinthe_assigns = Map.get(socket.assigns, :absinthe, %{})
97+
last_ordinal = absinthe_assigns[:subscription_ordinals][msg.topic]
98+
99+
cond do
100+
last_ordinal == nil or last_ordinal < ordinal ->
101+
send_msg(msg, socket)
102+
socket = update_ordinal(socket, msg.topic, ordinal)
103+
{:noreply, socket}
104+
true ->
105+
{:noreply, socket}
106+
end
107+
end
108+
109+
def handle_info(msg, socket) do
110+
send_msg(msg, socket)
111+
{:noreply, socket}
112+
end
113+
114+
defp send_msg(msg, socket) do
115+
encoded_msg = socket.serializer.fastlane!(msg)
116+
send(socket.transport_pid, encoded_msg)
117+
end
118+
119+
defp update_ordinal(socket, topic, ordinal) do
120+
absinthe_assigns = Map.get(socket, :absinthe, %{})
121+
ordinals =
122+
absinthe_assigns
123+
|> Map.get(:subscription_ordinals, %{})
124+
|> Map.put(topic, ordinal)
125+
126+
Phoenix.Socket.assign(socket, :absinthe, Map.put(absinthe_assigns, :subscription_ordinals, ordinals))
127+
end
128+
93129
defp run_doc(socket, query, config, opts) do
94130
case run(query, config[:schema], config[:pipeline], opts) do
95131
{:ok, %{"subscribed" => topic}, context} ->
96-
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server} =
97-
socket
98-
99-
:ok =
100-
Phoenix.PubSub.subscribe(
101-
pubsub_server,
102-
topic,
103-
metadata: {:fastlane, transport_pid, serializer, []},
104-
link: true
105-
)
106-
132+
pubsub_subscribe(topic, socket)
107133
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
134+
108135
{{:ok, %{subscriptionId: topic}}, socket}
109136

110137
{:more, %{"subscribed" => topic}, continuation, context} ->
111-
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
112138
reply(socket_ref(socket), {:ok, %{subscriptionId: topic}})
113139

114-
:ok = Phoenix.PubSub.subscribe(socket.pubsub_server, topic, [
115-
fastlane: {socket.transport_pid, socket.serializer, []},
116-
link: true,
117-
])
140+
pubsub_subscribe(topic, socket)
141+
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
118142

119143
handle_subscription_continuation(continuation, topic, socket)
120144

@@ -129,10 +153,11 @@ defmodule Absinthe.Phoenix.Channel do
129153
{{:error, reply}, socket}
130154

131155
{:more, %{data: _} = reply, continuation, context} ->
132-
socket = Absinthe.Phoenix.Socket.put_options(socket, context: context)
133-
134156
id = new_query_id()
135-
socket = handle_continuation(continuation, id, socket)
157+
socket =
158+
socket
159+
|> Absinthe.Phoenix.Socket.put_options(context: context)
160+
|> handle_continuation(continuation, id)
136161

137162
{{:ok, add_query_id(reply, id)}, socket}
138163

@@ -154,6 +179,20 @@ defmodule Absinthe.Phoenix.Channel do
154179
end
155180
end
156181

182+
defp pubsub_subscribe(
183+
topic,
184+
%{transport_pid: transport_pid, serializer: serializer, pubsub_server: pubsub_server}
185+
) do
186+
:ok =
187+
Phoenix.PubSub.subscribe(
188+
pubsub_server,
189+
topic,
190+
metadata: {:fastlane, transport_pid, serializer, ["subscription:data"]},
191+
link: true
192+
)
193+
end
194+
195+
157196
defp extract_variables(payload) do
158197
case Map.get(payload, "variables", %{}) do
159198
nil -> %{}
@@ -167,40 +206,16 @@ defmodule Absinthe.Phoenix.Channel do
167206
|> Absinthe.Pipeline.for_document(options)
168207
end
169208

170-
def handle_info({:DOWN, ref, :process, _pid, _reason}, socket) do
171-
procs = List.delete(socket.assigns.async_procs, ref)
172-
{:noreply, assign(socket, :async_procs, procs)}
173-
end
174-
175-
def handle_info(_, state) do
176-
{:noreply, state}
177-
end
178-
179-
defp handle_continuation(continuation, id, socket) do
180-
max_procs = socket.assigns[:max_async_procs] || 0
181-
182-
case socket.assigns.async_procs do
183-
procs when length(procs) < max_procs ->
184-
{_pid, ref} = Process.spawn(
185-
fn -> do_handle_continuation(continuation, id, socket) end,
186-
[:link, :monitor])
187-
assign(socket, :async_procs, [ref | procs])
188-
_ ->
189-
do_handle_continuation(continuation, id, socket)
190-
socket
191-
end
192-
end
193-
194-
defp do_handle_continuation(continuation, id, socket) do
209+
defp handle_continuation(socket, continuation, id) do
195210
case Absinthe.Pipeline.continue(continuation) do
196-
{:ok, %{result: %{continuation: continuation} = result}, _phases} ->
211+
{:ok, %{result: %{continuation: next_continuation} = result}, _phases} ->
197212
result =
198213
result
199214
|> Map.delete(:continuation)
200215
|> add_query_id(id)
201216

202217
push socket, "doc", result
203-
do_handle_continuation(continuation, id, socket)
218+
handle_continuation(socket, next_continuation, id)
204219
{:ok, %{result: result}, _phases} ->
205220
push socket, "doc", add_query_id(result, id)
206221
{:ok, %{errors: errors}, _phases} ->

mix.lock

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,21 @@
11
%{
2-
"absinthe": {:git, "https://github.com/circles-learning-labs/absinthe.git", "a88e7c2fc645b489815eafbd423f93e01754bbaf", [branch: "subscription-prime"]},
2+
"absinthe": {:git, "https://github.com/circles-learning-labs/absinthe.git", "d7bddaa17f01b475932ea1e69866a09f478d4726", [branch: "subscription-prime"]},
33
"absinthe_plug": {:hex, :absinthe_plug, "1.5.0", "018ef544cf577339018d1f482404b4bed762e1b530c78be9de4bbb88a6f3a805", [:mix], [{:absinthe, "~> 1.5.0", [hex: :absinthe, repo: "hexpm", optional: false]}, {:plug, "~> 1.3.2 or ~> 1.4", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "4c160f4ce9a1233a4219a42de946e4e05d0e8733537cd5d8d20e7d4ef8d4b7c7"},
4-
"decimal": {:hex, :decimal, "1.9.0", "83e8daf59631d632b171faabafb4a9f4242c514b0a06ba3df493951c08f64d07", [:mix], [], "hexpm", "b1f2343568eed6928f3e751cf2dffde95bfaa19dd95d09e8a9ea92ccfd6f7d85"},
4+
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
55
"earmark": {:hex, :earmark, "1.4.4", "4821b8d05cda507189d51f2caeef370cf1e18ca5d7dfb7d31e9cafe6688106a4", [:mix], [], "hexpm", "1f93aba7340574847c0f609da787f0d79efcab51b044bb6e242cae5aca9d264d"},
66
"earmark_parser": {:hex, :earmark_parser, "1.4.12", "b245e875ec0a311a342320da0551da407d9d2b65d98f7a9597ae078615af3449", [:mix], [], "hexpm", "711e2cc4d64abb7d566d43f54b78f7dc129308a63bc103fbd88550d2174b3160"},
77
"ex_doc": {:hex, :ex_doc, "0.24.1", "15673de99154f93ca7f05900e4e4155ced1ee0cd34e0caeee567900a616871a4", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "07972f17bdf7dc7b5bd76ec97b556b26178ed3f056e7ec9288eb7cea7f91cce2"},
8-
"jason": {:hex, :jason, "1.2.1", "12b22825e22f468c02eb3e4b9985f3d0cb8dc40b9bd704730efa11abd2708c44", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "b659b8571deedf60f79c5a608e15414085fa141344e2716fbd6988a084b5f993"},
8+
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
99
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
1010
"makeup_elixir": {:hex, :makeup_elixir, "0.14.1", "4f0e96847c63c17841d42c08107405a005a2680eb9c7ccadfd757bd31dabccfb", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f2438b1a80eaec9ede832b5c41cd4f373b38fd7aa33e3b22d9db79e640cbde11"},
1111
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
12-
"mime": {:hex, :mime, "1.3.1", "30ce04ab3175b6ad0bdce0035cba77bba68b813d523d1aac73d9781b4d193cf8", [:mix], [], "hexpm", "6cbe761d6a0ca5a31a0931bf4c63204bceb64538e664a8ecf784a9a6f3b875f1"},
12+
"mime": {:hex, :mime, "2.0.2", "0b9e1a4c840eafb68d820b0e2158ef5c49385d17fb36855ac6e7e087d4b1dcc5", [:mix], [], "hexpm", "e6a3f76b4c277739e36c2e21a2c640778ba4c3846189d5ab19f97f126df5f9b7"},
1313
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
1414
"phoenix": {:hex, :phoenix, "1.5.1", "95156589879dc69201d5fc0ebdbfdfc7901a09a3616ea611ec297f81340275a2", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "fc272b38e79d2881790fccae6f67a9fbe9b790103d6878175ea03d23003152eb"},
15-
"phoenix_html": {:hex, :phoenix_html, "2.14.2", "b8a3899a72050f3f48a36430da507dd99caf0ac2d06c77529b1646964f3d563e", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "58061c8dfd25da5df1ea0ca47c972f161beb6c875cd293917045b92ffe1bf617"},
16-
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"},
17-
"plug": {:hex, :plug, "1.10.0", "6508295cbeb4c654860845fb95260737e4a8838d34d115ad76cd487584e2fc4d", [:mix], [{:mime, "~> 1.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "422a9727e667be1bf5ab1de03be6fa0ad67b775b2d84ed908f3264415ef29d4a"},
18-
"plug_crypto": {:hex, :plug_crypto, "1.1.2", "bdd187572cc26dbd95b87136290425f2b580a116d3fb1f564216918c9730d227", [:mix], [], "hexpm", "6b8b608f895b6ffcfad49c37c7883e8df98ae19c6a28113b02aa1e9c5b22d6b5"},
15+
"phoenix_html": {:hex, :phoenix_html, "2.14.3", "51f720d0d543e4e157ff06b65de38e13303d5778a7919bcc696599e5934271b8", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "efd697a7fff35a13eeeb6b43db884705cba353a1a41d127d118fda5f90c8e80f"},
16+
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.1.1", "ba04e489ef03763bf28a17eb2eaddc2c20c6d217e2150a61e3298b0f4c2012b5", [:mix], [], "hexpm", "81367c6d1eea5878ad726be80808eb5a787a23dee699f96e72b1109c57cdd8d9"},
17+
"phoenix_view": {:hex, :phoenix_view, "1.1.2", "1b82764a065fb41051637872c7bd07ed2fdb6f5c3bd89684d4dca6e10115c95a", [:mix], [{:phoenix_html, "~> 2.14.2 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}], "hexpm", "7ae90ad27b09091266f6adbb61e1d2516a7c3d7062c6789d46a7554ec40f3a56"},
18+
"plug": {:hex, :plug, "1.13.6", "187beb6b67c6cec50503e940f0434ea4692b19384d47e5fdfd701e93cadb4cc2", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "02b9c6b9955bce92c829f31d6284bf53c591ca63c4fb9ff81dfd0418667a34ff"},
19+
"plug_crypto": {:hex, :plug_crypto, "1.2.2", "05654514ac717ff3a1843204b424477d9e60c143406aa94daf2274fdd280794d", [:mix], [], "hexpm", "87631c7ad914a5a445f0a3809f99b079113ae4ed4b867348dd9eec288cecb6db"},
1920
"telemetry": {:hex, :telemetry, "0.4.3", "a06428a514bdbc63293cd9a6263aad00ddeb66f608163bdec7c8995784080818", [:rebar3], [], "hexpm", "eb72b8365ffda5bed68a620d1da88525e326cb82a75ee61354fc24b844768041"},
2021
}

test/absinthe/phoenix_test.exs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ defmodule Absinthe.PhoenixTest do
100100
assert_push("subscription:data", push)
101101

102102
expected = %{
103-
result: %{data: %{"commentAdded" => %{"contents" => "hello world"}}},
103+
result: %{data: %{"commentAdded" => %{"contents" => "hello world"}}, ordinal: nil},
104104
subscriptionId: subscription_ref
105105
}
106106

@@ -145,6 +145,27 @@ defmodule Absinthe.PhoenixTest do
145145
assert expected == push
146146
end
147147

148+
test "subscription with ordinal", %{socket: socket} do
149+
ref = push socket, "doc", %{"query" => "subscription { ordinal }"}
150+
151+
assert_reply ref, :ok, %{subscriptionId: subscription_ref}
152+
153+
Absinthe.Subscription.publish(TestEndpoint, 1, ordinal: "ordinal_topic")
154+
155+
assert_push "subscription:data", push
156+
expected = %{result: %{data: %{"ordinal" => 1}, ordinal: 1}, subscriptionId: subscription_ref}
157+
assert expected == push
158+
159+
Absinthe.Subscription.publish(TestEndpoint, 0, ordinal: "ordinal_topic")
160+
# This message should not generate a notification because it has a lower ordinal
161+
162+
Absinthe.Subscription.publish(TestEndpoint, 2, ordinal: "ordinal_topic")
163+
164+
assert_push "subscription:data", push
165+
expected = %{result: %{data: %{"ordinal" => 2}, ordinal: 2}, subscriptionId: subscription_ref}
166+
assert expected == push
167+
end
168+
148169
test "context changes are persisted across documents", %{socket: socket} do
149170
ref =
150171
push(socket, "doc", %{

test/support/schema.ex

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,17 @@ defmodule Schema do
9999
end
100100

101101
field :prime, :string do
102-
config fn _, %{context: %{pubsub: pubsub}} ->
103-
{:ok, topic: "prime_topic", prime: fn ->
102+
config fn _, _ ->
103+
{:ok, topic: "prime_topic", prime: fn _ ->
104104
{:ok, ["prime1", "prime2"]}
105105
end}
106106
end
107107
end
108+
109+
field :ordinal, :integer do
110+
config fn _, _ ->
111+
{:ok, topic: "ordinal_topic", ordinal: fn value -> value end}
112+
end
113+
end
108114
end
109115
end

0 commit comments

Comments
 (0)