diff --git a/apps/opentelemetry/src/otel_batch_olp.erl b/apps/opentelemetry/src/otel_batch_olp.erl new file mode 100644 index 00000000..a3d79dae --- /dev/null +++ b/apps/opentelemetry/src/otel_batch_olp.erl @@ -0,0 +1,602 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2023, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc Common overload protection implementation +%% @end +%%%------------------------------------------------------------------------- + +-module(otel_batch_olp). + +-behaviour(gen_statem). + +-export([init_conf/1, + start_link/2, + insert_signal/2, + change_config/3, + force_flush/1]). + +%% gen_statem +-export([init/1, + callback_mode/0, + init_exporter/3, + idle/3, + exporting/3, + terminate/3]). + +-export_type([otel_batch_olp_config/0, + otel_batch_olp_state/0, + max_queue_size/0, + otel_timeout_ms/0]). + +-include_lib("kernel/include/logger.hrl"). + +%% Metrics are implmented separately +-type otel_signal() :: traces | logs. + +-type otel_batch_olp_config() :: + #{reg_name := atom(), + cb_module := module(), + otel_signal := otel_signal(), + max_queue_size := max_queue_size(), + exporting_timeout_ms := pos_integer(), + scheduled_delay_ms := pos_integer(), + exporter := otel_exporter:exporter_config(), + shutdown_timeout_ms => pos_integer(), + extra_ets_opts => [{keypos, pos_integer()}], + resource => otel_resource:t() + }. + +%% External state returned to the caller, necessary to be able to communicate with +%% otel_batch_olp either via calls or inserts to ETS tables. +-type otel_batch_olp_state() :: + #{reg_name := atom(), + cb_module := module(), + otel_signal := otel_signal(), + tables := {ets:table(), ets:table()}, + atomic_ref := atomics:atomic_ref(), + extra_ets_opts => [{keypos, pos_integer()}], + max_queue_size => max_queue_size(), + exporting_timeout_ms => otel_timeout_ms(), + scheduled_delay_ms => otel_timeout_ms(), + shutdown_timeout_ms => otel_timeout_ms(), + exporter => otel_exporter:exporter_config() + }. + +-type max_queue_size() :: pos_integer() | infinity. +-type otel_timeout_ms() :: pos_integer(). + +-define(table_name(_RegName_, _TabName_), list_to_atom(lists:concat([_RegName_, "_", _TabName_]))). +-define(table_1(_RegName_), ?table_name(_RegName_, table1)). +-define(table_2(_RegName_), ?table_name(_RegName_, table2)). + +%% Use of atomics provides much better overload protection comparing to periodic ETS table size check. +%% It allows to enter drop mode as soon as max_queue_size is reached, while periodic table check +%% can overlook a large and fast burst of writes that can result in inserting a much larger amount of +%% log events than the configured max_queue_size. +%% Performance-wise, the cost of `atomics:get/2`, `atomics:sub_get/3` is comparable with +%% `persistent_term:get/2,3` +-define(current_tab(_AtomicRef_), atomics:get(_AtomicRef_, ?CURRENT_TAB_IX)). +-define(tab_name(_TabIx_, _Tabs_), element(_TabIx_, _Tabs_)). +-define(next_tab(_CurrentTab_), case _CurrentTab_ of + ?TAB_1_IX -> ?TAB_2_IX; + ?TAB_2_IX -> ?TAB_1_IX + end). + +-define(set_current_tab(_AtomicRef_, _TabIx_), atomics:put(_AtomicRef_, ?CURRENT_TAB_IX, _TabIx_)). +-define(set_available(_AtomicRef_, _TabIx_, _Size_), atomics:put(_AtomicRef_, _TabIx_, _Size_)). +-define(get_available(_AtomicRef_, _TabIx_), atomics:get(_AtomicRef_, _TabIx_)). +-define(sub_get_available(_AtomicRef_, _TabIx_), atomics:sub_get(_AtomicRef_, _TabIx_, 1)). +-define(disable(_AtomicRef_), atomics:put(_AtomicRef_, ?CURRENT_TAB_IX, 0)). + +-define(MAX_SIGNED_INT, (1 bsl 63)-1). +-define(TAB_1_IX, 1). +-define(TAB_2_IX, 2). +%% signifies which table is currently enabled (0 - disabled, 1 - table_1, 2 - table_2) +-define(CURRENT_TAB_IX, 3). + +-define(DEFAULT_SHUTDOWN_MS, 5000). +-define(DEFAULT_EXPORTER_MODULE, opentelemetry_exporter). + +-define(time_ms, erlang:monotonic_time(millisecond)). +-define(rem_time(_Timeout_, _T0_, _T1_), max(0, _Timeout_ - (_T1_ - _T0_))). + +-define(private_field_err(_FieldName_), {error, {_FieldName_, "private_field_change_not_allowed"}}). +-define(change_not_allowed_err(_FieldName_), {error, {_FieldName_, "field_change_not_allowed"}}). + +-record(data, {exporter :: {module(), State :: term()} | ignore | undefined, + exporter_config :: otel_exporter:exporter_config(), + resource :: otel_resource:t(), + handed_off_table :: ets:table() | undefined, + runner :: {pid(), reference()} | undefined, + tables :: {ets:table(), ets:table()}, + reg_name :: atom(), + max_queue_size :: max_queue_size(), + exporting_timeout_ms :: otel_timeout_ms(), + scheduled_delay_ms :: otel_timeout_ms(), + shutdown_ms :: otel_timeout_ms(), + atomic_ref :: atomics:atomic_ref(), + exporter_timer :: undefined | reference(), + otel_signal :: otel_signal(), + %% for future extensions + cb_module :: module(), + %% Arbitrary config of the callback module + external_config :: term(), + extra = [] %% Unused, for future extensions + }). + +%%-------------------------------------------------------------------- +%% Test utils +%%-------------------------------------------------------------------- + +-ifdef(TEST). +-export([current_tab_to_list/1]). +current_tab_to_list(RegName) -> + {_, #data{tables=Tabs, atomic_ref=AtomicRef}} = sys:get_state(RegName), + case ?current_tab(AtomicRef) of + 0 -> []; + TabIx -> ets:tab2list(?tab_name(TabIx, Tabs)) + end. +-endif. + +%%-------------------------------------------------------------------- +%% otel_batch_olp API +%%-------------------------------------------------------------------- + +-spec init_conf(otel_batch_olp_config() | otel_batch_olp_state()) -> + {ok, otel_batch_olp_state()} | {error, term()}. +init_conf(#{reg_name := RegName, cb_module := _Module, otel_signal := _, exporter := _, + max_queue_size := _, exporting_timeout_ms := _, scheduled_delay_ms := _} = Config) -> + case validate_config(without_state_fields(Config)) of + ok -> + AtomicRef = atomics:new(3, [{signed, true}]), + {ok, Config#{reg_name => RegName, + tables => {?table_1(RegName), ?table_2(RegName)}, + atomic_ref => AtomicRef}}; + Err -> + Err + end. +-spec start_link(otel_batch_olp_state(), term()) -> gen_statem:start_ret(). +start_link(#{reg_name := RegName} = OlpState, ExternalConfig) -> + gen_statem:start_link({local, RegName}, ?MODULE, [OlpState, ExternalConfig], []). + +-spec insert_signal(tuple(), otel_batch_olp_state()) -> true | dropped | {error, term()}. +insert_signal(Record, #{atomic_ref := AtomicRef, tables := Tabs} = State) -> + try + case ?current_tab(AtomicRef) of + 0 -> dropped; + CurrentTab -> + case ?sub_get_available(AtomicRef, CurrentTab) of + Seq when Seq > 0 -> + ets:insert(?tab_name(CurrentTab, Tabs), Record); + 0 -> + %% max_queue_size is reached + Res = ets:insert(?tab_name(CurrentTab, Tabs), Record), + _ = force_flush(State), + Res; + _ -> + dropped + end + end + catch + error:badarg -> + {error, {no_otel_batch_olp, maps:get(otel_signal, State, undefined)}}; + Err:Reason -> + {error, {Err, Reason}} + end. + +-spec force_flush(otel_batch_olp_state()) -> ok. +force_flush(#{reg_name := RegName}) -> + gen_statem:cast(RegName, force_flush). + +-spec change_config(OldConfigState, NewConfigOrState, NewExtConfig) -> + {ok, NewConfigState} | {error, Reason} when + OldConfigState :: otel_batch_olp_state(), + NewConfigOrState :: otel_batch_olp_config() | otel_batch_olp_state(), + NewExtConfig :: term(), + NewConfigState :: otel_batch_olp_state(), + Reason :: term(). +change_config(#{reg_name := RegName}, #{reg_name := RegName1}, _) when RegName =/= RegName1 -> + ?private_field_err(reg_name); +change_config(#{atomic_ref := Ref}, #{atomic_ref := Ref1}, _) when Ref =/= Ref1 -> + ?private_field_err(atomic_ref); +change_config(#{tables := Tabs}, #{tables := Tabs1}, _) when Tabs =/= Tabs1 -> + ?private_field_err(tables); +%% Changing timeout or exporter config requires restart/re-initialiazation of exporter, +%% which is not supported now. If timeout or exporter needs to be changed, +%% the handler should be stopped and started with the new config +change_config(#{exporter := Exporter}, #{exporter := Exporter1}, _) when Exporter =/= Exporter1 -> + ?change_not_allowed_err(exporter); +change_config(#{exporting_timeout_ms := T}, #{exporting_timeout_ms := T1}, _) when T =/= T1 -> + ?change_not_allowed_err(exporting_timeout_ms); +change_config(#{reg_name := RegName} = OldConfig, NewConfigOrState, NewExtConfig) -> + NewConfig = without_state_fields(NewConfigOrState), + case validate_config(NewConfig) of + ok -> + %% This is necessary, so that the config returned to the caller + %% contains all the immutable keys required to communicate with + %% otel_batch_olp + NewConfig1 = copy_required_fields(OldConfig, NewConfig), + gen_statem:call(RegName, {change_config, NewConfig1, NewExtConfig}); + Err -> + Err + end. + +%%-------------------------------------------------------------------- +%% gen_statem callbacks +%%-------------------------------------------------------------------- + +init([OlpState, ExtConfig]) -> + #{atomic_ref := AtomicRef, + reg_name := RegName, + tables := {Tab1, Tab2}, + cb_module := CbModule, + otel_signal := OtelSignal, + max_queue_size := MaxQueueSize, + scheduled_delay_ms := ScheduledDelay, + exporting_timeout_ms := ExportTimeoutMs, + exporter := ExporterConfig + } = OlpState, + process_flag(trap_exit, true), + Resource = maps:get(resource, OlpState, otel_resource_detector:get_resource()), + ExporterConfig1 = exporter_conf_with_timeout(ExporterConfig, ExportTimeoutMs), + + %% assert table names match + Tab1 = ?table_1(RegName), + Tab2 = ?table_2(RegName), + + ExtraEtsOpts = maps:get(extra_ets_opts, OlpState, []), + _Tid1 = new_export_table(Tab1, ExtraEtsOpts), + _Tid2 = new_export_table(Tab2, ExtraEtsOpts), + + %% This is sligthly increased, to give the exporter runner a chance to garcefully time-out + %% before being killed by the handler. + ExportTimeoutMs1 = ExportTimeoutMs + 1000, + + Data = #data{atomic_ref=AtomicRef, + exporter=undefined, + exporter_config=ExporterConfig1, + otel_signal = OtelSignal, + resource=Resource, + tables={Tab1, Tab2}, + reg_name=RegName, + cb_module = CbModule, + exporting_timeout_ms=ExportTimeoutMs1, + scheduled_delay_ms=ScheduledDelay, + shutdown_ms = maps:get(shutdown_timeout_ms, OlpState, ?DEFAULT_SHUTDOWN_MS), + max_queue_size=size_limit(MaxQueueSize), + external_config=ExtConfig}, + %% Also used in change_config API, thus mutable + Data1 = add_mutable_config_to_data(OlpState, ExtConfig, Data), + + ?set_current_tab(AtomicRef, ?TAB_1_IX), + ?set_available(AtomicRef, ?TAB_1_IX, Data1#data.max_queue_size), + ?set_available(AtomicRef, ?TAB_2_IX, Data1#data.max_queue_size), + + {ok, init_exporter, Data1}. + +callback_mode() -> + [state_functions, state_enter]. + +%% TODO: handle exporter crashes and re-init it. +%% This is not expected to happen with the default grpc opentelemetry_exporter, +%% as it keeps running and retrying by itself in case of network failures. +init_exporter(enter, _OldState, _Data) -> + {keep_state_and_data, [{state_timeout, 0, do_init_exporter}]}; +init_exporter(_, do_init_exporter, Data=#data{exporter_config=ExporterConfig, + atomic_ref=AtomicRef, + tables=Tabs, + scheduled_delay_ms=SendInterval, + reg_name=RegName, + otel_signal=Signal}) -> + case do_init_exporter(Signal, RegName, ExporterConfig) of + %% error should be retried + error -> + {keep_state_and_data, [{state_timeout, SendInterval, do_init_exporter}]}; + ignore -> + %% no exporter: disable the insertion of new log events and delete the current table + clear_table_and_disable(AtomicRef, Tabs), + {next_state, idle, Data#data{exporter=ignore}}; + Exporter -> + TimerRef = start_exporting_timer(SendInterval), + {next_state, idle, Data#data{exporter=Exporter, exporter_timer=TimerRef}} + end; +init_exporter(_, _, _) -> + %% Ignore any other, e.g, external events like force_flush in this state + keep_state_and_data. + +idle(enter, _OldState, _Data) -> + keep_state_and_data; +idle(info, {timeout, Ref, export_signals}, Data=#data{exporter_timer=Ref}) -> + {next_state, exporting, Data}; +idle(cast, force_flush, #data{exporter=Exporter}=Data) when Exporter =/= ignore -> + {next_state, exporting, Data}; +idle(EventType, EventContent, Data) -> + handle_event_(idle, EventType, EventContent, Data). + +exporting(info, {timeout, Ref, export_signals}, #data{exporter_timer=Ref}) -> + {keep_state_and_data, [postpone]}; +exporting(enter, _OldState, Data=#data{atomic_ref=AtomicRef, + tables=Tabs, + max_queue_size=MaxSize, + exporting_timeout_ms=ExportingTimeout, + scheduled_delay_ms=SendInterval}) -> + CurrentTab = ?current_tab(AtomicRef), + {Data1, Actions} = + case ?get_available(AtomicRef, CurrentTab) of + %% No events yet, maximum available capacity, nothing to export + MaxSize -> + %% The other table may contain residual (late) writes not exported + %% during the previous run. If current table is not empty, we don't + %% need to check the size of the previous (currently disabled) table, + %% since we will switch to it after this exporter run. + %% However, if current table remains empty for a long time, + %% neither export nor table switch will be triggered, and any + %% residual late log events in the previous table would be left + %% dangling. To avoid such cases, we check other table size + %% and export it if it's not empty. + maybe_export_other_table(CurrentTab, Data); + _ -> + RunnerPidRef = export_signals(CurrentTab, Data), + {Data#data{runner=RunnerPidRef, + handed_off_table=?tab_name(CurrentTab, Tabs)}, + [{state_timeout, ExportingTimeout, exporting_timeout}]} + end, + {keep_state, Data1#data{exporter_timer = start_exporting_timer(SendInterval)}, Actions}; +exporting(state_timeout, empty_table, Data) -> + {next_state, idle, Data}; +exporting(state_timeout, exporting_timeout, Data) -> + %% kill current exporting process because it is taking too long + Data1 = kill_runner(Data), + {next_state, idle, Data1}; +%% Exit reason is ignored, since we don't handle exporter failures specifically for now +exporting(info, {'DOWN', Ref, process, Pid, _Info}, Data=#data{runner={Pid, Ref}}) -> + complete_exporting(Data); +exporting(EventType, Event, Data) -> + handle_event_(exporting, EventType, Event, Data). + +terminate(_Reason, _State, #data{exporter=ignore}) -> + ok; +terminate(_Reason, State, Data=#data{exporter=Exporter, + resource=Resource, + external_config=ExtConfig, + atomic_ref=AtomicRef, + tables={Tab1, Tab2}, + shutdown_ms=ShutdownMs, + otel_signal=Signal + }) -> + ?disable(AtomicRef), + T0 = ?time_ms, + _ = maybe_wait_for_current_runner(State, Data, ShutdownMs), + T1 = ?time_ms, + + %% Check both tables as each one may have some late unexported signals data. + %% NOTE: exports are attempted sequentially to follow the specification restriction: + %% "Export will never be called concurrently for the same exporter instance" + %% (see: https://opentelemetry.io/docs/specs/otel/logs/sdk/#export). + RemTime = ?rem_time(ShutdownMs, T0, T1), + ets:info(Tab1, size) > 0 + andalso export_and_wait(Exporter, Resource, Tab1, ExtConfig, RemTime, Signal), + T2 = ?time_ms, + RemTime1 = ?rem_time(RemTime, T1, T2), + ets:info(Tab2, size) > 0 + andalso export_and_wait(Exporter, Resource, Tab2, ExtConfig, RemTime1, Signal), + + _ = otel_exporter:shutdown(Exporter), + ok. + +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- + +handle_event_(_State, {call, From}, {change_config, NewConfig, NewExtConfig}, Data) -> + {keep_state, + add_mutable_config_to_data(NewConfig, NewExtConfig, Data), + [{reply, From, {ok, NewConfig}}]}; +handle_event_(_State, info, {'EXIT', _Pid, Reason}, _Data) -> + %% This can be a linked exporter process, unless someone linked to the handler process, + %% or explicitly called exit(HandlerPid, Reason) + %% This will call terminate/3 and may try to export current log events, + %% even if the linked exporter process is down. + %% This is safe, though, as all errors of otel_exporter:export/5 are caught. + {stop, Reason}; +handle_event_(_State, _, _, _) -> + keep_state_and_data. + +do_init_exporter(_Signal, _RegName, ExporterConfig) -> + otel_exporter:init(ExporterConfig). + +start_exporting_timer(SendInterval) -> + erlang:start_timer(SendInterval, self(), export_signals). + +maybe_export_other_table(CurrentTab, Data=#data{tables=Tabs, + exporting_timeout_ms=ExportingTimeout}) -> + NextTab = ?next_tab(CurrentTab), + %% Check ETS size instead of the counter, as late writes can't be detected with the atomic counter + case ets:info(?tab_name(NextTab, Tabs), size) of + 0 -> + %% in an `enter' handler we can't return a `next_state' or `next_event' + %% so we rely on a timeout to trigger the transition to `idle' + {Data#data{runner=undefined}, [{state_timeout, 0, empty_table}]}; + _ -> + RunnerPid = export_signals(NextTab, Data), + {Data#data{runner=RunnerPid, handed_off_table=?tab_name(CurrentTab, Tabs)}, + [{state_timeout, ExportingTimeout, exporting_timeout}]} + end. + +export_signals(CurrentTab, #data{exporter=Exporter, + max_queue_size=MaxSize, + resource=Resource, + atomic_ref=AtomicRef, + tables=Tabs, + external_config=ExtConfig, + otel_signal=Signal}) -> + + NewCurrentTab = ?next_tab(CurrentTab), + %% the new table is expected to be empty or hold a few late writes from the previous export, + %% so it safe to set available max size + ?set_available(AtomicRef, NewCurrentTab, MaxSize), + ?set_current_tab(AtomicRef, NewCurrentTab), + export_async(Exporter, Resource, ?tab_name(CurrentTab, Tabs), ExtConfig, Signal). + +export_async(Exporter, Resource, CurrentTab, ExtConfig, Signal) -> + erlang:spawn_monitor(fun() -> export(Exporter, Resource, CurrentTab, ExtConfig, Signal) end). + +export(undefined, _, _, _, _) -> + true; +export({ExporterModule, ExporterState}, Resource, Tab, ExtConfig, Signal) -> + try + %% TODO: API for both signals should probably be aligned in otel_exporter + TabOrTabConfig = case Signal of + traces -> Tab; + logs -> {Tab, ExtConfig} + end, + %% we ignore values, as no retries mechanism, is implemented + otel_exporter:export(Signal, ExporterModule, TabOrTabConfig, Resource, ExporterState) + catch + Class:Reason:St -> + ?LOG_ERROR("~p exporter ~p failed with exception: ~p:~p, stacktrace: ~p", + [Signal, Class, Reason, otel_utils:stack_without_args(St)]), + error + end. + +new_export_table(Name, ExtraEtsOpts) -> + %% log event timestamps used as keys are not guaranteed to always be unique, + %% so we use duplicate_bag + %% Using timestamps as keys instead of instrumentation scopes is expected + %% to have higher entropy which should improve write concurrency + Opts = lists:usort([public, + named_table, + {write_concurrency, true}, + duplicate_bag] ++ ExtraEtsOpts), + ets:new(Name, Opts). + +clear_table_and_disable(AtomicRef, Tabs) -> + case ?current_tab(AtomicRef) of + 0 -> + %% already disabled + ok; + CurrentTab -> + ?disable(AtomicRef), + CurrentTabName = ?tab_name(CurrentTab, Tabs), + ets:delete_all_objects(CurrentTabName), + ok + end. + +complete_exporting(Data) -> + {next_state, idle, Data#data{runner=undefined, + handed_off_table=undefined}}. + +kill_runner(Data=#data{runner={RunnerPid, Ref}, handed_off_table=Tab}) -> + _ = erlang:demonitor(Ref), + %% NOTE: `exit/2` is async, but as we don't delete/recreate export tables, + %% we don't need to wait for runner termination + erlang:exit(RunnerPid, kill), + _ = ets:delete_all_objects(Tab), + Data#data{runner=undefined, handed_off_table=undefined}; +kill_runner(Data=#data{runner=undefined}) -> + Data. + +exporter_conf_with_timeout({?DEFAULT_EXPORTER_MODULE, Conf}, TimeoutMs) -> + {?DEFAULT_EXPORTER_MODULE, Conf#{timeout_ms => TimeoutMs}}; +exporter_conf_with_timeout(OtherExporter, _Timeout) -> + OtherExporter. + +%% terminate/3 helpers + +export_and_wait(Exporter, Resource, Tab, Config, Timeout, Signal) -> + RunnerPidRef = export_async(Exporter, Resource, Tab, Config, Signal), + wait_for_runner(RunnerPidRef, Timeout). + +wait_for_runner({RunnerPid, RunnerRef}, Timeout) -> + receive + {'DOWN', RunnerRef, process, RunnerPid, _Info} -> ok + after Timeout -> + erlang:demonitor(RunnerRef), + erlang:exit(RunnerPid, kill), + ok + end. + +maybe_wait_for_current_runner(exporting, #data{runner={Pid, Ref}}, Timeout) -> + wait_for_runner({Pid, Ref}, Timeout); +maybe_wait_for_current_runner(_State, _Date, _Timeout) -> ok. + +%% Config helpers + +validate_config(Config) -> + Errs = maps:fold(fun(K, Val, Acc) -> + case validate_opt(K, Val, Config) of + ok -> Acc; + Err -> [Err | Acc] + end + end, + [], Config), + case Errs of + [] -> ok; + _ -> {error, Errs} + end. + +validate_opt(max_queue_size, infinity, _Config) -> + ok; +validate_opt(K, Val, _Config) when is_integer(Val), Val > 0, + K =:= max_queue_size; + K =:= exporting_timeout_ms; + K =:= scheduled_delay_ms; + K =:= shutdown_timeout_ms -> + ok; +validate_opt(exporter, {Module, _}, _Config) when is_atom(Module) -> + ok; +validate_opt(exporter, Module, _Config) when is_atom(Module) -> + ok; +validate_opt(otel_signal, Signal, _Config) when Signal =:= traces; Signal =:= logs -> + ok; +validate_opt(cb_module, Module, _Config) -> + Module:module_info(), + ok; +validate_opt(extra_ets_opts, [{keypos, Pos}], _Config) when Pos >= 1 -> + ok; +validate_opt(extra_ets_opts, [], _Config) -> + ok; +validate_opt(reg_name, RegName, _Config) when is_atom(RegName) -> + ok; +validate_opt(resource, Resource, _Config) when is_tuple(Resource) -> + ok; +validate_opt(K, Val, _Config) -> + {invalid_config, K, Val}. + +add_mutable_config_to_data(Config, ExtConfig, Data) -> + #{max_queue_size:=SizeLimit, + scheduled_delay_ms:=ScheduledDelay + } = Config, + Data#data{max_queue_size=size_limit(SizeLimit), + scheduled_delay_ms=ScheduledDelay, + external_config=ExtConfig}. + +%% high enough, must be infeasible to reach +size_limit(infinity) -> + ?MAX_SIGNED_INT; +size_limit(Int) -> + Int. + +copy_required_fields(OldConf, NewConf) -> + #{reg_name := RegName, + otel_signal := Signal, + tables := Tabs, + atomic_ref := AtomicRef} = OldConf, + NewConf#{reg_name => RegName, + otel_signal => Signal, + tables => Tabs, + atomic_ref => AtomicRef}. + +without_state_fields(ConfigOrState) -> + maps:without([tables, atomic_ref], ConfigOrState). diff --git a/apps/opentelemetry/src/otel_batch_processor.erl b/apps/opentelemetry/src/otel_batch_processor.erl index f2dbc6d7..024d6a2d 100644 --- a/apps/opentelemetry/src/otel_batch_processor.erl +++ b/apps/opentelemetry/src/otel_batch_processor.erl @@ -32,94 +32,54 @@ %%%----------------------------------------------------------------------- -module(otel_batch_processor). --behaviour(gen_statem). + -behaviour(otel_span_processor). -export([start_link/1, on_start/3, on_end/2, - force_flush/1, - report_cb/1, - - %% deprecated - set_exporter/1, - set_exporter/2, - set_exporter/3]). - --export([init/1, - callback_mode/0, - idle/3, - exporting/3, - terminate/3]). - -%% uncomment when OTP-23 becomes the minimum required version -%% -deprecated({set_exporter, 1, "set through the otel_tracer_provider instead"}). -%% -deprecated({set_exporter, 2, "set through the otel_tracer_provider instead"}). -%% -deprecated({set_exporter, 3, "set through the otel_tracer_provider instead"}). + force_flush/1]). -include_lib("opentelemetry_api/include/opentelemetry.hrl"). -include_lib("kernel/include/logger.hrl"). -include("otel_span.hrl"). --record(data, {exporter :: {module(), term()} | undefined, - exporter_config :: {module(), term()} | undefined | none, - resource :: otel_resource:t() | undefined, - handed_off_table :: atom() | undefined, - runner_pid :: pid() | undefined, - max_queue_size :: integer() | infinity, - exporting_timeout_ms :: integer(), - check_table_size_ms :: integer() | infinity, - scheduled_delay_ms :: integer(), - table_1 :: atom(), - table_2 :: atom(), - reg_name :: atom()}). - --define(CURRENT_TABLES_KEY(Name), {?MODULE, current_table, Name}). - -%% create unique table names to support multiple batch processors at once --define(TABLE_NAME(TN), lists:concat([TN, "_", erlang:pid_to_list(self())])). --define(TABLE_1, ?REG_NAME(?TABLE_NAME(otel_export_table1))). --define(TABLE_2, ?REG_NAME(?TABLE_NAME(otel_export_table2))). --define(CURRENT_TABLE(RegName), persistent_term:get(?CURRENT_TABLES_KEY(RegName))). +-type batch_processor_config() :: + #{name := atom(), + max_queue_size => otel_batch_olp:max_queue_size(), + exporting_timeout_ms => otel_batch_olp:otel_timeout_ms(), + scheduled_delay_ms => otel_batch_olp:otel_timeout_ms(), + exporter => otel_exporter:exporter_config()}. -define(DEFAULT_MAX_QUEUE_SIZE, 2048). -define(DEFAULT_SCHEDULED_DELAY_MS, timer:seconds(5)). --define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:minutes(5)). --define(DEFAULT_CHECK_TABLE_SIZE_MS, timer:seconds(1)). - --define(ENABLED_KEY(RegName), {?MODULE, enabled_key, RegName}). - --ifdef(TEST). --export([current_tab_to_list/1]). -current_tab_to_list(RegName) -> - ets:tab2list(?CURRENT_TABLE(RegName)). --endif. - -%% require a unique name to distiguish multiple batch processors while -%% still having a single name, instead of a possibly changing pid, to -%% communicate with the processor -%% @doc Starts a Batch Span Processor. -%% @end --spec start_link(#{name := atom() | list()}) -> {ok, pid(), map()}. -start_link(Config=#{name := Name}) -> - RegisterName = ?REG_NAME(Name), - Config1 = Config#{reg_name => RegisterName}, - {ok, Pid} = gen_statem:start_link({local, RegisterName}, ?MODULE, [Config1], []), - {ok, Pid, Config1}. - -%% @deprecated Please use {@link otel_tracer_provider} -set_exporter(Exporter) -> - set_exporter(global, Exporter, []). - -%% @deprecated Please use {@link otel_tracer_provider} --spec set_exporter(module(), term()) -> ok. -set_exporter(Exporter, Options) -> - gen_statem:call(?REG_NAME(global), {set_exporter, {Exporter, Options}}). - -%% @deprecated Please use {@link otel_tracer_provider} --spec set_exporter(atom(), module(), term()) -> ok. -set_exporter(Name, Exporter, Options) -> - gen_statem:call(?REG_NAME(Name), {set_exporter, {Exporter, Options}}). +-define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:seconds(30)). +-define(DEFAULT_EXPORTER_MODULE, opentelemetry_exporter). +-define(DEFAULT_EXPORTER, {?DEFAULT_EXPORTER_MODULE, #{}}). + +-spec start_link(batch_processor_config()) -> + {ok, pid(), otel_batch_olp:otel_batch_olp_state()} | {error, term()}. +start_link(#{name := Name} = Config) -> + %% TODO: resource should be passed in from the tracer server, now it is detectd by otel_batch_olp + Config1 = maps:merge(default_config(), maps:remove(name, Config)), + Config2 = Config1#{reg_name => ?REG_NAME(Name), + cb_module => ?MODULE, + otel_signal => traces, + %% trace_id is expected to have high entropy and, thus, can improve concurrency, + %% it is not unique per span, but it doesn't matter as the table type + %% is `duplicate_bag` + extra_ets_opts => [{keypos, #span.trace_id}]}, + case otel_batch_olp:init_conf(Config2) of + {ok, OlpState} -> + case otel_batch_olp:start_link(OlpState, Config) of + {ok, Pid} -> + {ok, Pid, OlpState}; + Err -> + Err + end; + Err -> + Err + end. %% @private -spec on_start(otel_ctx:t(), opentelemetry:span(), otel_span_processor:processor_config()) @@ -127,339 +87,25 @@ set_exporter(Name, Exporter, Options) -> on_start(_Ctx, Span, _) -> Span. -%% @private --spec on_end(opentelemetry:span(), otel_span_processor:processor_config()) - -> true | dropped | {error, invalid_span} | {error, no_export_buffer}. +-spec on_end(opentelemetry:span(), otel_batch_olp:otel_batch_olp_state()) -> + true | dropped | {error, term()}. on_end(#span{trace_flags=TraceFlags}, _) when not(?IS_SAMPLED(TraceFlags)) -> dropped; -on_end(Span=#span{}, #{reg_name := RegName}) -> - do_insert(RegName, Span); +on_end(Span=#span{}, Config) -> + otel_batch_olp:insert_signal(Span, Config); on_end(_Span, _) -> {error, invalid_span}. -%% @private --spec force_flush(#{reg_name := gen_statem:server_ref()}) -> ok. -force_flush(#{reg_name := RegName}) -> - gen_statem:cast(RegName, force_flush). - -%% @private -init([Args=#{reg_name := RegName}]) -> - process_flag(trap_exit, true), - - SizeLimit = maps:get(max_queue_size, Args, ?DEFAULT_MAX_QUEUE_SIZE), - ExportingTimeout = maps:get(exporting_timeout_ms, Args, ?DEFAULT_EXPORTER_TIMEOUT_MS), - ScheduledDelay = maps:get(scheduled_delay_ms, Args, ?DEFAULT_SCHEDULED_DELAY_MS), - CheckTableSize = maps:get(check_table_size_ms, Args, ?DEFAULT_CHECK_TABLE_SIZE_MS), - - %% TODO: this should be passed in from the tracer server - Resource = case maps:find(resource, Args) of - {ok, R} -> - R; - error -> - otel_resource_detector:get_resource() - end, - %% Resource = otel_tracer_provider:resource(), - - Table1 = ?TABLE_1, - Table2 = ?TABLE_2, - - _Tid1 = new_export_table(Table1), - _Tid2 = new_export_table(Table2), - persistent_term:put(?CURRENT_TABLES_KEY(RegName), Table1), - - %% only enable export table if there is going to be an exporter - case maps:get(exporter, Args, none) of - ExporterConfig when ExporterConfig =:= none ; ExporterConfig =:= undefined -> - disable(RegName); - ExporterConfig -> - enable(RegName) - end, - - {ok, idle, #data{exporter=undefined, - exporter_config=ExporterConfig, - resource = Resource, - handed_off_table=undefined, - max_queue_size=SizeLimit, - exporting_timeout_ms=ExportingTimeout, - check_table_size_ms=CheckTableSize, - scheduled_delay_ms=ScheduledDelay, - table_1=Table1, - table_2=Table2, - reg_name=RegName}}. - -%% @private -callback_mode() -> - [state_functions, state_enter]. - -%% @private -idle(enter, _OldState, Data=#data{exporter=undefined, - exporter_config=ExporterConfig, - scheduled_delay_ms=SendInterval, - check_table_size_ms=CheckInterval, - reg_name=RegName}) -> - Exporter = init_exporter(RegName, ExporterConfig), - {keep_state, Data#data{exporter=Exporter}, - [{{timeout, export_spans}, SendInterval, export_spans}, - {{timeout, check_table_size}, CheckInterval, check_table_size}]}; -idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval, - check_table_size_ms=CheckInterval}) -> - {keep_state_and_data, - [{{timeout, export_spans}, SendInterval, export_spans}, - {{timeout, check_table_size}, CheckInterval, check_table_size}]}; -idle(_, export_spans, Data=#data{exporter=undefined, - exporter_config=ExporterConfig, - reg_name=RegName}) -> - Exporter = init_exporter(RegName, ExporterConfig), - {next_state, exporting, Data#data{exporter=Exporter}}; -idle(_, export_spans, Data) -> - {next_state, exporting, Data}; -idle(EventType, Event, Data) -> - handle_event_(idle, EventType, Event, Data). - -%% receiving an `export_spans' timeout while exporting means the `ExportingTimeout' -%% is shorter than the `SendInterval'. Postponing the event will ensure we export -%% after -%% @private -exporting({timeout, export_spans}, export_spans, _) -> - {keep_state_and_data, [postpone]}; -exporting(enter, _OldState, #data{exporter=undefined, - reg_name=RegName}) -> - %% exporter still undefined, go back to idle - %% first empty the table and disable the processor so no more spans are added - %% we wait until the attempt to export to disable so we don't lose spans - %% on startup but disable once it is clear an exporter isn't being set - clear_table_and_disable(RegName), - - %% use state timeout to transition to `idle' since we can't set a - %% new state in an `enter' handler - {keep_state_and_data, [{state_timeout, 0, no_exporter}]}; -exporting(enter, _OldState, Data=#data{exporting_timeout_ms=ExportingTimeout, - scheduled_delay_ms=SendInterval}) -> - case export_spans(Data) of - ok -> - %% in an `enter' handler we can't return a `next_state' or `next_event' - %% so we rely on a timeout to trigger the transition to `idle' - {keep_state, Data#data{runner_pid=undefined}, [{state_timeout, 0, empty_table}]}; - {OldTableName, RunnerPid} -> - {keep_state, Data#data{runner_pid=RunnerPid, - handed_off_table=OldTableName}, - [{state_timeout, ExportingTimeout, exporting_timeout}, - {{timeout, export_spans}, SendInterval, export_spans}]} - end; - -%% TODO: we need to just check if `exporter=undefined' right? -%% two hacks since we can't transition to a new state or send an action from `enter' -exporting(state_timeout, no_exporter, Data) -> - {next_state, idle, Data}; -exporting(state_timeout, empty_table, Data) -> - {next_state, idle, Data}; +-spec force_flush(otel_batch_olp:otel_batch_olp_state()) -> ok. +force_flush(Config) -> + otel_batch_olp:force_flush(Config). -exporting(state_timeout, exporting_timeout, Data=#data{handed_off_table=ExportingTable}) -> - %% kill current exporting process because it is taking too long - %% which deletes the exporting table, so create a new one and - %% repeat the state to force another span exporting immediately - Data1 = kill_runner(Data), - new_export_table(ExportingTable), - {next_state, idle, Data1}; -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {'EXIT', FromPid, _}, Data=#data{runner_pid=FromPid}) -> - complete_exporting(Data); -%% important to verify runner_pid and FromPid are the same in case it was sent -%% after kill_runner was called but before it had done the unlink -exporting(info, {completed, FromPid}, Data=#data{runner_pid=FromPid}) -> - complete_exporting(Data); -exporting(EventType, Event, Data) -> - handle_event_(exporting, EventType, Event, Data). +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- -%% transition to exporting on a force_flush unless we are already exporting -%% if exporting then postpone the event so the force flush happens after -%% this current exporting is complete -handle_event_(exporting, _, force_flush, _Data) -> - {keep_state_and_data, [postpone]}; -handle_event_(_State, _, force_flush, Data) -> - {next_state, exporting, Data}; - -handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=infinity}) -> - keep_state_and_data; -handle_event_(_State, {timeout, check_table_size}, check_table_size, #data{max_queue_size=MaxQueueSize, - check_table_size_ms=CheckInterval, - reg_name=RegName}) -> - case ets:info(?CURRENT_TABLE(RegName), size) of - M when M >= MaxQueueSize -> - disable(RegName); - _ -> - enable(RegName) - end, - {keep_state_and_data, [{{timeout, check_table_size}, CheckInterval, check_table_size}]}; -handle_event_(_, {call, From}, {set_exporter, ExporterConfig}, Data=#data{exporter=OldExporter, - reg_name=RegName}) -> - otel_exporter:shutdown(OldExporter), - - %% enable immediately or else spans will be dropped for a period even after this call returns - enable(RegName), - - {keep_state, Data#data{exporter=undefined, - exporter_config=ExporterConfig}, [{reply, From, ok}, - {next_event, internal, init_exporter}]}; -handle_event_(_, internal, init_exporter, Data=#data{exporter=undefined, - exporter_config=ExporterConfig, - reg_name=RegName}) -> - Exporter = init_exporter(RegName, ExporterConfig), - {keep_state, Data#data{exporter=Exporter}}; -handle_event_(_, _, _, _) -> - keep_state_and_data. - -%% @private -terminate(_Reason, _State, #data{exporter=Exporter, - resource=Resource, - reg_name=RegName}) -> - CurrentTable = ?CURRENT_TABLE(RegName), - - %% `export' is used to perform a blocking export - _ = export(Exporter, Resource, CurrentTable), - - ok. - -%% - -init_exporter(RegName, ExporterConfig) -> - case otel_exporter:init(ExporterConfig) of - Exporter when Exporter =/= undefined andalso Exporter =/= none -> - enable(RegName), - Exporter; - _ -> - %% exporter is undefined/none - %% disable the insertion of new spans and delete the current table - clear_table_and_disable(RegName), - undefined - end. - -clear_table_and_disable(RegName) -> - disable(RegName), - ets:delete(?CURRENT_TABLE(RegName)), - new_export_table(?CURRENT_TABLE(RegName)). - -enable(RegName)-> - persistent_term:put(?ENABLED_KEY(RegName), true). - -disable(RegName) -> - persistent_term:put(?ENABLED_KEY(RegName), false). - -is_enabled(RegName) -> - persistent_term:get(?ENABLED_KEY(RegName), true). - -do_insert(RegName, Span) -> - try - case is_enabled(RegName) of - true -> - ets:insert(?CURRENT_TABLE(RegName), Span); - _ -> - dropped - end - catch - error:badarg -> - {error, no_batch_span_processor}; - _:_ -> - {error, other} - end. - -complete_exporting(Data=#data{handed_off_table=ExportingTable}) - when ExportingTable =/= undefined -> - new_export_table(ExportingTable), - {next_state, idle, Data#data{runner_pid=undefined, - handed_off_table=undefined}}; -complete_exporting(Data) -> - {next_state, idle, Data#data{runner_pid=undefined, - handed_off_table=undefined}}. - -kill_runner(Data=#data{runner_pid=RunnerPid}) when RunnerPid =/= undefined -> - Mon = erlang:monitor(process, RunnerPid), - erlang:unlink(RunnerPid), - erlang:exit(RunnerPid, kill), - %% Wait for the runner process termination to be sure that - %% the export table is destroyed and can be safely recreated - receive - {'DOWN', Mon, process, RunnerPid, _} -> - Data#data{runner_pid=undefined, handed_off_table=undefined} - end. - -new_export_table(Name) -> - ets:new(Name, [public, - named_table, - {write_concurrency, true}, - duplicate_bag, - %% OpenTelemetry exporter protos group by the - %% instrumentation_scope. So using instrumentation_scope - %% as the key means we can easily lookup all spans for - %% for each instrumentation_scope and export together. - {keypos, #span.instrumentation_scope}]). - -export_spans(#data{exporter=Exporter, - resource=Resource, - table_1=Table1, - table_2=Table2, - reg_name=RegName}) -> - CurrentTable = ?CURRENT_TABLE(RegName), - case ets:info(CurrentTable, size) of - 0 -> - %% nothing to do if the table is empty - ok; - _ -> - NewCurrentTable = case CurrentTable of - Table1 -> - Table2; - Table2 -> - Table1 - end, - - %% an atom is a single word so this does not trigger a global GC - persistent_term:put(?CURRENT_TABLES_KEY(RegName), NewCurrentTable), - %% set the table to accept inserts - enable(RegName), - - Self = self(), - RunnerPid = erlang:spawn_link(fun() -> send_spans(Self, Resource, Exporter) end), - ets:give_away(CurrentTable, RunnerPid, export), - {CurrentTable, RunnerPid} - end. - -send_spans(FromPid, Resource, Exporter) -> - receive - {'ETS-TRANSFER', Table, FromPid, export} -> - export(Exporter, Resource, Table), - ets:delete(Table), - completed(FromPid) - end. - -completed(FromPid) -> - FromPid ! {completed, self()}. - -export(undefined, _, _) -> - true; -export({ExporterModule, Config}, Resource, SpansTid) -> - %% don't let a exporter exception crash us - %% and return true if exporter failed - try - otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable - catch - Kind:Reason:StackTrace -> - ?LOG_INFO(#{source => exporter, - during => export, - kind => Kind, - reason => Reason, - exporter => ExporterModule, - stacktrace => StackTrace}, #{report_cb => fun ?MODULE:report_cb/1}), - true - end. - -%% logger format functions -%% @private -report_cb(#{source := exporter, - during := export, - kind := Kind, - reason := Reason, - exporter := ExporterModule, - stacktrace := StackTrace}) -> - {"span exporter threw exception: exporter=~p ~ts", - [ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]}. +default_config() -> + #{max_queue_size => ?DEFAULT_MAX_QUEUE_SIZE, + exporting_timeout_ms => ?DEFAULT_EXPORTER_TIMEOUT_MS, + scheduled_delay_ms => ?DEFAULT_SCHEDULED_DELAY_MS, + exporter => ?DEFAULT_EXPORTER}. diff --git a/apps/opentelemetry/src/otel_exporter.erl b/apps/opentelemetry/src/otel_exporter.erl index 7a152532..5963b270 100644 --- a/apps/opentelemetry/src/otel_exporter.erl +++ b/apps/opentelemetry/src/otel_exporter.erl @@ -21,25 +21,34 @@ export_traces/4, export_metrics/4, export_logs/4, + export/5, shutdown/1, report_cb/1]). +-export_type([otel_signal/0, + exporter_config/0]). + %% Do any initialization of the exporter here and return configuration %% that will be passed along with a list of spans to the `export' function. --callback init(term()) -> {ok, term()} | ignore. +-callback init(Config) -> {ok, ExporterState} | {error, Reason} | ignore when + Config :: term(), + ExporterState :: term(), + Reason :: term(). %% This function is called when the configured interval expires with any %% spans that have been collected so far and the configuration returned in `init'. %% Do whatever needs to be done to export each span here, the caller will block %% until it returns. --callback export(traces | metrics, ets:tab(), otel_resource:t(), term()) -> ok | - success | - failed_not_retryable | - failed_retryable. --callback shutdown(term()) -> ok. +-callback export(otel_signal(), ets:tab(), otel_resource:t(), term()) -> ok | error | {error, term()}. + +-callback shutdown(State) -> ok when State :: term(). + +-type otel_signal() :: traces | metrics | logs. +-type exporter_config() :: module() | {module(), Config :: term()} | undefined | none | ignore. -include_lib("kernel/include/logger.hrl"). +-spec init(exporter_config()) -> {module(), term()} | error | ignore. init({ExporterModule, Config}) when is_atom(ExporterModule) -> try ExporterModule:init(Config) of {ok, ExporterState} when ExporterModule =:= opentelemetry_exporter -> @@ -49,8 +58,12 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) -> {ok, ExporterState} -> ?LOG_INFO("Exporter ~tp successfully initialized", [ExporterModule]), {ExporterModule, ExporterState}; + {error, Reason} -> + ?LOG_ERROR("Exporter failed to initalize, error: ~p", + [ExporterModule, Reason]), + error; ignore -> - undefined + ignore catch Kind:Reason:StackTrace -> %% logging in debug level since config argument in stacktrace could have secrets @@ -72,14 +85,14 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) -> %% the dependency needs to be added try grpcbox:module_info() of _ -> - undefined + error catch _:_ -> ?LOG_WARNING("OTLP exporter failed to initialize when using the GRPC " "protocol and `grpcbox` module is not available in the " "code path. Verify that you have the `grpcbox` dependency " "included and rerun.", []), - undefined + error end; _ -> %% same as the debug log above @@ -89,17 +102,17 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) -> kind => Kind, reason => Reason, exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}), - undefined + error end; {error, undef} when ExporterModule =:= opentelemetry_exporter -> ?LOG_WARNING("OTLP exporter module `opentelemetry_exporter` not found. " "Verify you have included the `opentelemetry_exporter` dependency.", [ExporterModule]), - undefined; + error; {error, undef} -> ?LOG_WARNING("Exporter module ~tp not found. Verify you have included " "the dependency that contains the exporter module.", [ExporterModule]), - undefined; + error; _ -> %% same as the debug log above %% without the stacktrace and at a higher level @@ -108,22 +121,25 @@ init({ExporterModule, Config}) when is_atom(ExporterModule) -> kind => Kind, reason => Reason, exporter => ExporterModule}, #{report_cb => fun ?MODULE:report_cb/1}), - undefined + error end end; -init(Exporter) when Exporter =:= none ; Exporter =:= undefined -> - undefined; +init(Exporter) when Exporter =:= none; Exporter =:= undefined; Exporter =:= ignore -> + ignore; init(ExporterModule) when is_atom(ExporterModule) -> init({ExporterModule, []}). -export_traces(ExporterModule, SpansTid, Resource, Config) -> - ExporterModule:export(traces, SpansTid, Resource, Config). +export_traces(ExporterModule, SpansTid, Resource, ExporterState) -> + export(traces, ExporterModule, SpansTid, Resource, ExporterState). + +export_metrics(ExporterModule, MetricsTid, Resource, ExporterState) -> + export(metrics, ExporterModule, MetricsTid, Resource, ExporterState). -export_metrics(ExporterModule, MetricsTid, Resource, Config) -> - ExporterModule:export(metrics, MetricsTid, Resource, Config). +export_logs(ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState) -> + export(logs, ExporterModule, LogsTidAndHandlerConfig, Resource, ExporterState). -export_logs(ExporterModule, Batch, Resource, Config) -> - ExporterModule:export(logs, Batch, Resource, Config). +export(OtelSignal, ExporterModule, Tid, Resource, ExporterState) -> + ExporterModule:export(OtelSignal, Tid, Resource, ExporterState). shutdown(undefined) -> ok; diff --git a/apps/opentelemetry/src/otel_exporter_pid.erl b/apps/opentelemetry/src/otel_exporter_pid.erl index 755f1410..5c2732c6 100644 --- a/apps/opentelemetry/src/otel_exporter_pid.erl +++ b/apps/opentelemetry/src/otel_exporter_pid.erl @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Pid) -> ets:foldl(fun(Span, _Acc) -> Pid ! {span, Span} end, [], SpansTid), + ets:delete_all_objects(SpansTid), ok. shutdown(_) -> diff --git a/apps/opentelemetry/src/otel_exporter_stdout.erl b/apps/opentelemetry/src/otel_exporter_stdout.erl index 43a229e5..8fdc6b4e 100644 --- a/apps/opentelemetry/src/otel_exporter_stdout.erl +++ b/apps/opentelemetry/src/otel_exporter_stdout.erl @@ -31,6 +31,7 @@ export(_, SpansTid, _Resource, _) -> ets:foldl(fun(Span, _Acc) -> io:format("~p~n", [Span]) end, [], SpansTid), + ets:delete_all_objects(SpansTid), ok. shutdown(_) -> diff --git a/apps/opentelemetry/src/otel_exporter_tab.erl b/apps/opentelemetry/src/otel_exporter_tab.erl index 27e421a0..687484c9 100644 --- a/apps/opentelemetry/src/otel_exporter_tab.erl +++ b/apps/opentelemetry/src/otel_exporter_tab.erl @@ -30,6 +30,7 @@ export(traces, SpansTid, _Resource, Tid) -> ets:foldl(fun(Span, _Acc) -> ets:insert(Tid, Span) end, [], SpansTid), + ets:delete_all_objects(SpansTid), ok. shutdown(_) -> diff --git a/apps/opentelemetry/src/otel_simple_processor.erl b/apps/opentelemetry/src/otel_simple_processor.erl index 32495fe5..429a29e2 100644 --- a/apps/opentelemetry/src/otel_simple_processor.erl +++ b/apps/opentelemetry/src/otel_simple_processor.erl @@ -245,7 +245,7 @@ export({ExporterModule, Config}, Resource, SpansTid) -> %% don't let a exporter exception crash us %% and return true if exporter failed try - otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) =:= failed_not_retryable + otel_exporter:export_traces(ExporterModule, SpansTid, Resource, Config) catch Kind:Reason:StackTrace -> ?LOG_INFO(#{source => exporter, diff --git a/apps/opentelemetry/src/otel_span_processor.erl b/apps/opentelemetry/src/otel_span_processor.erl index 44b84052..f0a0a711 100644 --- a/apps/opentelemetry/src/otel_span_processor.erl +++ b/apps/opentelemetry/src/otel_span_processor.erl @@ -29,8 +29,7 @@ -callback on_end(opentelemetry:span(), processor_config()) -> true | dropped | - {error, invalid_span} | - {error, no_export_buffer}. + {error, term()}. -callback force_flush(processor_config()) -> ok | {error, term()}. diff --git a/apps/opentelemetry/test/opentelemetry_SUITE.erl b/apps/opentelemetry/test/opentelemetry_SUITE.erl index c41db277..9befa264 100644 --- a/apps/opentelemetry/test/opentelemetry_SUITE.erl +++ b/apps/opentelemetry/test/opentelemetry_SUITE.erl @@ -65,7 +65,7 @@ init_per_testcase(disabled_sdk, Config) -> Config; init_per_testcase(no_exporter, Config) -> application:set_env(opentelemetry, processors, - [{otel_batch_processor, #{scheduled_delay_ms => 1}}]), + [{otel_batch_processor, #{scheduled_delay_ms => 1, exporter => none}}]), {ok, _} = application:ensure_all_started(opentelemetry), Config; init_per_testcase(disable_auto_creation, Config) -> @@ -119,6 +119,9 @@ init_per_testcase(too_many_attributes, Config) -> {ok, _} = application:ensure_all_started(opentelemetry), Config1; init_per_testcase(tracer_instrumentation_scope, Config) -> + %% Note: this actually mutes a bug / design drawback, as + %% persistent terms are not cleaned / properly refreshed and may keep stale data. + cleanup_persistent_terms(opentelemetry), Config1 = set_batch_tab_processor(Config), {ok, _} = application:ensure_all_started(opentelemetry), Config1; @@ -128,7 +131,8 @@ init_per_testcase(multiple_tracer_providers, Config) -> {ok, _} = application:ensure_all_started(opentelemetry), Config; init_per_testcase(multiple_processors, Config) -> - application:set_env(opentelemetry, processors, [{otel_batch_processor, #{scheduled_delay_ms => 1, + application:set_env(opentelemetry, processors, [{otel_batch_processor, #{name => first, + scheduled_delay_ms => 1, exporter => {otel_exporter_pid, self()}}}, {otel_batch_processor, #{name => second, scheduled_delay_ms => 1, @@ -268,14 +272,14 @@ logger_metadata(_Config) -> ok. -%% logger metadata will either be undefined, a map without the otel_span_ctx key or -%% with the value of the key being undefined +%% logger metadata will either be undefined, or a map without hex_span_ctx_keys: +%% [otel_trace_id, otel_span_id, otel_trace_flags] empty_metadata() -> case logger:get_process_metadata() of undefined -> true; M -> - maps:get(otel_span_ctx, M, #{}) =:= #{} + maps:with(otel_span:hex_span_ctx_keys(), M) =:= #{} end. propagator_configuration(_Config) -> @@ -589,7 +593,6 @@ update_span_data(Config) -> ok. - tracer_instrumentation_scope(Config) -> Tid = ?config(tid, Config), @@ -606,7 +609,6 @@ tracer_instrumentation_scope(Config) -> otel_span:end_span(SpanCtx1), [Span1] = assert_exported(Tid, SpanCtx1), - ?assertMatch({instrumentation_scope,<<"tracer1">>,<<"1.0.0">>,<<"http://schema.org/myschema">>}, Span1#span.instrumentation_scope). @@ -1080,21 +1082,16 @@ disabled_sdk(_Config) -> no_exporter(_Config) -> SpanCtx1 = ?start_span(<<"span-1">>), - - %% set_exporter will enable the export table even if the exporter ends - %% up being undefined to ensure no spans are lost. so briefly spans - %% will be captured - otel_batch_processor:set_exporter(none), otel_span:end_span(SpanCtx1), %% once the exporter is "initialized" the table is cleared and disabled %% future spans are not added - ?UNTIL([] =:= otel_batch_processor:current_tab_to_list(otel_batch_processor_global)), + ?UNTIL([] =:= otel_batch_olp:current_tab_to_list(otel_batch_processor_global)), SpanCtx2 = ?start_span(<<"span-2">>), otel_span:end_span(SpanCtx2), - ?assertEqual([], otel_batch_processor:current_tab_to_list(otel_batch_processor_global)), + ?assertEqual([], otel_batch_olp:current_tab_to_list(otel_batch_processor_global)), ok. @@ -1118,3 +1115,12 @@ assert_not_exported(Tid, #span_ctx{trace_id=TraceId, span_id=SpanId, _='_'})). +cleanup_persistent_terms(Module) -> + lists:foreach( + fun({Key, _}) -> + case is_tuple(Key) andalso element(1, Key) =:= Module of + true -> persistent_term:erase(Key); + false -> ok + end + end, + persistent_term:get()). diff --git a/apps/opentelemetry/test/otel_batch_processor_SUITE.erl b/apps/opentelemetry/test/otel_batch_processor_SUITE.erl index eef31f35..f6800e75 100644 --- a/apps/opentelemetry/test/otel_batch_processor_SUITE.erl +++ b/apps/opentelemetry/test/otel_batch_processor_SUITE.erl @@ -10,8 +10,8 @@ all() -> [exporting_timeout_test, - check_table_size_test, - exporting_runner_timeout_test]. + exporting_runner_timeout_test, + check_table_size_test]. %% verifies that after the runner has to be killed for taking too long %% that everything is still functional and the exporter does not crash @@ -33,39 +33,11 @@ exporting_timeout_test(_Config) -> ok end. -check_table_size_test(_Config) -> - MaxQueueSize = 10, - CheckTableSizeMs = 1, - {ok, _Pid, #{reg_name := RegName}} = otel_batch_processor:start_link( - #{name => test_processor_check_size_test, - resource => otel_resource:create([]), - exporter => ?MODULE, - exporting_timeout_ms => timer:minutes(10), - %% long enough, so that it never happens during the test - scheduled_delay_ms => timer:minutes(10), - check_table_size_ms => CheckTableSizeMs, - max_queue_size => MaxQueueSize} - ), - %% max_queue_size limit is not reached - true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), - lists:foreach(fun(_) -> - otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}) - end, - lists:seq(1, MaxQueueSize)), - %% Wait for more than CheckTablesizeMS to be sure check timeout occurred - timer:sleep(CheckTableSizeMs * 5), - dropped = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), - - otel_batch_processor:force_flush(#{reg_name => RegName}), - %% force_flush is async, have to wait for some long enough time again, - timer:sleep(CheckTableSizeMs * 10), - true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}). - exporting_runner_timeout_test(_Config) -> process_flag(trap_exit, true), - {ok, Pid, #{reg_name := RegName}} = otel_batch_processor:start_link( - #{name => test_processor1, + {ok, Pid, State} = otel_batch_processor:start_link( + #{name => test_processor, resource => otel_resource:create([]), exporter => ?MODULE, exporting_timeout_ms => 1, @@ -73,8 +45,8 @@ exporting_runner_timeout_test(_Config) -> %% Insert a few spans to make sure runner process will be spawned and killed %% because it hangs for 10 minutes (see export/4 below) - true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), - true = otel_batch_processor:on_end(generate_span(), #{reg_name => RegName}), + true = otel_batch_processor:on_end(generate_span(), State), + true = otel_batch_processor:on_end(generate_span(), State), receive {'EXIT', Pid, _} -> @@ -85,9 +57,35 @@ exporting_runner_timeout_test(_Config) -> ok end. +check_table_size_test(_Config) -> + MaxQueueSize = 10, + {ok, _Pid, State} = otel_batch_processor:start_link( + #{name => test_processor_check_size_test, + resource => otel_resource:create([]), + exporter => ?MODULE, + exporting_timeout_ms => timer:minutes(10), + %% long enough, so that it never happens during the test + scheduled_delay_ms => timer:minutes(10), + max_queue_size => MaxQueueSize} + ), + %% max_queue_size limit is not reached + true = otel_batch_processor:on_end(generate_span(), State), + + insert_spans(State, MaxQueueSize), + + %% Wait a little to give the handler time to transition to the export state + timer:sleep(30), + + %% Insert the same number again, rgis time to the next table, as the previous is being exported, + %% exporter is slow (see init_per_testcase), so we can be sure that we will go to the drop mode, + %% with no chance to switch the table this time. + insert_spans(State, MaxQueueSize), + + dropped = otel_batch_processor:on_end(generate_span(), State). + %% exporter behaviour -init(_) -> +init(_OtelSignal, _ExporterId, _) -> {ok, []}. export(_, _, _, _) -> @@ -98,6 +96,10 @@ shutdown(_) -> %% helpers +insert_spans(State, N) -> + lists:foreach(fun(_) -> otel_batch_processor:on_end(generate_span(), State) end, + lists:seq(1, N)). + generate_span() -> #span{trace_id = otel_id_generator:generate_trace_id(), span_id = otel_id_generator:generate_span_id(), diff --git a/apps/opentelemetry_api/src/otel_ctx.erl b/apps/opentelemetry_api/src/otel_ctx.erl index 118fc1ee..79170e47 100644 --- a/apps/opentelemetry_api/src/otel_ctx.erl +++ b/apps/opentelemetry_api/src/otel_ctx.erl @@ -201,9 +201,5 @@ text_map_injector(Key, ToText) -> text_map_injector_fun(TextMap, Key, ToText) -> TextMap ++ ToText(?MODULE:get_value(Key, undefined)). -%% - -update_logger_process_metadata(undefined) -> - ok; update_logger_process_metadata(Ctx) -> otel_tracer:update_logger_process_metadata(Ctx). diff --git a/apps/opentelemetry_api/src/otel_span.erl b/apps/opentelemetry_api/src/otel_span.erl index 67b6b098..14675d8d 100644 --- a/apps/opentelemetry_api/src/otel_span.erl +++ b/apps/opentelemetry_api/src/otel_span.erl @@ -21,6 +21,7 @@ -export([trace_id/1, span_id/1, hex_span_ctx/1, + hex_span_ctx_keys/0, hex_trace_id/1, hex_span_id/1, tracestate/1, @@ -44,6 +45,9 @@ -include_lib("opentelemetry_semantic_conventions/include/trace.hrl"). -define(is_recording(SpanCtx), SpanCtx =/= undefined andalso SpanCtx#span_ctx.is_recording =:= true). +-define(OTEL_TRACE_ID, otel_trace_id). +-define(OTEL_SPAN_ID, otel_span_id). +-define(OTEL_TRACE_FLAGS, otel_trace_flags). -type start_opts() :: #{attributes := opentelemetry:attributes_map(), links := [opentelemetry:link()], @@ -108,18 +112,22 @@ span_id(#span_ctx{span_id=SpanId}) -> SpanId. %% keys are prefixed with `otel_' because the main use of this function is logger metadata --spec hex_span_ctx(opentelemetry:span_ctx() | undefined) -> #{otel_trace_id := unicode:charlist(), - otel_span_id := unicode:charlist(), - otel_trace_flags := unicode:charlist()} | #{}. +-spec hex_span_ctx(opentelemetry:span_ctx() | undefined) -> #{?OTEL_TRACE_ID := unicode:charlist(), + ?OTEL_SPAN_ID := unicode:charlist(), + ?OTEL_TRACE_FLAGS := unicode:charlist()} | #{}. hex_span_ctx(#span_ctx{trace_id=TraceId, span_id=SpanId, trace_flags=TraceFlags}) -> - #{otel_trace_id => io_lib:format("~32.16.0b", [TraceId]), - otel_span_id => io_lib:format("~16.16.0b", [SpanId]), - otel_trace_flags => case TraceFlags band 1 of 1 -> "01"; _ -> "00" end}; + #{?OTEL_TRACE_ID => io_lib:format("~32.16.0b", [TraceId]), + ?OTEL_SPAN_ID => io_lib:format("~16.16.0b", [SpanId]), + ?OTEL_TRACE_FLAGS => case TraceFlags band 1 of 1 -> "01"; _ -> "00" end}; hex_span_ctx(_) -> #{}. +-spec hex_span_ctx_keys() -> [atom()]. +hex_span_ctx_keys() -> + [?OTEL_TRACE_ID, ?OTEL_SPAN_ID, ?OTEL_TRACE_FLAGS]. + -spec hex_trace_id(opentelemetry:span_ctx()) -> opentelemetry:hex_trace_id(). hex_trace_id(#span_ctx{trace_id=TraceId}) -> case otel_utils:format_binary_string("~32.16.0b", [TraceId]) of diff --git a/apps/opentelemetry_api/src/otel_tracer.erl b/apps/opentelemetry_api/src/otel_tracer.erl index eccd2b08..17032ed6 100644 --- a/apps/opentelemetry_api/src/otel_tracer.erl +++ b/apps/opentelemetry_api/src/otel_tracer.erl @@ -127,8 +127,17 @@ current_span_ctx(Ctx) -> update_logger_process_metadata(Ctx) -> update_logger_process_metadata_from_span_ctx(current_span_ctx(Ctx)). -%% - +%% If the (previous) context is undefined, logger process metadata must be cleared +update_logger_process_metadata_from_span_ctx(undefined) -> + clear_logger_proces_metadata(); update_logger_process_metadata_from_span_ctx(SpanCtx) -> Metadata = otel_span:hex_span_ctx(SpanCtx), logger:update_process_metadata(Metadata). + +clear_logger_proces_metadata() -> + case logger:get_process_metadata() of + M when is_map(M) -> + logger:set_process_metadata(maps:without(otel_span:hex_span_ctx_keys(), M)); + _ -> + ok + end. diff --git a/apps/opentelemetry_api/src/otel_utils.erl b/apps/opentelemetry_api/src/otel_utils.erl index e76091f9..67addcbd 100644 --- a/apps/opentelemetry_api/src/otel_utils.erl +++ b/apps/opentelemetry_api/src/otel_utils.erl @@ -21,7 +21,8 @@ format_binary_string/2, format_binary_string/3, assert_to_binary/1, - unicode_to_binary/1]). + unicode_to_binary/1, + stack_without_args/1]). -if(?OTP_RELEASE >= 24). format_exception(Kind, Reason, StackTrace) -> @@ -56,3 +57,11 @@ unicode_to_binary(String) -> _ -> {error, bad_binary_conversion} end. + +%% Args may contain sensitive data +stack_without_args([{M, F, Args, Info} | T]) when is_list(Args) -> + [{M, F, length(Args), Info} | stack_without_args(T)]; +stack_without_args([StItem | T] ) -> + [StItem | stack_without_args(T)]; +stack_without_args([]) -> + []. diff --git a/apps/opentelemetry_experimental/src/otel_log_handler.erl b/apps/opentelemetry_experimental/src/otel_log_handler.erl index db023e4e..f6adb989 100644 --- a/apps/opentelemetry_experimental/src/otel_log_handler.erl +++ b/apps/opentelemetry_experimental/src/otel_log_handler.erl @@ -1,5 +1,5 @@ %%%------------------------------------------------------------------------ -%% Copyright 2022, OpenTelemetry Authors +%% Copyright 2022-2023, OpenTelemetry Authors %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at @@ -12,239 +12,158 @@ %% See the License for the specific language governing permissions and %% limitations under the License. %% -%% @doc +%% @doc Specification: https://opentelemetry.io/docs/specs/otel/logs/sdk %% @end %%%------------------------------------------------------------------------- -module(otel_log_handler). --behaviour(gen_statem). - -include_lib("kernel/include/logger.hrl"). -include_lib("opentelemetry_api/include/opentelemetry.hrl"). --export([start_link/2]). - +%% Logger handler -export([log/2, adding_handler/1, removing_handler/1, - changing_config/3, - filter_config/1, - report_cb/1]). + changing_config/3]). + +%% OpenTelemetry specific +-export([force_flush/1]). --export([init/1, - callback_mode/0, - idle/3, - exporting/3, - handle_event/3]). +-export_type([config/0, + otel_log_handler_config/0]). -type config() :: #{id => logger:handler_id(), - regname := atom(), - config => term(), level => logger:level() | all | none, module => module(), filter_default => log | stop, filters => [{logger:filter_id(), logger:filter()}], - formatter => {module(), logger:formatter_config()}}. + formatter => {module(), logger:formatter_config()}, + config => otel_log_handler_config() + }. + +-type config_state() :: #{id => logger:handler_id(), + level => logger:level() | all | none, + module => module(), + filter_default => log | stop, + filters => [{logger:filter_id(), logger:filter()}], + formatter => {module(), logger:formatter_config()}, + config := otel_batch_olp:otel_batch_olp_state() + }. + +-type otel_log_handler_config() :: + #{max_queue_size => otel_batch_olp:max_queue_size(), + exporting_timeout_ms => otel_batch_olp:otel_timeout_ms(), + scheduled_delay_ms => otel_batch_olp:otel_timeout_ms(), + exporter => otel_exporter:exporter_config()}. + +-define(SUP, opentelemetry_experimental_sup). --define(DEFAULT_CALL_TIMEOUT, 5000). -define(DEFAULT_MAX_QUEUE_SIZE, 2048). --define(DEFAULT_SCHEDULED_DELAY_MS, timer:seconds(5)). --define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:minutes(5)). +-define(DEFAULT_SCHEDULED_DELAY_MS, timer:seconds(1)). +-define(DEFAULT_EXPORTER_TIMEOUT_MS, timer:seconds(30)). +-define(DEFAULT_EXPORTER_MODULE, opentelemetry_exporter). +-define(DEFAULT_EXPORTER, {?DEFAULT_EXPORTER_MODULE, #{protocol => grpc}}). + +%% Slightly higher than GRACE_SHUTDOWN_MS, so that the supervisor doesn't kill the handler too early +-define(SUP_SHUTDOWN_MS, 5500). +-define(GRACE_SHUTDOWN_MS, 5000). + +%%-------------------------------------------------------------------- +%% Logger handler callbacks +%%-------------------------------------------------------------------- + +-spec adding_handler(config()) -> {ok, config_state()} | {error, term()}. +adding_handler(#{id := Id}=Config) -> + HandlerConfig = maps:merge(default_config(), maps:get(config, Config, #{})), + RegName = ?REG_NAME(Id), + OtelBatchOlpConfig = HandlerConfig#{reg_name => RegName, + cb_module => ?MODULE, + otel_signal => logs}, + + case otel_batch_olp:init_conf(OtelBatchOlpConfig) of + {ok, OlpState} -> + %% logger conf must keep all the fields of otel_batch_olp conf, + %% as it includes table names, reg_name and atomic ref. + Config1 = Config#{config => OlpState}, + start(Id, OlpState, Config1); + Err -> + Err + end. --define(name_to_reg_name(Module, Id), - list_to_atom(lists:concat([Module, "_", Id]))). +-spec changing_config(SetOrUpdate, OldConfigState, NewConfig) -> + {ok, NewConfigState} | {error, Reason} when + SetOrUpdate :: set | update, + OldConfigState :: config_state(), + NewConfig :: config(), + NewConfigState :: config_state(), + Reason :: term(). +changing_config(SetOrUpdate, #{config := #{reg_name := _} = OlpState}, NewConfig) -> + NewOlpConfig = maps:get(config, NewConfig, #{}), + Default = case SetOrUpdate of + update -> OlpState; + set -> default_config() + end, + NewOlpConfig1 = maps:merge(with_changeable_fields(Default), NewOlpConfig), + %% NewConfig which is `logger:handler_config()` is already merged with either old config or default, + %% depending on `SetOrUpdate` value + case otel_batch_olp:change_config(OlpState, NewOlpConfig1, NewConfig) of + {ok, NewOlpState} -> + {ok, NewConfig#{config => NewOlpState}}; + Err -> + Err + end. --record(data, {exporter :: {module(), term()} | undefined, - exporter_config :: {module(), term()} | undefined, - resource :: otel_resource:t(), +-spec removing_handler(config_state()) -> ok | {error, term()}. +removing_handler(_Config=#{id := Id}) -> + Res = supervisor:terminate_child(?SUP, Id), + _ = supervisor:delete_child(?SUP, Id), + Res. - runner_pid :: pid() | undefined, - max_queue_size :: integer() | infinity, - exporting_timeout_ms :: integer(), - scheduled_delay_ms :: integer(), +-spec log(LogEvent, Config) -> true | dropped | {error, term()} when + LogEvent :: logger:log_event(), + Config :: config_state(). +log(LogEvent, #{config := OlpConfig}) -> + Ts = case LogEvent of + #{meta := #{time := Time}} -> Time; + _ -> logger:timestamp() + end, + otel_batch_olp:insert_signal({Ts, LogEvent}, OlpConfig). - config :: #{}, - batch :: #{opentelemetry:instrumentation_scope() => [logger:log_event()]}}). +-spec force_flush(config_state()) -> ok. +force_flush(#{config := OlpConfig}) -> + otel_batch_olp:force_flush(OlpConfig). -start_link(RegName, Config) -> - gen_statem:start_link({local, RegName}, ?MODULE, [RegName, Config], []). +%%-------------------------------------------------------------------- +%% Internal functions +%%-------------------------------------------------------------------- --spec adding_handler(Config) -> {ok, Config} | {error, Reason} when - Config :: config(), - Reason :: term(). -adding_handler(#{id := Id, - module := Module}=Config) -> - RegName = ?name_to_reg_name(Module, Id), +start(Id, OtelBatchOlpState, Config) -> ChildSpec = #{id => Id, - start => {?MODULE, start_link, [RegName, Config]}, - restart => temporary, - shutdown => 2000, + start => {otel_batch_olp, start_link, [OtelBatchOlpState, Config]}, + %% The handler must be stopped gracefully by calling `logger:remove_handler/1`, + %% which calls `supervisor:terminate_child/2` (in `removing_handler/2` cb). + %% Any other termination is abnormal and deserves a restart. + restart => permanent, + shutdown => ?SUP_SHUTDOWN_MS, type => worker, modules => [?MODULE]}, - case supervisor:start_child(opentelemetry_experimental_sup, ChildSpec) of + case supervisor:start_child(?SUP, ChildSpec) of {ok, _Pid} -> - %% ok = logger_handler_watcher:register_handler(Name,Pid), - %% OlpOpts = logger_olp:get_opts(Olp), - {ok, Config#{regname => RegName}}; + {ok, Config}; {error, {Reason, Ch}} when is_tuple(Ch), element(1, Ch) == child -> {error, Reason}; {error, _Reason}=Error -> Error end. -%%%----------------------------------------------------------------- -%%% Updating handler config --spec changing_config(SetOrUpdate, OldConfig, NewConfig) -> - {ok,Config} | {error,Reason} when - SetOrUpdate :: set | update, - OldConfig :: config(), - NewConfig :: config(), - Config :: config(), - Reason :: term(). -changing_config(SetOrUpdate, OldConfig, NewConfig=#{regname := Id}) -> - gen_statem:call(Id, {changing_config, SetOrUpdate, OldConfig, NewConfig}). - -%%%----------------------------------------------------------------- -%%% Handler being removed --spec removing_handler(Config) -> ok when - Config :: config(). -removing_handler(Config=#{regname := Id}) -> - gen_statem:call(Id, {removing_handler, Config}). - -%%%----------------------------------------------------------------- -%%% Log a string or report --spec log(LogEvent, Config) -> ok when - LogEvent :: logger:log_event(), - Config :: config(). -log(LogEvent, _Config=#{regname := Id}) -> - Scope = case LogEvent of - #{meta := #{otel_scope := Scope0=#instrumentation_scope{}}} -> - Scope0; - #{meta := #{mfa := {Module, _, _}}} -> - opentelemetry:get_application_scope(Module); - _ -> - opentelemetry:instrumentation_scope(<<>>, <<>>, <<>>) - end, - - gen_statem:cast(Id, {log, Scope, LogEvent}). - -%%%----------------------------------------------------------------- -%%% Remove internal fields from configuration --spec filter_config(Config) -> Config when - Config :: config(). -filter_config(Config=#{regname := Id}) -> - gen_statem:call(Id, {filter_config, Config}). - -init([_RegName, Config]) -> - process_flag(trap_exit, true), - - Resource = otel_resource_detector:get_resource(), - - SizeLimit = maps:get(max_queue_size, Config, ?DEFAULT_MAX_QUEUE_SIZE), - ExportingTimeout = maps:get(exporting_timeout_ms, Config, ?DEFAULT_EXPORTER_TIMEOUT_MS), - ScheduledDelay = maps:get(scheduled_delay_ms, Config, ?DEFAULT_SCHEDULED_DELAY_MS), - - ExporterConfig = maps:get(exporter, Config, {opentelemetry_exporter, #{protocol => grpc}}), - - {ok, idle, #data{exporter=undefined, - exporter_config=ExporterConfig, - resource=Resource, - config=Config, - max_queue_size=case SizeLimit of - infinity -> infinity; - _ -> SizeLimit div erlang:system_info(wordsize) - end, - exporting_timeout_ms=ExportingTimeout, - scheduled_delay_ms=ScheduledDelay, - batch=#{}}}. - -callback_mode() -> - [state_functions, state_enter]. - -idle(enter, _OldState, Data=#data{exporter=undefined, - exporter_config=ExporterConfig, - scheduled_delay_ms=SendInterval}) -> - Exporter = init_exporter(ExporterConfig), - {keep_state, Data#data{exporter=Exporter}, - [{{timeout, export_logs}, SendInterval, export_logs}]}; -idle(enter, _OldState, #data{scheduled_delay_ms=SendInterval}) -> - {keep_state_and_data, [{{timeout, export_logs}, SendInterval, export_logs}]}; -idle(_, export_logs, Data=#data{exporter=undefined, - exporter_config=ExporterConfig}) -> - Exporter = init_exporter(ExporterConfig), - {next_state, exporting, Data#data{exporter=Exporter}, [{next_event, internal, export}]}; -idle(_, export_logs, Data) -> - {next_state, exporting, Data, [{next_event, internal, export}]}; -idle(EventType, EventContent, Data) -> - handle_event(EventType, EventContent, Data). - -exporting({timeout, export_logs}, export_logs, _) -> - {keep_state_and_data, [postpone]}; -exporting(enter, _OldState, _Data) -> - keep_state_and_data; -exporting(internal, export, Data=#data{exporter=Exporter, - resource=Resource, - config=Config, - batch=Batch}) when map_size(Batch) =/= 0 -> - _ = export(Exporter, Resource, Batch, Config), - {next_state, idle, Data#data{batch=#{}}}; -exporting(EventType, EventContent, Data) -> - handle_event(EventType, EventContent, Data). - -handle_event({call, From}, {changing_config, _SetOrUpdate, _OldConfig, NewConfig}, Data) -> - {keep_state, Data#data{config=NewConfig}, [{reply, From, NewConfig}]}; -handle_event({call, From}, {removing_handler, Config}, _Data) -> - %% TODO: flush - {keep_state_and_data, [{reply, From, Config}]}; -handle_event({call, From}, {filter_handler, Config}, Data) -> - {keep_state, Data, [{reply, From, Config}]}; -handle_event({call, From}, {filter_config, Config}, Data) -> - {keep_state, Data, [{reply, From, Config}]}; -handle_event({call, _From}, _Msg, _Data) -> - keep_state_and_data; -handle_event(cast, {log, Scope, LogEvent}, Data=#data{batch=Logs}) -> - {keep_state, Data#data{batch=maps:update_with(Scope, fun(V) -> - [LogEvent | V] - end, [LogEvent], Logs)}}; -handle_event(_, _, _) -> - keep_state_and_data. - -%% - -init_exporter(ExporterConfig) -> - case otel_exporter:init(ExporterConfig) of - Exporter when Exporter =/= undefined andalso Exporter =/= none -> - Exporter; - _ -> - undefined - end. - -export(undefined, _, _, _) -> - true; -export({ExporterModule, ExporterConfig}, Resource, Batch, Config) -> - %% don't let a exporter exception crash us - %% and return true if exporter failed - try - otel_exporter:export_logs(ExporterModule, {Batch, Config}, Resource, ExporterConfig) - =:= failed_not_retryable - catch - Kind:Reason:StackTrace -> - ?LOG_WARNING(#{source => exporter, - during => export, - kind => Kind, - reason => Reason, - exporter => ExporterModule, - stacktrace => StackTrace}, #{report_cb => fun ?MODULE:report_cb/1}), - true - end. +default_config() -> + #{max_queue_size => ?DEFAULT_MAX_QUEUE_SIZE, + exporting_timeout_ms => ?DEFAULT_EXPORTER_TIMEOUT_MS, + scheduled_delay_ms => ?DEFAULT_SCHEDULED_DELAY_MS, + shutdown_timeout_ms => ?GRACE_SHUTDOWN_MS, + exporter => ?DEFAULT_EXPORTER}. -%% logger format functions -report_cb(#{source := exporter, - during := export, - kind := Kind, - reason := Reason, - exporter := ExporterModule, - stacktrace := StackTrace}) -> - {"log exporter threw exception: exporter=~p ~ts", - [ExporterModule, otel_utils:format_exception(Kind, Reason, StackTrace)]}. +%% Select fields that are allowed to be changed +with_changeable_fields(Config) -> + maps:with([max_queue_size, scheduled_delay_ms], Config). diff --git a/apps/opentelemetry_experimental/src/otel_otlp_logs.erl b/apps/opentelemetry_experimental/src/otel_otlp_logs.erl index beccff71..42df3b1d 100644 --- a/apps/opentelemetry_experimental/src/otel_otlp_logs.erl +++ b/apps/opentelemetry_experimental/src/otel_otlp_logs.erl @@ -31,41 +31,77 @@ -define(IS_STRING(String), (is_list(String) orelse is_binary(String))). -to_proto(Logs, Resource, Config) -> - InstrumentationScopeLogs = to_proto_by_instrumentation_scope(Logs, Config), - Attributes = otel_resource:attributes(Resource), - ResourceLogs = #{resource => #{attributes => otel_otlp_common:to_attributes(Attributes), - dropped_attributes_count => otel_attributes:dropped(Attributes)}, - scope_logs => InstrumentationScopeLogs}, - case otel_resource:schema_url(Resource) of - undefined -> - #{resource_logs => [ResourceLogs]}; - SchemaUrl -> - #{resource_logs => [ResourceLogs#{schema_url => SchemaUrl}]} - end. - +-define(DEFAULT_SCOPE, opentelemetry:instrumentation_scope(<<>>, <<>>, <<>>)). -to_proto_by_instrumentation_scope(Logs, Config) -> - ScopeLogs = logs_by_scope(Logs, Config), - maps:fold(fun(Scope, LogRecords, Acc) -> - [#{scope => otel_otlp_common:to_instrumentation_scope_proto(Scope), - log_records => LogRecords - %% schema_url => unicode:chardata() % = 3, optional - } | Acc] - end, [], ScopeLogs). +-spec to_proto(ets:table(), otel_resource:t(), logger:handler_config()) -> + opentelemetry_exporter_logs_service_pb:export_logs_service_request() | empty. +to_proto(Tab, Resource, LogHandlerConfig) -> + case to_proto_by_instrumentation_scope(Tab, LogHandlerConfig) of + [] -> + empty; + InstrumentationScopeLogs -> + Attributes = otel_resource:attributes(Resource), + ResourceLogs = #{resource => #{attributes => otel_otlp_common:to_attributes(Attributes), + dropped_attributes_count => otel_attributes:dropped(Attributes)}, + scope_logs => InstrumentationScopeLogs}, + case otel_resource:schema_url(Resource) of + undefined -> + #{resource_logs => [ResourceLogs]}; + SchemaUrl -> + #{resource_logs => [ResourceLogs#{schema_url => SchemaUrl}]} + end + end. +to_proto_by_instrumentation_scope(Tab, LogHandlerConfig) -> + %% Even though during the export log events are being inserted to another table, + %% some late writes to the table being exported are still possible. + %% Thus, we can't lookup all the log events from the table and then let otel_log_handler + %% delete the table completely and re-create it as it will imply the risk of losing those (possible) late writes. + %% So, we fix the table and traverse it with ets:take/2. After that, the late writes won't be exported, + %% but they will be kept in the table and ready to be exported in the next exporter runs. + true = ets:safe_fixtable(Tab, true), + try + to_proto_by_instrumentation_scope(Tab, ets:first(Tab), LogHandlerConfig, #{}, ?DEFAULT_SCOPE) + after + _ = ets:safe_fixtable(Tab, false) + end. -logs_by_scope(ScopeLogs, Config) -> - maps:fold(fun(InstrumentationScope, Logs, Acc) -> - LogRecords = [log_record(Log, Config) || Log <- Logs], - Acc#{InstrumentationScope => LogRecords} - end, #{}, ScopeLogs). +to_proto_by_instrumentation_scope(_Tab, '$end_of_table', _Config, ScopeAcc, _DefaultScope) -> + maps:fold( + fun(Scope, LogRecords, Acc) -> + ScopeProto = otel_otlp_common:to_instrumentation_scope_proto(Scope), + [ScopeProto#{log_records => LogRecords} | Acc] + end, + [], + ScopeAcc); +to_proto_by_instrumentation_scope(Tab, Key, LogHandlerConfig, ScopeAcc, DefaultScope) -> + ScopeAcc1 = lists:foldl( + fun({_Ts, LogEvent}, Acc) -> + Scope = scope(LogEvent, DefaultScope), + LogRecord = log_record(LogEvent, LogHandlerConfig), + maps:update_with(Scope, + fun(Logs) -> [LogRecord | Logs] end, + [LogRecord], + Acc) + end, + ScopeAcc, + ets:take(Tab,Key)), + Key1 = ets:next(Tab, Key), + to_proto_by_instrumentation_scope(Tab, Key1, LogHandlerConfig, ScopeAcc1, DefaultScope). +scope(LogEvent, Default) -> + case LogEvent of + #{meta := #{otel_scope := Scope0=#instrumentation_scope{}}} -> + Scope0; + #{meta := #{mfa := {Module, _, _}}} -> + opentelemetry:get_application_scope(Module); + _ -> + Default + end. log_record(#{level := Level, msg := Body, - meta := Metadata=#{time := ObservedTime}}, Config) -> - Time = opentelemetry:timestamp(), + meta := Metadata=#{time := Time}}, Config) -> {SeverityNumber, SeverityText} = level_to_severity(Level), Body1 = case format_msg(Body, Metadata, Config) of S when ?IS_STRING(S) -> @@ -87,19 +123,23 @@ log_record(#{level := Level, DroppedAttributesCount = maps:size(Attributes) - length(Attributes1), Flags = 0, - LogRecord = case Metadata of + LogRecord = case Metadata of #{otel_trace_id := TraceId, otel_span_id := SpanId} -> - #{trace_id => TraceId, - span_id => SpanId}; + #{trace_id => from_hex_str(TraceId, 128), + span_id => from_hex_str(SpanId, 64)}; _ -> #{} end, + %% Time produced by logger, the unit is microsecond: + %% https://www.erlang.org/doc/man/logger#timestamp-0 + TimeNano = erlang:convert_time_unit(Time, microsecond, nanosecond), - - LogRecord#{time_unix_nano => opentelemetry:timestamp_to_nano(Time), - observed_time_unix_nano => erlang:convert_time_unit(ObservedTime, microsecond, nanosecond), + LogRecord#{time_unix_nano => TimeNano, + %% Setting the same logger time to both fields, acc. to the specification: + %% https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-observedtimestamp + observed_time_unix_nano => TimeNano, severity_number => SeverityNumber, severity_text => SeverityText, body => otel_otlp_common:to_any_value(Body1), @@ -108,8 +148,13 @@ log_record(#{level := Level, flags => Flags }. +from_hex_str(Str, Size) -> + B = iolist_to_binary(Str), + <<(binary_to_integer(B, 16)):Size>>. + format_msg({string, Chardata}, Meta, Config) -> format_msg({"~ts", [Chardata]}, Meta, Config); +%% TODO: check it report_cb is formatter config format_msg({report,_}=Msg, Meta, #{report_cb := Fun}=Config) when is_function(Fun,1); is_function(Fun,2) -> format_msg(Msg, Meta#{report_cb => Fun}, maps:remove(report_cb,Config)); diff --git a/apps/opentelemetry_experimental/test/otel_log_handler_SUITE.erl b/apps/opentelemetry_experimental/test/otel_log_handler_SUITE.erl new file mode 100644 index 00000000..d3bf169d --- /dev/null +++ b/apps/opentelemetry_experimental/test/otel_log_handler_SUITE.erl @@ -0,0 +1,654 @@ +-module(otel_log_handler_SUITE). + +-compile(export_all). +-compile(nowarn_export_all). + +-include_lib("stdlib/include/assert.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-define(OTEL_LOG_HANDLER, otel_log_handler). +-define(LOG_MSG, "otel_log_hanlder_SUITE test, please ignore it"). +-define(TRACEPARENT, "00-0226551413cd73a554184b324c82ad51-b7ad6b71432023a2-01"). + +all() -> + [crud_test, + exporting_runner_timeout_test, + check_max_queue_test, + export_max_queue_size_success_test, + scheduled_export_success_test, + flush_on_terminate_test, + sanity_end_to_end_test, + overload_protection_slow_exporter_test, + overload_protection_fast_exporter_test, + late_writes_test, + late_writes_on_terminate_test, + no_exporter_test, + retry_exporter_init_test, + default_opentelemetry_exporter_test, + exporter_exit_test, + disabled_exporter_test]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_testcase(exporting_runner_timeout_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{sleep => infinity}}, + HandlerConf = #{config => #{exporter => ExporterConf, + scheduled_delay_ms => 10, + exporting_timeout_ms => 1}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(check_max_queue_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{sleep => timer:seconds(30)}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:minutes(1), + scheduled_delay_ms => timer:minutes(10), + max_queue_size => 10}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(export_max_queue_size_success_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{reply_to => self()}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => 100, + scheduled_delay_ms => timer:minutes(10), + max_queue_size => 10}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(scheduled_export_success_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{reply_to => self()}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => 10, + scheduled_delay_ms => 5}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(flush_on_terminate_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{reply_to => self()}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => 10, + %% high values to make sure nothing is exported + %% before terminating the handler + scheduled_delay_ms => timer:minutes(10), + max_queue_size => 10000}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(sanity_end_to_end_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{otlp => true, reply_to => self()}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => 1000, + %% setting scheduled delay high enough, + %% so that it doesn't occur during the test, + scheduled_delay_ms => timer:minutes(10), + %% the test will produce and expect 5 log events to be exporterd + max_queue_size => 5}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(overload_protection_slow_exporter_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{sleep => timer:seconds(10), clean_tab => true}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(30), + max_queue_size => 500, + scheduled_delay_ms => 5}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(overload_protection_fast_exporter_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{clean_tab => true}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(30), + max_queue_size => 500, + scheduled_delay_ms => 5}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(late_writes_test = TC, Config) -> + Config1 = common_testcase_init(Config), + Key = opentelemetry:instrumentation_scope(<<>>, <<>>, <<>>), + LateWrites = [{Key, log_event()} || _ <- lists:seq(1,5)], + ExporterConf = {?MODULE, #{reply_to => self(), late_writes => LateWrites}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(5), + %% high enough, as the test relies on dummy exporter replies order + scheduled_delay_ms => 50}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC}, {late_writes, LateWrites} | Config1]; +init_per_testcase(late_writes_on_terminate_test = TC, Config) -> + Config1 = common_testcase_init(Config), + Key = opentelemetry:instrumentation_scope(<<>>, <<>>, <<>>), + LateWrites = [{Key, log_event()} || _ <- lists:seq(1,5)], + ExporterConf = {?MODULE, #{reply_to => self(), late_writes => LateWrites}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(5), + %% very high, so that export doesn't happen until termination + scheduled_delay_ms => 50, + %% this must be reached during the test to trigger export + %% and late writes insertion + max_queue_size => 10}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC}, {late_writes, LateWrites} | Config1]; +init_per_testcase(no_exporter_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{undefined_exporter => true}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(5), + max_queue_size => 500, + scheduled_delay_ms => 5}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(retry_exporter_init_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterConf = {?MODULE, #{retries => 5, + counter => atomics:new(1, []), + reply_to => self()}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(5), + scheduled_delay_ms => 200}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(default_opentelemetry_exporter_test = TC, Config) -> + Config1 = common_testcase_init(Config), + %% no exporter, relying on the default opentelemtry GRPC exporter + HandlerConf = #{config => #{exporting_timeout_ms => timer:seconds(5), + scheduled_delay_ms => 10}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(exporter_exit_test = TC, Config) -> + Config1 = common_testcase_init(Config), + ExporterSleep = timer:seconds(2), + ExporterConf = {?MODULE, #{spawn_link => true, sleep => ExporterSleep, success => true}}, + HandlerConf = #{config => #{exporter => ExporterConf, + exporting_timeout_ms => timer:seconds(5), + scheduled_delay_ms => 10}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC}, {exporter_sleep, ExporterSleep} | Config1]; +init_per_testcase(disabled_exporter_test = TC, Config) -> + Config1 = common_testcase_init(Config), + HandlerConf = #{config => #{exporter => ignore, scheduled_delay_ms => 1}}, + ok = logger:add_handler(TC, ?OTEL_LOG_HANDLER, HandlerConf), + [{handler_id, TC} | Config1]; +init_per_testcase(_TC, Config) -> + common_testcase_init(Config). + +end_per_testcase(_TC, Config) -> + case ?config(handler_id, Config) of + undefined -> ok; + Id -> _ = logger:remove_handler(Id) + end, + _ = common_testcase_cleanup(Config), + ok. + +crud_test(_Config) -> + ExporterConf = {?MODULE, #{success => true}}, + HandlerConf = #{config => #{exporter => ExporterConf}}, + ok = logger:add_handler(otel_handler_test, ?OTEL_LOG_HANDLER, HandlerConf), + ok = logger:add_handler(otel_handler_test1, ?OTEL_LOG_HANDLER, HandlerConf), + ok = logger:remove_handler(otel_handler_test1), + {ok, #{config := #{reg_name := RegName}}} = logger:get_handler_config(otel_handler_test), + true = erlang:is_process_alive(erlang:whereis(RegName)), + + %% Not allowed changes + InvalidHandlerConf = #{config => #{reg_name => new_reg_name}}, + InvalidHandlerConf1 = #{config => #{exporter => {new_module, #{}}}}, + InvalidHandlerConf2 = #{config => #{atomic_ref => new_atomic_ref}}, + InvalidHandlerConf3 = #{config => #{tables => {new_tab1, new_tab2}}}, + InvalidHandlerConf4 = #{config => #{exporter => ExporterConf ,exporting_timeout_ms => 5555}}, + {error, {reg_name, _}} = logger:set_handler_config(otel_handler_test, InvalidHandlerConf), + {error, {exporter, _}} = logger:set_handler_config(otel_handler_test, InvalidHandlerConf1), + {error, {reg_name, _}} = logger:update_handler_config(otel_handler_test, InvalidHandlerConf), + {error, {exporter, _}} = logger:update_handler_config(otel_handler_test, InvalidHandlerConf1), + {error, {exporter, _}} = logger:update_handler_config(otel_handler_test, config, + #{exporter => {new_module, #{}}}), + {error, {atomic_ref, _}} = logger:set_handler_config(otel_handler_test, InvalidHandlerConf2), + {error, {tables, _}} = logger:set_handler_config(otel_handler_test, InvalidHandlerConf3), + {error, {exporting_timeout_ms, _}} = logger:set_handler_config(otel_handler_test, + InvalidHandlerConf4), + {error, {atomic_ref, _}} = logger:update_handler_config(otel_handler_test, InvalidHandlerConf2), + {error, {tables, _}} = logger:update_handler_config(otel_handler_test, InvalidHandlerConf3), + {error, {exporting_timeout_ms, _}} = logger:update_handler_config(otel_handler_test, + InvalidHandlerConf4), + + NewValidConf = #{max_queue_size => infinity, + scheduled_delay_ms => 3000}, + ok = logger:update_handler_config(otel_handler_test, #{config => NewValidConf}), + ok = logger:update_handler_config(otel_handler_test, config, NewValidConf), + ok = logger:set_handler_config(otel_handler_test, #{config => NewValidConf}), + ok = logger:set_handler_config(otel_handler_test, level, debug), + ok = logger:set_handler_config(otel_handler_test, level, critical), + + %% Allowed but invalid values + NewInvalidConf = #{max_queue_size => -100, scheduled_delay_ms => "string"}, + {error, [_|_]} = logger:update_handler_config(otel_handler_test, #{config => NewInvalidConf}), + {error, [_|_]} = logger:update_handler_config(otel_handler_test, config, NewInvalidConf), + {error, [_|_]} = logger:set_handler_config(otel_handler_test, #{config => NewInvalidConf}), + NewInvalidConf1 = #{unknown_opt => 100}, + {error, [_|_]} = logger:update_handler_config(otel_handler_test, #{config => NewInvalidConf1}), + {error, [_|_]} = logger:update_handler_config(otel_handler_test, config, NewInvalidConf1), + {error, [_|_]} = logger:set_handler_config(otel_handler_test, #{config => NewInvalidConf1}), + + ok = logger:remove_handler(otel_handler_test), + + {error, _} = logger:add_handler(otel_handler_test, ?OTEL_LOG_HANDLER, #{config => NewInvalidConf}), + {error, _} = logger:add_handler(otel_handler_test, ?OTEL_LOG_HANDLER, #{config => NewInvalidConf1}). + +exporting_runner_timeout_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{scheduled_delay_ms := Delay, + reg_name := RegName}} = HandlerConf} = logger:get_handler_config(HandlerId), + + Mon = erlang:monitor(process, RegName), + + %% Insert a few log events to make sure runner process will be spawned and killed + %% because it hangs forever (see `init_per_testcase/2` + %% and exporter behaviour defined in this module + true = ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf), + true = ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf), + + %% Enough time to be sure export is triggered and runner killed + Timeout = Delay * 10, + receive + {'DOWN', Mon, process, _Pid, _} -> + %% test is to ensure we don't hit this + ct:fail(otel_log_handler_crash) + after Timeout -> + ok + end. + +check_max_queue_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{max_queue_size := MaxQueueSize}} = HandlerConf} = + logger:get_handler_config(HandlerId), + + true = ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf), + true = ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf), + + insert_events(HandlerConf, MaxQueueSize), + + %% Wait a little to give the handler time to transition to the export state + timer:sleep(100), + + %% Insert the same number again, rgis time to the next table, as the previous is being exported, + %% exporter is slow (see init_per_testcase), so we can be sure that we will go to the drop mode, + %% with no chance to switch the table this time. + insert_events(HandlerConf, MaxQueueSize), + + dropped = ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf). + +export_max_queue_size_success_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{max_queue_size := MaxSize}} = HandlerConf} = + logger:get_handler_config(HandlerId), + + erlang:spawn(fun() -> insert_events(HandlerConf, MaxSize) end), + %% Export must be triggered by reaching max_export_batch_size because + %% scheduled_delay_ms is deliberately set to a high value + receive + {exported, Size} -> + ?assertEqual(MaxSize, Size) + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end. + +scheduled_export_success_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, HandlerConf} = logger:get_handler_config(HandlerId), + + LogsNum = 10, + erlang:spawn(fun() -> insert_events(HandlerConf, LogsNum) end), + + receive + {exported, Size} -> + ?assertEqual(LogsNum, Size) + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end. + +flush_on_terminate_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{reg_name := RegName}} = HandlerConf} = logger:get_handler_config(HandlerId), + + LogsNum = 15, + insert_events(HandlerConf, LogsNum), + + ?assert(erlang:is_pid(erlang:whereis(RegName))), + ok = logger:remove_handler(HandlerId), + receive + {exported, Size} -> + ?assertEqual(LogsNum, Size) + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end, + ?assertEqual(undefined, erlang:whereis(RegName)). + +sanity_end_to_end_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{reg_name := RegName}}} = logger:get_handler_config(HandlerId), + Mon = erlang:monitor(process, RegName), + + otel_propagator_text_map:extract(otel_propagator_trace_context, [{"traceparent",?TRACEPARENT}]), + [_, TraceId, SpanId, _] = string:split(?TRACEPARENT, "-", all), + LoggerMeta = logger:get_process_metadata(), + ?assertMatch(#{otel_span_id := SpanId, otel_trace_id := TraceId}, LoggerMeta), + %% the number of log events must be 5, since init_per_testcase set max_queue_size = 5 + %% Once 5 events are created, the handler will start exporting, so we can reliably expect 5 items + logger:warning(?LOG_MSG), + logger:error(?LOG_MSG), + logger:warning(#{msg => ?LOG_MSG, foo => [bar, baz]}), + logger:error(#{msg => ?LOG_MSG, foo => {bar, [baz]}}), + logger:warning(?LOG_MSG ++ "test term: ~p", [#{foo => bar}]), + receive + {exported, Data} -> + #{resource_logs := [#{scope_logs := [#{log_records := LogRecords}]}]} = Data, + ?assertEqual(5, length(LogRecords)), + {{Y, M, D}, _} = calendar:universal_time(), + lists:foreach( + fun(#{time_unix_nano := T, observed_time_unix_nano := T, + trace_id := LogTraceId, span_id := LogSpanId}) -> + %% this is to check that timestamps unit is actually nanosecond, + %% as otel_otlp_logs relies on OTP logger producing microsecond timestamps, + %% if it's not nanosecond, calendar:system_time_to_universal_time/2 + %% is expected to produce some unrealistic dates + {{LogY, LogM, LogD}, _} = calendar:system_time_to_universal_time(T, nanosecond), + ?assertEqual({Y, M, D}, {LogY, LogM, LogD}), + ?assertEqual(hex_str_to_bin(TraceId, 128), LogTraceId), + ?assertEqual(hex_str_to_bin(SpanId, 64), LogSpanId) + end, + LogRecords) + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end, + + %% No crash is expected during the test + receive + {'DOWN', Mon, process, _Pid, _} -> + %% test is to ensure we don't hit this + ct:fail(otel_log_handler_crash) + after 100 -> + ok + end. + +overload_protection_slow_exporter_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := + #{tables := {Tab1, Tab2}, + max_queue_size := MaxSize, + reg_name := RegName}} = HandlerConf} = logger:get_handler_config(HandlerId), + + Pid1 = spawn(fun() -> insert_loop(HandlerConf) end), + Pid2 = spawn(fun() -> insert_loop(HandlerConf) end), + + TimeSec = 5, + ct:pal("Running ~p test case for ~p seconds...", [?FUNCTION_NAME, TimeSec]), + timer:sleep(timer:seconds(TimeSec)), + + ?assert(ets:info(Tab1, size) =< MaxSize), + ?assert(ets:info(Tab2, size) =< MaxSize), + ct:pal("otel_log_handler status: ~p", [sys:get_status(RegName)]), + + exit(Pid1, kill), + exit(Pid2, kill). + +overload_protection_fast_exporter_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := + #{tables := {Tab1, Tab2}, + max_queue_size := MaxSize, + reg_name := RegName}} = HandlerConf} = logger:get_handler_config(HandlerId), + + Pid1 = spawn(fun() -> insert_loop(HandlerConf) end), + Pid2 = spawn(fun() -> insert_loop(HandlerConf) end), + + TimeSec = 5, + ct:pal("Running ~p test case for ~p seconds...", [?FUNCTION_NAME, TimeSec]), + timer:sleep(timer:seconds(TimeSec)), + + %% TODO: it probably can be flaky under some race conditions + ?assert(ets:info(Tab1, size) =< MaxSize), + ?assert(ets:info(Tab2, size) =< MaxSize), + ct:pal("otel_log_handler status: ~p", [sys:get_status(RegName)]), + + exit(Pid1, kill), + exit(Pid2, kill). + + +%% this test mimics late inserts, that must be exported during the next exporter run +late_writes_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, HandlerConf} = logger:get_handler_config(HandlerId), + LateWritesSize = length(?config(late_writes, Config)), + NormalEventsSize = 7, + insert_events(HandlerConf, NormalEventsSize), + + receive + {exported, Size} -> + ?assertEqual(NormalEventsSize, Size), + %% There is no strict ordering guarantee, as exported replies are sent by + %% different runner processes. The test relies on the fact that there is + %% a delay between two exporter runs. + receive + {exported, Size1} -> + ?assertEqual(LateWritesSize, Size1) + after 5000 -> + ct:fail(otel_log_handler_late_writes_exporter_failed) + end + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end. + +%% similar to late_writes_test but checks that the other table is checked and exported on termination +late_writes_on_terminate_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{max_queue_size := MaxSize}} = HandlerConf} = + logger:get_handler_config(HandlerId), + LateWritesSize = length(?config(late_writes, Config)), + insert_events(HandlerConf, MaxSize), + + receive + {exported, Size} -> + ?assertEqual(MaxSize, Size) + after 5000 -> + ct:fail(otel_log_handler_exporter_failed) + end, + + ok = logger:remove_handler(HandlerId), + receive + {exported, Size1} -> + ?assertEqual(LateWritesSize, Size1) + after 5000 -> + ct:fail(otel_log_handler_late_writes_exporter_failed) + end. + +no_exporter_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{reg_name := RegName, scheduled_delay_ms := Delay}} = HandlerConf} = + logger:get_handler_config(HandlerId), + Mon = erlang:monitor(process, RegName), + + %% Enough time to be sure export is triggered and state transition happened + Timeout = Delay * 10, + receive + {'DOWN', Mon, process, _Pid, _} -> + %% test is to ensure we don't hit this + ct:fail(otel_log_handler_crash) + after Timeout -> + ok + end, + %% Must be disabled + ?assertEqual(dropped, ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf)). + +retry_exporter_init_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, HandlerConf} = logger:get_handler_config(HandlerId), + receive + exporter_not_ready -> + %% expect accepting events: exporter is not ready yet, but max_queue_size is not reached + ?assertEqual(true, ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf)), + receive + exporter_ready -> + %% give some time to the handler to enable + timer:sleep(50), + ?assertEqual(true, ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf)) + after + %% higher timeout, as we need to wait for several exporter init retries + 5000 -> + ct:fail(otel_log_handler_exporter_init_failed) + end + after 2000 -> + ct:fail(otel_log_handler_exporter_failed) + end. + +%% opentelmetry_exporter is expected to successfully initialize and keep running +%% in disconected state (even though there is no collector to connect to). +%% This test only checks that no crashes occur +default_opentelemetry_exporter_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{scheduled_delay_ms := Delay, reg_name := RegName}} = HandlerConf} = + logger:get_handler_config(HandlerId), + Mon = erlang:monitor(process, RegName), + + insert_events(HandlerConf, 10), + %% Give it some time to run and do state transition + Timeout = Delay * 10, + receive + {'DOWN', Mon, process, _Pid, _} -> + %% test is to ensure we don't hit this + ct:fail(otel_log_handler_crash) + after Timeout -> + ok + end. + +exporter_exit_test(Config) -> + HandlerId = ?config(handler_id, Config), + ExporterSleep = ?config(exporter_sleep, Config), + {ok, #{config := #{reg_name := RegName}}} = logger:get_handler_config(HandlerId), + Mon = erlang:monitor(process, RegName), + + Timeout = ExporterSleep * 2, + receive + {'DOWN', Mon, process, _Pid, test_exporter_crash} -> + %% supervisor must restart the handler + ?assert(wait_for_restart(RegName, 1000)) + after Timeout -> + ct:fail(otel_log_handler_no_exit) + end. + +disabled_exporter_test(Config) -> + HandlerId = ?config(handler_id, Config), + {ok, #{config := #{scheduled_delay_ms := Delay}} = HandlerConf} = + logger:get_handler_config(HandlerId), + + %% Enough time to be sure exporter init is triggered and handler is in permanent drop mode + Timeout = Delay * 10, + timer:sleep(Timeout), + %% Must be disabled + ?assertEqual(dropped, ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf)). + +%% exporter behaviour +init(#{undefined_exporter := true}) -> + ignore; +init(#{retries := N, counter := Ref, reply_to := Pid} = ExpConfig) -> + case atomics:add_get(Ref, 1, 1) of + N -> + Pid ! exporter_ready, + {ok, ExpConfig}; + _ -> + Pid ! exporter_not_ready, + {error, not_ready} + end; +init(#{spawn_link := true, sleep := Time} = ExpConfig) -> + F = fun () -> timer:sleep(Time), + exit(test_exporter_crash) + end, + Pid = erlang:spawn_link(F), + {ok, ExpConfig#{exporter_pid => Pid}}; +init(ExpConfig) -> + {ok, ExpConfig}. + +export(logs, {Tab, _LogHandlerConfig}, _Resource, #{sleep := Time} = State) -> + timer:sleep(Time), + case State of + %% Mimic an exporter that must take records from the table + %% (even if export failed, taken records are not inserted back to the table) + #{clean_tab := true} -> + {ok, _Size} = traverse_clean(Tab), + ok; + _ -> ok + end, + ok; +export(logs, {_Tab, _LogHandlerConfig}, _Resource, #{success := true} =_State) -> + ok; +export(logs, {Tab, LogHandlerConfig}, Resource, #{otlp := true, reply_to := Pid} = _State) -> + Res = otel_otlp_logs:to_proto(Tab, Resource, LogHandlerConfig), + Pid ! {exported, Res}; +export(logs, {Tab, _LogHandlerConfig}, _Resource, #{reply_to := Pid} = State) -> + {ok, Size} = traverse_clean(Tab), + Pid ! {exported, Size}, + case State of + #{late_writes := Records} -> + ets:insert(Tab, Records); + _ -> ok + end, + ok; +export(logs, {Tab, _LogHandlerConfig}, _Resource, #{clean_tab := true} = _State) -> + {ok, _Size} = traverse_clean(Tab), + ok. + +shutdown(_) -> + ok. + +%% helpers + +common_testcase_init(Config) -> + {ok, _} = application:ensure_all_started(opentelemetry_exporter), + {ok, _} = application:ensure_all_started(opentelemetry_experimental), + Config. + +common_testcase_cleanup(_Config) -> + _ = application:stop(opentelemetry_experimental), + _ = application:stop(opentelemetry_exporter), + ok. + +log_event() -> + #{level => warning, + meta => #{gl => erlang:group_leader(), pid => erlang:self(), time => os:system_time(microsecond)}, + msg => {string, ?LOG_MSG}}. + +hex_str_to_bin(Str, Size) -> + B = iolist_to_binary(Str), + <<(binary_to_integer(B, 16)):Size>>. + +insert_events(HandlerConf, N) -> + lists:foreach(fun(_) -> ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf) end, + lists:seq(1, N)). + +insert_loop(HandlerConf) -> + ?OTEL_LOG_HANDLER:log(log_event(), HandlerConf), + insert_loop(HandlerConf). + +traverse_clean(Tab) -> + try + ets:safe_fixtable(Tab, true), + traverse_clean(Tab, ets:first(Tab), 0) + after + ets:safe_fixtable(Tab, false) + end. + +traverse_clean(Tab, '$end_of_table', Seq) -> + ct:pal("Exported: ~p records from table: ~p", [Seq, Tab]), + {ok, Seq}; +traverse_clean(Tab, Key, Seq) -> + %% Tab type is duplicate_bag, keys can be non unique + Records = ets:take(Tab, Key), + traverse_clean(Tab, ets:next(Tab, Key), Seq+length(Records)). + +wait_for_restart(_, 0) -> false; +wait_for_restart(RegName, Retries) -> + timer:sleep(1), + is_pid(whereis(RegName)) orelse wait_for_restart(RegName, Retries-1). diff --git a/apps/opentelemetry_exporter/src/otel_otlp_traces.erl b/apps/opentelemetry_exporter/src/otel_otlp_traces.erl index 31a19cbc..c17dc0bf 100644 --- a/apps/opentelemetry_exporter/src/otel_otlp_traces.erl +++ b/apps/opentelemetry_exporter/src/otel_otlp_traces.erl @@ -48,22 +48,43 @@ to_proto(Tab, Resource) -> end. to_proto_by_instrumentation_scope(Tab) -> - Key = ets:first(Tab), - to_proto_by_instrumentation_scope(Tab, Key). + %% Even though during the export trace spans are being inserted to another table, + %% some late writes to the table being exported are still possible. + %% Thus, we can't lookup all spans from the table and then let the processor + %% delete the table completely and re-create it as it will imply the risk of losing those (possible) late writes. + %% So, we fix the table and traverse it with ets:take/2. After that, the late writes won't be exported, + %% but they will be kept in the table and ready to be exported in the next exporter runs. + true = ets:safe_fixtable(Tab, true), + try + to_proto_by_instrumentation_scope(Tab, ets:first(Tab), #{}) + after + _ = ets:safe_fixtable(Tab, false) + end. -to_proto_by_instrumentation_scope(_Tab, '$end_of_table') -> - []; -to_proto_by_instrumentation_scope(Tab, InstrumentationScope) -> - InstrumentationScopeSpans = lists:foldl(fun(Span, Acc) -> - [to_proto(Span) | Acc] - end, [], ets:lookup(Tab, InstrumentationScope)), - InstrumentationScopeSpansProto = otel_otlp_common:to_instrumentation_scope_proto(InstrumentationScope), - [InstrumentationScopeSpansProto#{spans => InstrumentationScopeSpans} - | to_proto_by_instrumentation_scope(Tab, ets:next(Tab, InstrumentationScope))]. +to_proto_by_instrumentation_scope(_Tab, '$end_of_table', ScopeAcc) -> + maps:fold( + fun(Scope, Spans, Acc) -> + ScopeProto = otel_otlp_common:to_instrumentation_scope_proto(Scope), + [ScopeProto#{spans => Spans} | Acc] + end, + [], + ScopeAcc); +to_proto_by_instrumentation_scope(Tab, Key, ScopeAcc) -> + ScopeAcc1 = lists:foldl( + fun(#span{instrumentation_scope=Scope}=Span, Acc) -> + SpanProto = to_proto(Span), + maps:update_with(Scope, + fun(Spans) -> [SpanProto | Spans] end, + [SpanProto], + Acc) + end, + ScopeAcc, + ets:take(Tab, Key)), + Key1 = ets:next(Tab, Key), + to_proto_by_instrumentation_scope(Tab, Key1, ScopeAcc1). %% TODO: figure out why this type spec fails %% -spec to_proto(#span{}) -> opentelemetry_exporter_trace_service_pb:span(). - to_proto(#span{trace_id=TraceId, span_id=SpanId, tracestate=TraceState,