Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
602 changes: 602 additions & 0 deletions apps/opentelemetry/src/otel_batch_olp.erl

Large diffs are not rendered by default.

454 changes: 50 additions & 404 deletions apps/opentelemetry/src/otel_batch_processor.erl

Large diffs are not rendered by default.

58 changes: 37 additions & 21 deletions apps/opentelemetry/src/otel_exporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,34 @@
export_traces/4,
export_metrics/4,
export_logs/4,
export/5,
shutdown/1,
report_cb/1]).

-export_type([otel_signal/0,
exporter_config/0]).

%% Do any initialization of the exporter here and return configuration
%% that will be passed along with a list of spans to the `export' function.
-callback init(term()) -> {ok, term()} | ignore.
-callback init(Config) -> {ok, ExporterState} | {error, Reason} | ignore when
Config :: term(),
ExporterState :: term(),
Reason :: term().

%% This function is called when the configured interval expires with any
%% spans that have been collected so far and the configuration returned in `init'.
%% Do whatever needs to be done to export each span here, the caller will block
%% until it returns.
-callback export(traces | metrics, ets:tab(), otel_resource:t(), term()) -> ok |
success |
failed_not_retryable |
failed_retryable.
-callback shutdown(term()) -> ok.
-callback export(otel_signal(), ets:tab(), otel_resource:t(), term()) -> ok | error | {error, term()}.

-callback shutdown(State) -> ok when State :: term().

-type otel_signal() :: traces | metrics | logs.
-type exporter_config() :: module() | {module(), Config :: term()} | undefined | none | ignore.

-include_lib("kernel/include/logger.hrl").

-spec init(exporter_config()) -> {module(), term()} | error | ignore.
init({ExporterModule, Config}) when is_atom(ExporterModule) ->
try ExporterModule:init(Config) of
{ok, ExporterState} when ExporterModule =:= opentelemetry_exporter ->
Expand All @@ -49,8 +58,12 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
{ok, ExporterState} ->
?LOG_INFO("Exporter ~tp successfully initialized", [ExporterModule]),
{ExporterModule, ExporterState};
{error, Reason} ->
?LOG_ERROR("Exporter failed to initalize, error: ~p",
[ExporterModule, Reason]),
error;
ignore ->
undefined
ignore
catch
Kind:Reason:StackTrace ->
%% logging in debug level since config argument in stacktrace could have secrets
Expand All @@ -72,14 +85,14 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
%% the dependency needs to be added
try grpcbox:module_info() of
_ ->
undefined
error
catch
_:_ ->
?LOG_WARNING("OTLP exporter failed to initialize when using the GRPC "
"protocol and `grpcbox` module is not available in the "
"code path. Verify that you have the `grpcbox` dependency "
"included and rerun.", []),
undefined
error
end;
_ ->
%% same as the debug log above
Expand All @@ -89,17 +102,17 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
error
end;
{error, undef} when ExporterModule =:= opentelemetry_exporter ->
?LOG_WARNING("OTLP exporter module `opentelemetry_exporter` not found. "
"Verify you have included the `opentelemetry_exporter` dependency.",
[ExporterModule]),
undefined;
error;
{error, undef} ->
?LOG_WARNING("Exporter module ~tp not found. Verify you have included "
"the dependency that contains the exporter module.", [ExporterModule]),
undefined;
error;
_ ->
%% same as the debug log above
%% without the stacktrace and at a higher level
Expand All @@ -108,22 +121,25 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
kind => Kind,
reason => Reason,
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
undefined
error
end
end;
init(Exporter) when Exporter =:= none ; Exporter =:= undefined ->
undefined;
init(Exporter) when Exporter =:= none; Exporter =:= undefined; Exporter =:= ignore ->
ignore;
init(ExporterModule) when is_atom(ExporterModule) ->
init({ExporterModule, []}).

export_traces(ExporterModule, SpansTid, Resource, Config) ->
ExporterModule:export(traces, SpansTid, Resource, Config).
export_traces(ExporterModule, SpansTid, Resource, ExporterState) ->
export(traces, ExporterModule, SpansTid, Resource, ExporterState).

export_metrics(ExporterModule, MetricsTid, Resource, ExporterState) ->
export(metrics, ExporterModule, MetricsTid, Resource, ExporterState).

export_metrics(ExporterModule, MetricsTid, Resource, Config) ->
ExporterModule:export(metrics, MetricsTid, Resource, Config).
export_logs(ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState) ->
export(logs, ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState).

export_logs(ExporterModule, Batch, Resource, Config) ->
ExporterModule:export(logs, Batch, Resource, Config).
export(OtelSignal, ExporterModule, Tid, Resource, ExporterState) ->
ExporterModule:export(OtelSignal, Tid, Resource, ExporterState).

shutdown(undefined) ->
ok;
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_pid.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Pid) ->
ets:foldl(fun(Span, _Acc) ->
Pid ! {span, Span}
end, [], SpansTid),
ets:delete_all_objects(SpansTid),
ok.

shutdown(_) ->
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_stdout.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export(_, SpansTid, _Resource, _) ->
ets:foldl(fun(Span, _Acc) ->
io:format("~p~n", [Span])
end, [], SpansTid),
ets:delete_all_objects(SpansTid),
ok.

shutdown(_) ->
Expand Down
1 change: 1 addition & 0 deletions apps/opentelemetry/src/otel_exporter_tab.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Tid) ->
ets:foldl(fun(Span, _Acc) ->
ets:insert(Tid, Span)
end, [], SpansTid),
ets:delete_all_objects(SpansTid),
ok.

shutdown(_) ->
Expand Down
2 changes: 1 addition & 1 deletion apps/opentelemetry/src/otel_simple_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ export({ExporterModule, Config}, Resource, SpansTid) ->
%% don't let a exporter exception crash us
%% and return true if exporter failed
try
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config)
catch
Kind:Reason:StackTrace ->
?LOG_INFO(#{source => exporter,
Expand Down
3 changes: 1 addition & 2 deletions apps/opentelemetry/src/otel_span_processor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@

-callback on_end(opentelemetry:span(), processor_config()) -> true |
dropped |
{error, invalid_span} |
{error, no_export_buffer}.
{error, term()}.

-callback force_flush(processor_config()) -> ok |
{error, term()}.
Expand Down
34 changes: 20 additions & 14 deletions apps/opentelemetry/test/opentelemetry_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ init_per_testcase(disabled_sdk, Config) ->
Config;
init_per_testcase(no_exporter, Config) ->
application:set_env(opentelemetry, processors,
[{otel_batch_processor, #{scheduled_delay_ms => 1}}]),
[{otel_batch_processor, #{scheduled_delay_ms => 1, exporter => none}}]),
{ok, _} = application:ensure_all_started(opentelemetry),
Config;
init_per_testcase(disable_auto_creation, Config) ->
Expand Down Expand Up @@ -119,6 +119,9 @@ init_per_testcase(too_many_attributes, Config) ->
{ok, _} = application:ensure_all_started(opentelemetry),
Config1;
init_per_testcase(tracer_instrumentation_scope, Config) ->
%% Note: this actually mutes a bug / design drawback, as
%% persistent terms are not cleaned / properly refreshed and may keep stale data.
cleanup_persistent_terms(opentelemetry),
Config1 = set_batch_tab_processor(Config),
{ok, _} = application:ensure_all_started(opentelemetry),
Config1;
Expand All @@ -128,7 +131,8 @@ init_per_testcase(multiple_tracer_providers, Config) ->
{ok, _} = application:ensure_all_started(opentelemetry),
Config;
init_per_testcase(multiple_processors, Config) ->
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{scheduled_delay_ms => 1,
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{name => first,
scheduled_delay_ms => 1,
exporter => {otel_exporter_pid, self()}}},
{otel_batch_processor, #{name => second,
scheduled_delay_ms => 1,
Expand Down Expand Up @@ -268,14 +272,14 @@ logger_metadata(_Config) ->

ok.

%% logger metadata will either be undefined, a map without the otel_span_ctx key or
%% with the value of the key being undefined
%% logger metadata will either be undefined, or a map without hex_span_ctx_keys:
%% [otel_trace_id, otel_span_id, otel_trace_flags]
empty_metadata() ->
case logger:get_process_metadata() of
undefined ->
true;
M ->
maps:get(otel_span_ctx, M, #{}) =:= #{}
maps:with(otel_span:hex_span_ctx_keys(), M) =:= #{}
end.

propagator_configuration(_Config) ->
Expand Down Expand Up @@ -589,7 +593,6 @@ update_span_data(Config) ->

ok.


tracer_instrumentation_scope(Config) ->
Tid = ?config(tid, Config),

Expand All @@ -606,7 +609,6 @@ tracer_instrumentation_scope(Config) ->
otel_span:end_span(SpanCtx1),

[Span1] = assert_exported(Tid, SpanCtx1),

?assertMatch({instrumentation_scope,<<"tracer1">>,<<"1.0.0">>,<<"http://schema.org/myschema">>},
Span1#span.instrumentation_scope).

Expand Down Expand Up @@ -1080,21 +1082,16 @@ disabled_sdk(_Config) ->

no_exporter(_Config) ->
SpanCtx1 = ?start_span(<<"span-1">>),

%% set_exporter will enable the export table even if the exporter ends
%% up being undefined to ensure no spans are lost. so briefly spans
%% will be captured
otel_batch_processor:set_exporter(none),
otel_span:end_span(SpanCtx1),

%% once the exporter is "initialized" the table is cleared and disabled
%% future spans are not added
?UNTIL([] =:= otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
?UNTIL([] =:= otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),

SpanCtx2 = ?start_span(<<"span-2">>),
otel_span:end_span(SpanCtx2),

?assertEqual([], otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
?assertEqual([], otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),

ok.

Expand All @@ -1118,3 +1115,12 @@ assert_not_exported(Tid, #span_ctx{trace_id=TraceId,
span_id=SpanId,
_='_'})).

cleanup_persistent_terms(Module) ->
lists:foreach(
fun({Key, _}) ->
case is_tuple(Key) andalso element(1, Key) =:= Module of
true -> persistent_term:erase(Key);
false -> ok
end
end,
persistent_term:get()).
72 changes: 37 additions & 35 deletions apps/opentelemetry/test/otel_batch_processor_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

all() ->
[exporting_timeout_test,
check_table_size_test,
exporting_runner_timeout_test].
exporting_runner_timeout_test,
check_table_size_test].

%% verifies that after the runner has to be killed for taking too long
%% that everything is still functional and the exporter does not crash
Expand All @@ -33,48 +33,20 @@ exporting_timeout_test(_Config) ->
ok
end.

check_table_size_test(_Config) ->
MaxQueueSize = 10,
CheckTableSizeMs = 1,
{ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
#{name => test_processor_check_size_test,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => timer:minutes(10),
%% long enough, so that it never happens during the test
scheduled_delay_ms => timer:minutes(10),
check_table_size_ms => CheckTableSizeMs,
max_queue_size => MaxQueueSize}
),
%% max_queue_size limit is not reached
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
lists:foreach(fun(_) ->
otel_batch_processor:on_end(generate_span(), #{reg_name => RegName})
end,
lists:seq(1, MaxQueueSize)),
%% Wait for more than CheckTablesizeMS to be sure check timeout occurred
timer:sleep(CheckTableSizeMs * 5),
dropped = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),

otel_batch_processor:force_flush(#{reg_name => RegName}),
%% force_flush is async, have to wait for some long enough time again,
timer:sleep(CheckTableSizeMs * 10),
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}).

exporting_runner_timeout_test(_Config) ->
process_flag(trap_exit, true),

{ok, Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
#{name => test_processor1,
{ok, Pid, State} = otel_batch_processor:start_link(
#{name => test_processor,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => 1,
scheduled_delay_ms => 1}),

%% Insert a few spans to make sure runner process will be spawned and killed
%% because it hangs for 10 minutes (see export/4 below)
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
true = otel_batch_processor:on_end(generate_span(), State),
true = otel_batch_processor:on_end(generate_span(), State),

receive
{'EXIT', Pid, _} ->
Expand All @@ -85,9 +57,35 @@ exporting_runner_timeout_test(_Config) ->
ok
end.

check_table_size_test(_Config) ->
MaxQueueSize = 10,
{ok, _Pid, State} = otel_batch_processor:start_link(
#{name => test_processor_check_size_test,
resource => otel_resource:create([]),
exporter => ?MODULE,
exporting_timeout_ms => timer:minutes(10),
%% long enough, so that it never happens during the test
scheduled_delay_ms => timer:minutes(10),
max_queue_size => MaxQueueSize}
),
%% max_queue_size limit is not reached
true = otel_batch_processor:on_end(generate_span(), State),

insert_spans(State, MaxQueueSize),

%% Wait a little to give the handler time to transition to the export state
timer:sleep(30),

%% Insert the same number again, rgis time to the next table, as the previous is being exported,
%% exporter is slow (see init_per_testcase), so we can be sure that we will go to the drop mode,
%% with no chance to switch the table this time.
insert_spans(State, MaxQueueSize),

dropped = otel_batch_processor:on_end(generate_span(), State).

%% exporter behaviour

init(_) ->
init(_OtelSignal, _ExporterId, _) ->
{ok, []}.

export(_, _, _, _) ->
Expand All @@ -98,6 +96,10 @@ shutdown(_) ->

%% helpers

insert_spans(State, N) ->
lists:foreach(fun(_) -> otel_batch_processor:on_end(generate_span(), State) end,
lists:seq(1, N)).

generate_span() ->
#span{trace_id = otel_id_generator:generate_trace_id(),
span_id = otel_id_generator:generate_span_id(),
Expand Down
4 changes: 0 additions & 4 deletions apps/opentelemetry_api/src/otel_ctx.erl
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,5 @@ text_map_injector(Key, ToText) ->
text_map_injector_fun(TextMap, Key, ToText) ->
TextMap ++ ToText(?MODULE:get_value(Key, undefined)).

%%

update_logger_process_metadata(undefined) ->
ok;
update_logger_process_metadata(Ctx) ->
otel_tracer:update_logger_process_metadata(Ctx).
Loading