Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions lib/absinthe/subscription/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ defmodule Absinthe.Subscription.Local do
require Logger

alias Absinthe.Pipeline.BatchResolver
alias Absinthe.{Phase, Pipeline}

# This module handles running and broadcasting documents that are local to this
# node.
Expand All @@ -23,19 +24,42 @@ defmodule Absinthe.Subscription.Local do
def publish_mutation(pubsub, mutation_result, subscribed_fields) do
docs_and_topics =
for {field, key_strategy} <- subscribed_fields,
{topic, doc} <- get_docs(pubsub, field, mutation_result, key_strategy) do
{topic, doc} <- get_docs_with_telemetry(pubsub, field, mutation_result, key_strategy) do
{topic, key_strategy, doc}
end

run_docset_fn =
if function_exported?(pubsub, :run_docset, 3), do: &pubsub.run_docset/3, else: &run_docset/3

run_docset_fn.(pubsub, docs_and_topics, mutation_result)
:telemetry.span(
[:absinthe, :subscription, :local, :run_docset],
%{
run_docset_fn: run_docset_fn,
mutation_result: mutation_result,
docs_and_topics: docs_and_topics
},
fn ->
{run_docset_fn.(pubsub, docs_and_topics, mutation_result), %{}}
end
)

:ok
end

alias Absinthe.{Phase, Pipeline}
defp get_docs_with_telemetry(pubsub, field, mutation_result, key_strategy) do
:telemetry.span(
[:absinthe, :subscription, :local, :get_docs],
%{
pubsub: pubsub,
field: field,
mutation_result: mutation_result,
key_strategy: key_strategy
},
fn ->
{get_docs(pubsub, field, mutation_result, key_strategy), %{}}
end
)
end

defp run_docset(pubsub, docs_and_topics, mutation_result) do
for {topic, key_strategy, doc} <- docs_and_topics do
Expand Down
33 changes: 32 additions & 1 deletion test/absinthe/execution/subscription_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -785,7 +785,11 @@ defmodule Absinthe.Execution.SubscriptionTest do
[:absinthe, :execute, :operation, :start],
[:absinthe, :execute, :operation, :stop],
[:absinthe, :subscription, :publish, :start],
[:absinthe, :subscription, :publish, :stop]
[:absinthe, :subscription, :publish, :stop],
[:absinthe, :subscription, :local, :get_docs, :start],
[:absinthe, :subscription, :local, :run_docset, :start],
[:absinthe, :subscription, :local, :get_docs, :stop],
[:absinthe, :subscription, :local, :run_docset, :stop]
],
&Absinthe.TestTelemetryHelper.send_to_pid/4,
%{}
Expand Down Expand Up @@ -823,6 +827,33 @@ defmodule Absinthe.Execution.SubscriptionTest do
assert_receive {:telemetry_event,
{[:absinthe, :subscription, :publish, :stop], _, %{id: ^id}, _config}}

assert_receive {:telemetry_event,
{[:absinthe, :subscription, :local, :get_docs, :start], _, metadata, _config}}

assert %{pubsub: PubSub, field: :thing, mutation_result: "foo", key_strategy: "abc"} ==
Map.drop(metadata, [:telemetry_span_context])

assert is_reference(metadata.telemetry_span_context)

assert_receive {:telemetry_event,
{[:absinthe, :subscription, :local, :get_docs, :stop], _, metadata, _config}}

assert %{} == Map.drop(metadata, [:telemetry_span_context])
assert is_reference(metadata.telemetry_span_context)

assert_receive {:telemetry_event,
{[:absinthe, :subscription, :local, :run_docset, :start], _, metadata,
_config}}

assert is_list(metadata.docs_and_topics)
assert metadata.mutation_result == "foo"
assert is_function(metadata.run_docset_fn, 3)

assert_receive {:telemetry_event,
{[:absinthe, :subscription, :local, :run_docset, :stop], _, metadata, _config}}

assert is_reference(metadata.telemetry_span_context)

:telemetry.detach(context.test)
end

Expand Down