Skip to content

Commit df549cc

Browse files
committed
remove temporary function and use pattern matching instead
1 parent 468e491 commit df549cc

File tree

5 files changed

+55
-95
lines changed

5 files changed

+55
-95
lines changed

lib/rig/event_stream/kafka_to_filter.ex

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ defmodule Rig.EventStream.KafkaToFilter do
55
"""
66
use Rig.KafkaConsumerSetup
77

8+
import RIG.Tracing.CloudEvent
9+
810
alias Rig.EventFilter
911
alias RIG.Tracing
1012

@@ -19,10 +21,10 @@ defmodule Rig.EventStream.KafkaToFilter do
1921
def kafka_handler(body, headers) do
2022
case Cloudevents.from_kafka_message(body, headers) do
2123
{:ok, cloud_event} ->
22-
Tracing.CloudEvent.with_child_span_temp "kafka_to_filter", cloud_event do
24+
Tracing.CloudEvent.with_child_span "kafka_to_filter", cloud_event do
2325
cloud_event =
2426
cloud_event
25-
|> Tracing.append_context_with_mode(Tracing.context(), :private)
27+
|> Tracing.append_context(Tracing.context(), :private)
2628

2729
Logger.debug(fn -> inspect(cloud_event) end)
2830
EventFilter.forward_event(cloud_event)

lib/rig/event_stream/kafka_to_http.ex

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ defmodule Rig.EventStream.KafkaToHttp do
55
"""
66
use Rig.KafkaConsumerSetup, [:targets]
77

8+
import RIG.Tracing.CloudEvent
9+
810
alias HTTPoison
911
alias RIG.Tracing
1012
alias RigCloudEvents.CloudEvent

lib/rig/event_stream/kinesis_to_filter.ex

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ defmodule Rig.EventStream.KinesisToFilter do
44
55
"""
66

7+
import RIG.Tracing.CloudEvent
8+
79
alias Rig.EventFilter
810
alias RIG.Tracing
911
alias RigCloudEvents.CloudEvent
@@ -23,7 +25,7 @@ defmodule Rig.EventStream.KinesisToFilter do
2325
Tracing.CloudEvent.with_child_span "kinesis_to_filter", cloud_event do
2426
cloud_event =
2527
cloud_event
26-
|> Tracing.append_context_with_mode(Tracing.context(), :private)
28+
|> Tracing.append_context(Tracing.context(), :private)
2729

2830
Logger.debug(fn -> inspect(cloud_event.parsed) end)
2931
EventFilter.forward_event(cloud_event)

lib/rig/tracing.ex

Lines changed: 20 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -71,35 +71,23 @@ defmodule RIG.Tracing do
7171

7272
# ---
7373

74-
@spec append_context_with_mode(CloudEvent.t(), t(), mode :: atom()) ::
75-
CloudEvent.t()
76-
def append_context_with_mode(%CloudEvent{} = cloudevent, context, mode) do
77-
cloudevent =
78-
cloudevent.json
79-
|> Jason.decode!()
80-
|> append_context(context)
81-
|> CloudEvent.parse!()
82-
83-
case mode do
84-
:private ->
85-
Logger.debug(fn -> "private mode, remove tracestate." end)
86-
remove_tracestate(cloudevent)
74+
def append_context(a, b, mode \\ :public)
8775

88-
_ ->
89-
cloudevent
90-
end
76+
@spec append_context(CloudEvent.t(), t(), mode :: atom()) :: CloudEvent.t()
77+
def append_context(%CloudEvent{} = cloudevent, context, mode) do
78+
cloudevent.json
79+
|> Jason.decode!()
80+
|> append_context(context, mode)
81+
|> CloudEvent.parse!()
9182
end
9283

93-
# Temporary function for Kafka as we are transitioning to new cloudevents library,
94-
# once everything is migrated, we can remove function above
95-
@spec append_context_with_mode(Cloudevents.t(), t(), mode :: atom()) ::
96-
CloudEvent.t()
97-
def append_context_with_mode(cloudevent, context, mode) do
98-
{:ok, cloudevent} =
99-
cloudevent
100-
|> append_context(context)
101-
|> Cloudevents.to_json()
102-
|> CloudEvent.parse()
84+
@spec append_context(map, t()) :: map
85+
def append_context(%{} = map, context, mode) do
86+
cloudevent =
87+
Enum.reduce(context, map, fn trace_header, acc ->
88+
{key, val} = trace_header
89+
Map.put(acc, key, val)
90+
end)
10391

10492
case mode do
10593
:private ->
@@ -113,23 +101,17 @@ defmodule RIG.Tracing do
113101

114102
# ---
115103

116-
def append_context(a, b, mode \\ :public)
117-
118-
@spec append_context(map, t()) :: map
119-
def append_context(%{} = map, context, _mode) do
120-
Enum.reduce(context, map, fn trace_header, acc ->
121-
{key, val} = trace_header
122-
Map.put(acc, key, val)
123-
end)
124-
end
125-
126-
# ---
127-
128104
@spec remove_tracestate(CloudEvent.t()) :: CloudEvent.t()
129105
defp remove_tracestate(%CloudEvent{} = cloudevent) do
130106
cloudevent.json
131107
|> Jason.decode!()
132108
|> Map.delete("tracestate")
133109
|> CloudEvent.parse!()
134110
end
111+
112+
@spec remove_tracestate(map) :: map
113+
defp remove_tracestate(%{} = cloudevent) do
114+
cloudevent
115+
|> Map.delete("tracestate")
116+
end
135117
end

lib/rig/tracing/cloud_event.ex

Lines changed: 26 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule RIG.Tracing.CloudEvent do
22
@moduledoc """
33
Distributed Tracing instrumenter for cloudevents
44
"""
5+
alias RigCloudEvents.CloudEvent
56
alias RigCloudEvents.Parser.PartialParser
67

78
@doc "Like Opencensus.Trace.with_child_span (https://hexdocs.pm/opencensus_elixir/Opencensus.Trace.html#with_child_span/3),
@@ -24,19 +25,7 @@ defmodule RIG.Tracing.CloudEvent do
2425
})
2526

2627
quote do
27-
tracecontext = []
28-
29-
tracecontext =
30-
case PartialParser.context_attribute(unquote(event).parsed, "traceparent") do
31-
{:ok, traceparent} -> Enum.concat(tracecontext, %{"traceparent" => traceparent})
32-
_ -> tracecontext
33-
end
34-
35-
tracecontext =
36-
case PartialParser.context_attribute(unquote(event).parsed, "tracestate") do
37-
{:ok, tracestate} -> Enum.concat(tracecontext, %{"tracestate" => tracestate})
38-
_ -> tracecontext
39-
end
28+
tracecontext = compute_context(unquote(event))
4029

4130
parent_span_ctx = :oc_propagation_http_tracecontext.from_headers(tracecontext)
4231

@@ -58,54 +47,37 @@ defmodule RIG.Tracing.CloudEvent do
5847

5948
# ---
6049

61-
# temporary function to handle new cloudevents library for Kafka
62-
defmacro with_child_span_temp(label, event, attributes \\ quote(do: %{}), do: block) do
63-
line = __CALLER__.line
64-
module = __CALLER__.module
65-
file = __CALLER__.file
66-
function = format_function(__CALLER__.function)
67-
68-
computed_attributes =
69-
compute_attributes(attributes, %{
70-
line: line,
71-
module: module,
72-
file: file,
73-
function: function
74-
})
75-
76-
quote do
77-
tracecontext = []
78-
79-
tracecontext =
80-
case Map.get(unquote(event), "traceparent") do
81-
nil -> tracecontext
82-
traceparent -> Enum.concat(tracecontext, %{"traceparent" => traceparent})
83-
end
84-
85-
tracecontext =
86-
case Map.get(unquote(event), "tracestate") do
87-
nil -> tracecontext
88-
tracestate -> Enum.concat(tracecontext, %{"tracestate" => tracestate})
89-
end
50+
def compute_context(%CloudEvent{} = event) do
51+
tracecontext = []
9052

91-
parent_span_ctx = :oc_propagation_http_tracecontext.from_headers(tracecontext)
92-
93-
new_span_ctx =
94-
:oc_trace.start_span(unquote(label), parent_span_ctx, %{
95-
:attributes => unquote(computed_attributes)
96-
})
53+
tracecontext =
54+
case PartialParser.context_attribute(event.parsed, "traceparent") do
55+
{:ok, traceparent} -> Enum.concat(tracecontext, %{"traceparent" => traceparent})
56+
_ -> tracecontext
57+
end
9758

98-
:ocp.with_span_ctx(new_span_ctx)
59+
case PartialParser.context_attribute(event.parsed, "tracestate") do
60+
{:ok, tracestate} -> Enum.concat(tracecontext, %{"tracestate" => tracestate})
61+
_ -> tracecontext
62+
end
63+
end
9964

100-
try do
101-
unquote(block)
102-
after
103-
:oc_trace.finish_span(new_span_ctx)
104-
:ocp.with_span_ctx(parent_span_ctx)
65+
def compute_context(%{} = event) do
66+
tracecontext = []
67+
tracecontext =
68+
case Map.get(event, "traceparent") do
69+
nil -> tracecontext
70+
traceparent -> Enum.concat(tracecontext, %{"traceparent" => traceparent})
10571
end
72+
73+
case Map.get(event, "tracestate") do
74+
nil -> tracecontext
75+
tracestate -> Enum.concat(tracecontext, %{"tracestate" => tracestate})
10676
end
10777
end
10878

79+
# ---
80+
10981
defp format_function(nil), do: nil
11082
defp format_function({name, arity}), do: "#{name}/#{arity}"
11183

0 commit comments

Comments
 (0)