Skip to content

Commit 15ea883

Browse files
committed
refactor: implememnt common batch overload protection module and use it in log_handler
1 parent 8e86b3b commit 15ea883

15 files changed

+1632
-717
lines changed

apps/opentelemetry/src/otel_batch_olp.erl

Lines changed: 602 additions & 0 deletions
Large diffs are not rendered by default.

apps/opentelemetry/src/otel_batch_processor.erl

Lines changed: 50 additions & 396 deletions
Large diffs are not rendered by default.

apps/opentelemetry/src/otel_exporter.erl

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,25 +21,34 @@
2121
export_traces/4,
2222
export_metrics/4,
2323
export_logs/4,
24+
export/5,
2425
shutdown/1,
2526
report_cb/1]).
2627

28+
-export_type([otel_signal/0,
29+
exporter_config/0]).
30+
2731
%% Do any initialization of the exporter here and return configuration
2832
%% that will be passed along with a list of spans to the `export' function.
29-
-callback init(term()) -> {ok, term()} | ignore.
33+
-callback init(Config) -> {ok, ExporterState} | {error, Reason} | ignore when
34+
Config :: term(),
35+
ExporterState :: term(),
36+
Reason :: term().
3037

3138
%% This function is called when the configured interval expires with any
3239
%% spans that have been collected so far and the configuration returned in `init'.
3340
%% Do whatever needs to be done to export each span here, the caller will block
3441
%% until it returns.
35-
-callback export(traces | metrics, ets:tab(), otel_resource:t(), term()) -> ok |
36-
success |
37-
failed_not_retryable |
38-
failed_retryable.
39-
-callback shutdown(term()) -> ok.
42+
-callback export(otel_signal(), ets:tab(), otel_resource:t(), term()) -> ok | error | {error, term()}.
43+
44+
-callback shutdown(State) -> ok when State :: term().
45+
46+
-type otel_signal() :: traces | metrics | logs.
47+
-type exporter_config() :: module() | {module(), Config :: term()} | undefined | none | ignore.
4048

4149
-include_lib("kernel/include/logger.hrl").
4250

51+
-spec init(exporter_config()) -> {module(), term()} | error | ignore.
4352
init({ExporterModule, Config}) when is_atom(ExporterModule) ->
4453
try ExporterModule:init(Config) of
4554
{ok, ExporterState} when ExporterModule =:= opentelemetry_exporter ->
@@ -49,8 +58,12 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
4958
{ok, ExporterState} ->
5059
?LOG_INFO("Exporter ~tp successfully initialized", [ExporterModule]),
5160
{ExporterModule, ExporterState};
61+
{error, Reason} ->
62+
?LOG_ERROR("Exporter failed to initalize, error: ~p",
63+
[ExporterModule, Reason]),
64+
error;
5265
ignore ->
53-
undefined
66+
ignore
5467
catch
5568
Kind:Reason:StackTrace ->
5669
%% logging in debug level since config argument in stacktrace could have secrets
@@ -72,14 +85,14 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
7285
%% the dependency needs to be added
7386
try grpcbox:module_info() of
7487
_ ->
75-
undefined
88+
error
7689
catch
7790
_:_ ->
7891
?LOG_WARNING("OTLP exporter failed to initialize when using the GRPC "
7992
"protocol and `grpcbox` module is not available in the "
8093
"code path. Verify that you have the `grpcbox` dependency "
8194
"included and rerun.", []),
82-
undefined
95+
error
8396
end;
8497
_ ->
8598
%% same as the debug log above
@@ -89,17 +102,17 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
89102
kind => Kind,
90103
reason => Reason,
91104
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
92-
undefined
105+
error
93106
end;
94107
{error, undef} when ExporterModule =:= opentelemetry_exporter ->
95108
?LOG_WARNING("OTLP exporter module `opentelemetry_exporter` not found. "
96109
"Verify you have included the `opentelemetry_exporter` dependency.",
97110
[ExporterModule]),
98-
undefined;
111+
error;
99112
{error, undef} ->
100113
?LOG_WARNING("Exporter module ~tp not found. Verify you have included "
101114
"the dependency that contains the exporter module.", [ExporterModule]),
102-
undefined;
115+
error;
103116
_ ->
104117
%% same as the debug log above
105118
%% without the stacktrace and at a higher level
@@ -108,22 +121,25 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) ->
108121
kind => Kind,
109122
reason => Reason,
110123
exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}),
111-
undefined
124+
error
112125
end
113126
end;
114-
init(Exporter) when Exporter =:= none ; Exporter =:= undefined ->
115-
undefined;
127+
init(Exporter) when Exporter =:= none; Exporter =:= undefined; Exporter =:= ignore ->
128+
ignore;
116129
init(ExporterModule) when is_atom(ExporterModule) ->
117130
init({ExporterModule, []}).
118131

119-
export_traces(ExporterModule, SpansTid, Resource, Config) ->
120-
ExporterModule:export(traces, SpansTid, Resource, Config).
132+
export_traces(ExporterModule, SpansTid, Resource, ExporterState) ->
133+
export(traces, ExporterModule, SpansTid, Resource, ExporterState).
134+
135+
export_metrics(ExporterModule, MetricsTid, Resource, ExporterState) ->
136+
export(metrics, ExporterModule, MetricsTid, Resource, ExporterState).
121137

122-
export_metrics(ExporterModule, MetricsTid, Resource, Config) ->
123-
ExporterModule:export(metrics, MetricsTid, Resource, Config).
138+
export_logs(ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState) ->
139+
export(logs, ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState).
124140

125-
export_logs(ExporterModule, Batch, Resource, Config) ->
126-
ExporterModule:export(logs, Batch, Resource, Config).
141+
export(OtelSignal, ExporterModule, Tid, Resource, ExporterState) ->
142+
ExporterModule:export(OtelSignal, Tid, Resource, ExporterState).
127143

128144
shutdown(undefined) ->
129145
ok;

apps/opentelemetry/src/otel_exporter_pid.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Pid) ->
3030
ets:foldl(fun(Span, _Acc) ->
3131
Pid ! {span, Span}
3232
end, [], SpansTid),
33+
ets:delete_all_objects(SpansTid),
3334
ok.
3435

3536
shutdown(_) ->

apps/opentelemetry/src/otel_exporter_stdout.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ export(_, SpansTid, _Resource, _) ->
3131
ets:foldl(fun(Span, _Acc) ->
3232
io:format("~p~n", [Span])
3333
end, [], SpansTid),
34+
ets:delete_all_objects(SpansTid),
3435
ok.
3536

3637
shutdown(_) ->

apps/opentelemetry/src/otel_exporter_tab.erl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Tid) ->
3030
ets:foldl(fun(Span, _Acc) ->
3131
ets:insert(Tid, Span)
3232
end, [], SpansTid),
33+
ets:delete_all_objects(SpansTid),
3334
ok.
3435

3536
shutdown(_) ->

apps/opentelemetry/src/otel_simple_processor.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ export({ExporterModule, Config}, Resource, SpansTid) ->
237237
%% don't let a exporter exception crash us
238238
%% and return true if exporter failed
239239
try
240-
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable
240+
otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config)
241241
catch
242242
Kind:Reason:StackTrace ->
243243
?LOG_INFO(#{source => exporter,

apps/opentelemetry/src/otel_span_processor.erl

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
-callback on_start(otel_ctx:t(), opentelemetry:span(), processor_config()) -> opentelemetry:span().
2929
-callback on_end(opentelemetry:span(), processor_config()) -> true |
3030
dropped |
31-
{error, invalid_span} |
32-
{error, no_export_buffer}.
31+
{error, term()}.
3332
-callback force_flush(processor_config()) -> ok |
3433
{error, term()}.
3534

apps/opentelemetry/test/opentelemetry_SUITE.erl

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ init_per_testcase(disabled_sdk, Config) ->
6565
Config;
6666
init_per_testcase(no_exporter, Config) ->
6767
application:set_env(opentelemetry, processors,
68-
[{otel_batch_processor, #{scheduled_delay_ms => 1}}]),
68+
[{otel_batch_processor, #{scheduled_delay_ms => 1, exporter => none}}]),
6969
{ok, _} = application:ensure_all_started(opentelemetry),
7070
Config;
7171
init_per_testcase(disable_auto_creation, Config) ->
@@ -128,7 +128,8 @@ init_per_testcase(multiple_tracer_providers, Config) ->
128128
{ok, _} = application:ensure_all_started(opentelemetry),
129129
Config;
130130
init_per_testcase(multiple_processors, Config) ->
131-
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{scheduled_delay_ms => 1,
131+
application:set_env(opentelemetry, processors, [{otel_batch_processor, #{name => first,
132+
scheduled_delay_ms => 1,
132133
exporter => {otel_exporter_pid, self()}}},
133134
{otel_batch_processor, #{name => second,
134135
scheduled_delay_ms => 1,
@@ -269,7 +270,7 @@ logger_metadata(_Config) ->
269270
ok.
270271

271272
%% logger metadata will either be undefined, or a map without hex_span_ctx_keys:
272-
%% [otel_trace_id, otel_span_id, ,otel_trace_flags]
273+
%% [otel_trace_id, otel_span_id, otel_trace_flags]
273274
empty_metadata() ->
274275
case logger:get_process_metadata() of
275276
undefined ->
@@ -589,7 +590,6 @@ update_span_data(Config) ->
589590

590591
ok.
591592

592-
593593
tracer_instrumentation_scope(Config) ->
594594
Tid = ?config(tid, Config),
595595

@@ -606,7 +606,6 @@ tracer_instrumentation_scope(Config) ->
606606
otel_span:end_span(SpanCtx1),
607607

608608
[Span1] = assert_exported(Tid, SpanCtx1),
609-
610609
?assertMatch({instrumentation_scope,<<"tracer1">>,<<"1.0.0">>,<<"http://schema.org/myschema">>},
611610
Span1#span.instrumentation_scope).
612611

@@ -1080,21 +1079,16 @@ disabled_sdk(_Config) ->
10801079

10811080
no_exporter(_Config) ->
10821081
SpanCtx1 = ?start_span(<<"span-1">>),
1083-
1084-
%% set_exporter will enable the export table even if the exporter ends
1085-
%% up being undefined to ensure no spans are lost. so briefly spans
1086-
%% will be captured
1087-
otel_batch_processor:set_exporter(none),
10881082
otel_span:end_span(SpanCtx1),
10891083

10901084
%% once the exporter is "initialized" the table is cleared and disabled
10911085
%% future spans are not added
1092-
?UNTIL([] =:= otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
1086+
?UNTIL([] =:= otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),
10931087

10941088
SpanCtx2 = ?start_span(<<"span-2">>),
10951089
otel_span:end_span(SpanCtx2),
10961090

1097-
?assertEqual([], otel_batch_processor:current_tab_to_list(otel_batch_processor_global)),
1091+
?assertEqual([], otel_batch_olp:current_tab_to_list(otel_batch_processor_global)),
10981092

10991093
ok.
11001094

apps/opentelemetry/test/otel_batch_processor_SUITE.erl

Lines changed: 37 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010

1111
all() ->
1212
[exporting_timeout_test,
13-
check_table_size_test,
14-
exporting_runner_timeout_test].
13+
exporting_runner_timeout_test,
14+
check_table_size_test].
1515

1616
%% verifies that after the runner has to be killed for taking too long
1717
%% that everything is still functional and the exporter does not crash
@@ -33,48 +33,20 @@ exporting_timeout_test(_Config) ->
3333
ok
3434
end.
3535

36-
check_table_size_test(_Config) ->
37-
MaxQueueSize = 10,
38-
CheckTableSizeMs = 1,
39-
{ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
40-
#{name => test_processor_check_size_test,
41-
resource => otel_resource:create([]),
42-
exporter => ?MODULE,
43-
exporting_timeout_ms => timer:minutes(10),
44-
%% long enough, so that it never happens during the test
45-
scheduled_delay_ms => timer:minutes(10),
46-
check_table_size_ms => CheckTableSizeMs,
47-
max_queue_size => MaxQueueSize}
48-
),
49-
%% max_queue_size limit is not reached
50-
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
51-
lists:foreach(fun(_) ->
52-
otel_batch_processor:on_end(generate_span(), #{reg_name => RegName})
53-
end,
54-
lists:seq(1, MaxQueueSize)),
55-
%% Wait for more than CheckTablesizeMS to be sure check timeout occurred
56-
timer:sleep(CheckTableSizeMs * 5),
57-
dropped = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
58-
59-
otel_batch_processor:force_flush(#{reg_name => RegName}),
60-
%% force_flush is async, have to wait for some long enough time again,
61-
timer:sleep(CheckTableSizeMs * 10),
62-
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}).
63-
6436
exporting_runner_timeout_test(_Config) ->
6537
process_flag(trap_exit, true),
6638

67-
{ok, Pid, #{reg_name := RegName}} = otel_batch_processor:start_link(
68-
#{name => test_processor1,
39+
{ok, Pid, State} = otel_batch_processor:start_link(
40+
#{name => test_processor,
6941
resource => otel_resource:create([]),
7042
exporter => ?MODULE,
7143
exporting_timeout_ms => 1,
7244
scheduled_delay_ms => 1}),
7345

7446
%% Insert a few spans to make sure runner process will be spawned and killed
7547
%% because it hangs for 10 minutes (see export/4 below)
76-
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
77-
true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}),
48+
true = otel_batch_processor:on_end(generate_span(), State),
49+
true = otel_batch_processor:on_end(generate_span(), State),
7850

7951
receive
8052
{'EXIT', Pid, _} ->
@@ -85,9 +57,35 @@ exporting_runner_timeout_test(_Config) ->
8557
ok
8658
end.
8759

60+
check_table_size_test(_Config) ->
61+
MaxQueueSize = 10,
62+
{ok, _Pid, State} = otel_batch_processor:start_link(
63+
#{name => test_processor_check_size_test,
64+
resource => otel_resource:create([]),
65+
exporter => ?MODULE,
66+
exporting_timeout_ms => timer:minutes(10),
67+
%% long enough, so that it never happens during the test
68+
scheduled_delay_ms => timer:minutes(10),
69+
max_queue_size => MaxQueueSize}
70+
),
71+
%% max_queue_size limit is not reached
72+
true = otel_batch_processor:on_end(generate_span(), State),
73+
74+
insert_spans(State, MaxQueueSize),
75+
76+
%% Wait a little to give the handler time to transition to the export state
77+
timer:sleep(30),
78+
79+
%% Insert the same number again, rgis time to the next table, as the previous is being exported,
80+
%% exporter is slow (see init_per_testcase), so we can be sure that we will go to the drop mode,
81+
%% with no chance to switch the table this time.
82+
insert_spans(State, MaxQueueSize),
83+
84+
dropped = otel_batch_processor:on_end(generate_span(), State).
85+
8886
%% exporter behaviour
8987

90-
init(_) ->
88+
init(_OtelSignal, _ExporterId, _) ->
9189
{ok, []}.
9290

9391
export(_, _, _, _) ->
@@ -98,6 +96,10 @@ shutdown(_) ->
9896

9997
%% helpers
10098

99+
insert_spans(State, N) ->
100+
lists:foreach(fun(_) -> otel_batch_processor:on_end(generate_span(), State) end,
101+
lists:seq(1, N)).
102+
101103
generate_span() ->
102104
#span{trace_id = otel_id_generator:generate_trace_id(),
103105
span_id = otel_id_generator:generate_span_id(),

0 commit comments

Comments
 (0)