Skip to content

Commit 4118aca

Browse files
authored
Merge pull request #10 from SimpleBet/attributes_and_testing
Added OTLP Attributes and testing
2 parents c086658 + 89a7035 commit 4118aca

24 files changed

+1181
-85
lines changed

lib/commanded/aggregate.ex

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,23 @@ defmodule OpentelemetryCommanded.Aggregate do
3434

3535
def handle_start(_event, _, meta, _) do
3636
context = meta.execution_context
37-
trace_headers = decode_headers(context.metadata["trace_ctx"])
38-
:otel_propagator_text_map.extract(trace_headers)
37+
38+
safe_context_propagation(context.metadata["trace_ctx"])
3939

4040
attributes = [
41-
"command.type": struct_name(context.command),
42-
"command.handler": context.handler,
43-
"aggregate.uuid": meta.aggregate_uuid,
44-
"aggregate.version": meta.aggregate_version,
45-
application: meta.application,
46-
"causation.id": context.causation_id,
47-
"correlation.id": context.correlation_id,
48-
"aggregate.function": context.function,
49-
"aggregate.lifespan": context.lifespan
41+
"commanded.aggregate_uuid": meta.aggregate_uuid,
42+
"commanded.aggregate_version": meta.aggregate_version,
43+
"commanded.application": meta.application,
44+
"commanded.causation_id": context.causation_id,
45+
"commanded.command": struct_name(context.command),
46+
"commanded.correlation_id": context.correlation_id,
47+
"commanded.function": context.function,
48+
"messaging.conversation_id": context.correlation_id,
49+
"messaging.destination": context.handler,
50+
"messaging.destination_kind": "aggregate",
51+
"messaging.message_id": context.causation_id,
52+
"messaging.operation": "receive",
53+
"messaging.system": "commanded"
5054
]
5155

5256
OpentelemetryTelemetry.start_telemetry_span(
@@ -65,7 +69,7 @@ defmodule OpentelemetryCommanded.Aggregate do
6569
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)
6670

6771
events = Map.get(meta, :events, [])
68-
Span.set_attribute(ctx, :"event.count", Enum.count(events))
72+
Span.set_attribute(ctx, :"commanded.event_count", Enum.count(events))
6973

7074
if error = meta[:error] do
7175
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))

lib/commanded/application.ex

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,20 @@ defmodule OpentelemetryCommanded.Application do
2828
def handle_start(_event, _, meta, _) do
2929
context = meta.execution_context
3030

31+
safe_context_propagation(context.metadata["trace_ctx"])
32+
3133
attributes = [
32-
"command.type": struct_name(context.command),
33-
"command.handler": context.handler,
34-
application: meta.application,
35-
"causation.id": context.causation_id,
36-
"correlation.id": context.correlation_id,
37-
"aggregate.function": context.function,
38-
"aggregate.lifespan": context.lifespan
34+
"commanded.application": meta.application,
35+
"commanded.causation_id": context.causation_id,
36+
"commanded.command": struct_name(context.command),
37+
"commanded.correlation_id": context.correlation_id,
38+
"commanded.function": context.function,
39+
"messaging.conversation_id": context.correlation_id,
40+
"messaging.destination": context.handler,
41+
"messaging.destination_kind": "command_handler",
42+
"messaging.message_id": context.causation_id,
43+
"messaging.operation": "receive",
44+
"messaging.system": "commanded"
3945
]
4046

4147
OpentelemetryTelemetry.start_telemetry_span(

lib/commanded/event_handler.ex

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,28 @@ defmodule OpentelemetryCommanded.EventHandler do
3333
end
3434

3535
def handle_start(_event, _measurements, meta, _) do
36-
event = meta.recorded_event
37-
trace_headers = decode_headers(event.metadata["trace_ctx"])
38-
:otel_propagator_text_map.extract(trace_headers)
36+
recorded_event = meta.recorded_event
37+
38+
safe_context_propagation(recorded_event.metadata["trace_ctx"])
3939

4040
attributes = [
41-
"causation.id": event.causation_id,
42-
"correlation.id": event.correlation_id,
43-
"event.id": event.event_id,
44-
"event.number": event.event_number,
45-
"event.type": event.event_type,
46-
"stream.id": event.stream_id,
47-
"stream.version": event.stream_version,
48-
application: meta.application,
41+
"commanded.application": meta.application,
42+
"commanded.causation_id": recorded_event.causation_id,
43+
"commanded.correlation_id": recorded_event.correlation_id,
44+
"commanded.event": recorded_event.event_type,
45+
"commanded.event_id": recorded_event.event_id,
46+
"commanded.event_number": recorded_event.event_number,
47+
"commanded.handler_name": meta.handler_name,
48+
"commanded.stream_id": recorded_event.stream_id,
49+
"commanded.stream_version": recorded_event.stream_version,
50+
"messaging.conversation_id": recorded_event.correlation_id,
51+
"messaging.destination": meta.handler_module,
52+
"messaging.destination_kind": "event_handler",
53+
"messaging.message_id": recorded_event.causation_id,
54+
"messaging.operation": "receive",
55+
"messaging.system": "commanded"
4956
# TODO add back
5057
# consistency: meta.consistency,
51-
"handler.module": meta.handler_module,
52-
"handler.name": meta.handler_name
5358
# TODO add this back into commanded
5459
# "event.last_seen": meta.last_seen_event
5560
]

lib/commanded/event_store.ex

Lines changed: 90 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -3,48 +3,105 @@ defmodule OpentelemetryCommanded.EventStore do
33

44
require OpenTelemetry.Tracer
55

6-
alias OpenTelemetry.{Tracer, Span}
6+
import OpentelemetryCommanded.Util
7+
8+
alias OpenTelemetry.Span
9+
10+
@tracer_id __MODULE__
711

812
def setup do
9-
:telemetry.attach_many(
10-
{__MODULE__, :stop},
11-
[
12-
[:commanded, :event_store, :stream_forward, :stop],
13-
[:commanded, :event_store, :append_to_stream, :stop]
14-
],
15-
&__MODULE__.handle_stop/4,
16-
[]
17-
)
13+
~w(
14+
ack_event
15+
adapter
16+
append_to_stream
17+
delete_snapshot
18+
delete_subscription
19+
read_snapshot
20+
record_snapshot
21+
stream_forward
22+
stream_forward
23+
stream_forward
24+
subscribe
25+
subscribe_to
26+
subscribe_to
27+
unsubscribe
28+
)a
29+
|> Enum.each(fn event ->
30+
:telemetry.attach(
31+
{__MODULE__, :start},
32+
[:commanded, :event_store, event, :start],
33+
&__MODULE__.handle_start/4,
34+
[]
35+
)
1836

19-
:telemetry.attach_many(
20-
{__MODULE__, :exception},
21-
[
22-
[:commanded, :event_store, :stream_forward, :exception],
23-
[:commanded, :event_store, :append_to_stream, :exception]
24-
],
25-
&__MODULE__.handle_stop/4,
26-
[]
27-
)
37+
:telemetry.attach(
38+
{__MODULE__, :stop},
39+
[:commanded, :event_store, event, :stop],
40+
&__MODULE__.handle_stop/4,
41+
[]
42+
)
43+
44+
:telemetry.attach(
45+
{__MODULE__, :exception},
46+
[:commanded, :event_store, event, :exception],
47+
&__MODULE__.handle_exception/4,
48+
[]
49+
)
50+
end)
2851
end
2952

30-
def handle_stop([_, _, action, type], measurements, meta, _) do
31-
end_time = :opentelemetry.timestamp()
32-
start_time = end_time - measurements.duration
33-
attributes = meta |> Map.take([:application, :stream_uuid]) |> Enum.to_list()
34-
span_name = :"commanded.event_store.#{action}"
53+
def handle_start([_, _, action, _type], _measurements, meta, _) do
54+
event = meta.event
55+
56+
safe_context_propagation(event.metadata["trace_ctx"])
57+
58+
attributes = [
59+
"commanded.application": meta.application,
60+
"commanded.causation_id": event.causation_id,
61+
"commanded.correlation_id": event.correlation_id,
62+
"commanded.event": event.event_type,
63+
"commanded.event_id": event.event_id,
64+
"commanded.event_number": event.event_number,
65+
"commanded.stream_id": event.stream_id,
66+
"commanded.stream_version": event.stream_version
67+
]
3568

36-
Tracer.start_span(span_name, %{start_time: start_time, attributes: attributes})
69+
OpentelemetryTelemetry.start_telemetry_span(
70+
@tracer_id,
71+
"commanded.event_store.#{action}",
72+
meta,
73+
%{
74+
kind: :internal,
75+
attributes: attributes
76+
}
77+
)
78+
end
3779

38-
if type == :exception do
39-
ctx = Tracer.current_span_ctx()
40-
reason = meta[:reason]
41-
stacktrace = meta[:stacktrace]
80+
def handle_stop(_event, _measurements, meta, _) do
81+
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)
4282

43-
exception = Exception.normalize(meta[:kind], reason, stacktrace)
44-
Span.record_exception(ctx, exception, stacktrace)
45-
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
83+
if error = meta[:error] do
84+
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
4685
end
4786

48-
Tracer.end_span()
87+
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
88+
end
89+
90+
def handle_exception(
91+
_event,
92+
_measurements,
93+
%{kind: kind, reason: reason, stacktrace: stacktrace} = meta,
94+
_config
95+
) do
96+
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)
97+
98+
# try to normalize all errors to Elixir exceptions
99+
exception = Exception.normalize(kind, reason, stacktrace)
100+
101+
# record exception and mark the span as errored
102+
Span.record_exception(ctx, exception, stacktrace)
103+
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason)))
104+
105+
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
49106
end
50107
end

lib/commanded/process_manager.ex

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,30 @@ defmodule OpentelemetryCommanded.ProcessManager do
3333
end
3434

3535
def handle_start(_event, _, meta, _) do
36-
event = meta.recorded_event
37-
trace_headers = decode_headers(event.metadata["trace_ctx"])
38-
:otel_propagator_text_map.extract(trace_headers)
36+
recorded_event = meta.recorded_event
37+
safe_context_propagation(recorded_event.metadata["trace_ctx"])
3938

4039
attributes = [
41-
application: meta.application,
42-
"process_manager.uuid": meta.process_uuid,
43-
"process_manager.name": meta.process_manager_name,
44-
"process_manager.module": meta.process_manager_module,
45-
"event.id": event.event_id,
46-
"event.number": event.event_number,
47-
"event.type": event.event_type,
48-
"correlation.id": event.correlation_id,
49-
"causation.id": event.causation_id,
50-
"stream.id": event.stream_id,
51-
"stream.version": event.stream_version
40+
"commanded.application": meta.application,
41+
"commanded.causation_id": recorded_event.causation_id,
42+
"commanded.correlation_id": recorded_event.correlation_id,
43+
"commanded.event": recorded_event.event_type,
44+
"commanded.event_id": recorded_event.event_id,
45+
"commanded.event_number": recorded_event.event_number,
46+
"commanded.handler_name": meta.process_manager_name,
47+
"commanded.process_uuid": meta.process_uuid,
48+
"commanded.stream_id": recorded_event.stream_id,
49+
"commanded.stream_version": recorded_event.stream_version,
50+
"messaging.conversation_id": recorded_event.correlation_id,
51+
"messaging.destination": meta.process_manager_module,
52+
"messaging.destination_kind": "process_manager",
53+
"messaging.message_id": recorded_event.causation_id,
54+
"messaging.operation": "receive",
55+
"messaging.system": "commanded"
56+
# TODO add back
57+
# consistency: meta.consistency,
58+
# TODO add this back into commanded
59+
# "event.last_seen": meta.last_seen_event
5260
]
5361

5462
OpentelemetryTelemetry.start_telemetry_span(
@@ -67,7 +75,11 @@ defmodule OpentelemetryCommanded.ProcessManager do
6775
ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)
6876

6977
commands = Map.get(meta, :commands, [])
70-
Span.set_attribute(ctx, :"command.count", Enum.count(commands))
78+
Span.set_attribute(ctx, :"commanded.command_count", Enum.count(commands))
79+
80+
if error = meta[:error] do
81+
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
82+
end
7183

7284
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
7385
end
@@ -85,7 +97,7 @@ defmodule OpentelemetryCommanded.ProcessManager do
8597

8698
# record exception and mark the span as errored
8799
Span.record_exception(ctx, exception, stacktrace)
88-
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
100+
Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason)))
89101

90102
OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
91103
end

lib/commanded/util.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
defmodule OpentelemetryCommanded.Util do
22
@moduledoc false
33

4+
def safe_context_propagation(trace_ctx) when is_nil(trace_ctx) do
5+
nil
6+
end
7+
8+
def safe_context_propagation(trace_ctx) do
9+
trace_ctx
10+
|> decode_headers()
11+
|> :otel_propagator_text_map.extract()
12+
end
13+
414
def encode_headers(headers), do: Enum.map(headers, &Tuple.to_list/1)
515

616
def decode_headers(headers), do: Enum.map(headers, &List.to_tuple/1)

mix.exs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ defmodule OpentelemetryCommanded.MixProject do
66
app: :opentelemetry_commanded,
77
version: "0.2.0",
88
elixir: "~> 1.10",
9+
elixirc_paths: elixirc_paths(Mix.env()),
910
start_permanent: Mix.env() == :prod,
1011
package: package(),
1112
deps: deps(),
@@ -15,6 +16,15 @@ defmodule OpentelemetryCommanded.MixProject do
1516
]
1617
end
1718

19+
defp elixirc_paths(env) when env in [:test],
20+
do: [
21+
"lib",
22+
"test/support",
23+
"test/dummy_app"
24+
]
25+
26+
defp elixirc_paths(_env), do: ["lib"]
27+
1828
# Run "mix help compile.app" to learn about applications.
1929
def application do
2030
[]
@@ -29,10 +39,15 @@ defmodule OpentelemetryCommanded.MixProject do
2939

3040
defp deps do
3141
[
32-
{:commanded, "~> 1.3.1"},
42+
{:commanded,
43+
github: "commanded/commanded",
44+
ref: "75b19cb3a994aa36984b63bd9b5bffab4d6f8310"},
45+
# {:commanded, "~> 1.3.1"},
3346
{:opentelemetry_telemetry, "~> 1.0.0-beta.7"},
3447
{:telemetry, "~> 1.0"},
3548
{:opentelemetry, "~> 1.0"},
49+
{:jason, "~> 1.2", only: :test},
50+
{:ecto, "~> 3.7.1", only: :test},
3651
{:ex_doc, "~> 0.23.0", only: [:dev], runtime: false}
3752
]
3853
end

mix.lock

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
%{
22
"backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"},
3-
"commanded": {:hex, :commanded, "1.3.1", "d18a73bface68c04cbbda69647604a3cc1918fbdf8af4a784fc3a3a30ca34a13", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:jason, "~> 1.2", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "9bd03ef6fc05e3a8fb4d0808f13a2106688e60ee4b2bdb78cf7e63a6788c9faf"},
3+
"commanded": {:git, "https://github.com/commanded/commanded.git", "75b19cb3a994aa36984b63bd9b5bffab4d6f8310", [ref: "75b19cb3a994aa36984b63bd9b5bffab4d6f8310"]},
4+
"decimal": {:hex, :decimal, "2.0.0", "a78296e617b0f5dd4c6caf57c714431347912ffb1d0842e998e9792b5642d697", [:mix], [], "hexpm", "34666e9c55dea81013e77d9d87370fe6cb6291d1ef32f46a1600230b1d44f577"},
45
"earmark_parser": {:hex, :earmark_parser, "1.4.10", "6603d7a603b9c18d3d20db69921527f82ef09990885ed7525003c7fe7dc86c56", [:mix], [], "hexpm", "8e2d5370b732385db2c9b22215c3f59c84ac7dda7ed7e544d7c459496ae519c0"},
6+
"ecto": {:hex, :ecto, "3.7.1", "a20598862351b29f80f285b21ec5297da1181c0442687f9b8329f0445d228892", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d36e5b39fc479e654cffd4dbe1865d9716e4a9b6311faff799b6f90ab81b8638"},
57
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
68
"ex_doc": {:hex, :ex_doc, "0.23.0", "a069bc9b0bf8efe323ecde8c0d62afc13d308b1fa3d228b65bca5cf8703a529d", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}], "hexpm", "f5e2c4702468b2fd11b10d39416ddadd2fcdd173ba2a0285ebd92c39827a5a16"},
9+
"jason": {:hex, :jason, "1.3.0", "fa6b82a934feb176263ad2df0dbd91bf633d4a46ebfdffea0c8ae82953714946", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "53fc1f51255390e0ec7e50f9cb41e751c260d065dcba2bf0d08dc51a4002c2ac"},
710
"makeup": {:hex, :makeup, "1.0.5", "d5a830bc42c9800ce07dd97fa94669dfb93d3bf5fcf6ea7a0c67b2e0e4a7f26c", [:mix], [{:nimble_parsec, "~> 0.5 or ~> 1.0", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "cfa158c02d3f5c0c665d0af11512fed3fba0144cf1aadee0f2ce17747fba2ca9"},
811
"makeup_elixir": {:hex, :makeup_elixir, "0.15.0", "98312c9f0d3730fde4049985a1105da5155bfe5c11e47bdc7406d88e01e4219b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.1", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "75ffa34ab1056b7e24844c90bfc62aaf6f3a37a15faa76b07bc5eba27e4a8b4a"},
912
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},

0 commit comments

Comments
 (0)