Skip to content

Commit 468e491

Browse files
committed
307 - reverted some changes
1 parent 17329ce commit 468e491

File tree

3 files changed

+54
-2
lines changed

3 files changed

+54
-2
lines changed

lib/rig/event_stream/kafka_to_filter.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ defmodule Rig.EventStream.KafkaToFilter do
1919
def kafka_handler(body, headers) do
2020
case Cloudevents.from_kafka_message(body, headers) do
2121
{:ok, cloud_event} ->
22-
Tracing.CloudEvent.with_child_span "kafka_to_filter", cloud_event do
22+
Tracing.CloudEvent.with_child_span_temp "kafka_to_filter", cloud_event do
2323
cloud_event =
2424
cloud_event
2525
|> Tracing.append_context_with_mode(Tracing.context(), :private)

lib/rig/tracing.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ defmodule RIG.Tracing do
9090
end
9191
end
9292

93-
# Temporary function for Kafka as we are transitioning to new cloudevents library, once everything is migrated, we can remove function above
93+
# Temporary function for Kafka as we are transitioning to new cloudevents library,
94+
# once everything is migrated, we can remove function above
9495
@spec append_context_with_mode(Cloudevents.t(), t(), mode :: atom()) ::
9596
CloudEvent.t()
9697
def append_context_with_mode(cloudevent, context, mode) do

lib/rig/tracing/cloud_event.ex

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule RIG.Tracing.CloudEvent do
22
@moduledoc """
33
Distributed Tracing instrumenter for cloudevents
44
"""
5+
alias RigCloudEvents.Parser.PartialParser
56

67
@doc "Like Opencensus.Trace.with_child_span (https://hexdocs.pm/opencensus_elixir/Opencensus.Trace.html#with_child_span/3),
78
but for cloudevents distributed tracing extension defined in https://github.com/cloudevents/spec/blob/master/extensions/distributed-tracing.md.
@@ -14,6 +15,56 @@ defmodule RIG.Tracing.CloudEvent do
1415
file = __CALLER__.file
1516
function = format_function(__CALLER__.function)
1617

18+
computed_attributes =
19+
compute_attributes(attributes, %{
20+
line: line,
21+
module: module,
22+
file: file,
23+
function: function
24+
})
25+
26+
quote do
27+
tracecontext = []
28+
29+
tracecontext =
30+
case PartialParser.context_attribute(unquote(event).parsed, "traceparent") do
31+
{:ok, traceparent} -> Enum.concat(tracecontext, %{"traceparent" => traceparent})
32+
_ -> tracecontext
33+
end
34+
35+
tracecontext =
36+
case PartialParser.context_attribute(unquote(event).parsed, "tracestate") do
37+
{:ok, tracestate} -> Enum.concat(tracecontext, %{"tracestate" => tracestate})
38+
_ -> tracecontext
39+
end
40+
41+
parent_span_ctx = :oc_propagation_http_tracecontext.from_headers(tracecontext)
42+
43+
new_span_ctx =
44+
:oc_trace.start_span(unquote(label), parent_span_ctx, %{
45+
:attributes => unquote(computed_attributes)
46+
})
47+
48+
:ocp.with_span_ctx(new_span_ctx)
49+
50+
try do
51+
unquote(block)
52+
after
53+
:oc_trace.finish_span(new_span_ctx)
54+
:ocp.with_span_ctx(parent_span_ctx)
55+
end
56+
end
57+
end
58+
59+
# ---
60+
61+
# temporary function to handle new cloudevents library for Kafka
62+
defmacro with_child_span_temp(label, event, attributes \\ quote(do: %{}), do: block) do
63+
line = __CALLER__.line
64+
module = __CALLER__.module
65+
file = __CALLER__.file
66+
function = format_function(__CALLER__.function)
67+
1768
computed_attributes =
1869
compute_attributes(attributes, %{
1970
line: line,

0 commit comments

Comments
 (0)