diff --git a/apps/opentelemetry/include/otel_span.hrl b/apps/opentelemetry/include/otel_span.hrl index 20707c4a..3bf45637 100644 --- a/apps/opentelemetry/include/otel_span.hrl +++ b/apps/opentelemetry/include/otel_span.hrl @@ -59,7 +59,18 @@ %% trace flags lowest bit is 1 but simply not propagated. is_recording :: boolean() | undefined | '_', - instrumentation_scope :: opentelemetry:instrumentation_scope() | undefined | '_' + instrumentation_scope :: opentelemetry:instrumentation_scope() | undefined | '_', + + + %% this is the Erlang process the span is or was last active in. + %% It is used for the optional process monitoring feature where a process + %% can be monitored and have all spans + %% active in that process be ended if the process exits for any reason. + pid :: pid() | undefined | '_', + + %% the span processors to run when the span ends + %% mainly here so the span monitor can end the span + on_end_processors :: fun() | undefined | '_' }). -record(span_limits, { diff --git a/apps/opentelemetry/src/otel_attributes.erl b/apps/opentelemetry/src/otel_attributes.erl index e0a520d9..2ed076c3 100644 --- a/apps/opentelemetry/src/otel_attributes.erl +++ b/apps/opentelemetry/src/otel_attributes.erl @@ -38,9 +38,9 @@ new(List, CountLimit, ValueLengthLimit) when is_list(List) -> new(maps:from_list(List), CountLimit, ValueLengthLimit); new(Map, CountLimit, ValueLengthLimit) when is_map(Map) -> update_attributes(Map, #attributes{count_limit=CountLimit, - value_length_limit=ValueLengthLimit, - dropped=0, - map=#{}}); + value_length_limit=ValueLengthLimit, + dropped=0, + map=#{}}); new(_, CountLimit, ValueLengthLimit) -> #attributes{count_limit=CountLimit, value_length_limit=ValueLengthLimit, diff --git a/apps/opentelemetry/src/otel_span_ets.erl b/apps/opentelemetry/src/otel_span_ets.erl index ce7f214f..5e03568a 100644 --- a/apps/opentelemetry/src/otel_span_ets.erl +++ b/apps/opentelemetry/src/otel_span_ets.erl @@ -25,7 +25,7 @@ handle_call/3, handle_cast/2]). 
--export([start_span/7, +-export([start_span/8, end_span/1, end_span/2, get_ctx/1, @@ -55,13 +55,13 @@ start_link(Opts) -> %% @doc Start a span and insert into the active span ets table. -spec start_span(otel_ctx:t(), opentelemetry:span_name(), otel_sampler:t(), otel_id_generator:t(), - otel_span:start_opts(), fun(), otel_tracer_server:instrumentation_scope() | undefined) + otel_span:start_opts(), fun(), fun(), otel_tracer_server:instrumentation_scope() | undefined) -> opentelemetry:span_ctx(). -start_span(Ctx, Name, Sampler, IdGeneratorModule, Opts, Processors, InstrumentationScope) -> - case otel_span_utils:start_span(Ctx, Name, Sampler, IdGeneratorModule, Opts) of +start_span(Ctx, Name, Sampler, IdGeneratorModule, Opts, OnStartProcessors, OnEndProcessors, InstrumentationScope) -> + case otel_span_utils:start_span(Ctx, Name, Sampler, IdGeneratorModule, OnEndProcessors, Opts) of {SpanCtx=#span_ctx{is_recording=true}, Span=#span{}} -> Span1 = Span#span{instrumentation_scope=InstrumentationScope}, - Span2 = Processors(Ctx, Span1), + Span2 = OnStartProcessors(Ctx, Span1), case storage_insert(Span2) of true -> SpanCtx; diff --git a/apps/opentelemetry/src/otel_span_monitor.erl b/apps/opentelemetry/src/otel_span_monitor.erl new file mode 100644 index 00000000..86b9f5b0 --- /dev/null +++ b/apps/opentelemetry/src/otel_span_monitor.erl @@ -0,0 +1,119 @@ +%%%------------------------------------------------------------------------ +%% Copyright 2020, OpenTelemetry Authors +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% @doc +%% Process that can optionally monitor the process a span is in and end the +%% span if the process stops for any reason with the span still unfinished. +%% @end +%%%------------------------------------------------------------------------- +-module(otel_span_monitor). + +-behaviour(gen_server). + +-export([start_link/0, + add/1]). + +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2]). + +-include("otel_span_ets.hrl"). +-include("otel_span.hrl"). +-include("otel_tracer.hrl"). + +-define(SERVER, ?MODULE). + +-record(state, {monitors :: #{pid() => reference()}}). + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%% @doc Monitor a process and end all spans that have been active in it +%% and are still alive when the process stops. +-spec add(pid()) -> ok. +add(Pid) -> + gen_server:call(?SERVER, {monitor, Pid}). + +init(_Opts) -> + {ok, #state{monitors=#{}}}. + +handle_call({monitor, Pid}, _From, State=#state{monitors=Monitors}) + when is_map_key(Pid, Monitors) -> + %% already being monitored + {reply, ok, State}; +handle_call({monitor, Pid}, _From, State=#state{monitors=Monitors}) -> + Ref = erlang:monitor(process, Pid), + {reply, ok, State#state{monitors=Monitors#{Pid => Ref}}}. + + +handle_cast(_Msg, State) -> + {noreply, State}. + +%% A monitored process went down: end the spans still stored for it. +handle_info({'DOWN', Ref, process, Pid, Reason}, State=#state{monitors=Monitors}) -> + case maps:take(Pid, Monitors) of + {Ref, Monitors1} -> + end_spans(Pid, Reason), + {noreply, State#state{monitors=Monitors1}}; + _ -> + %% unknown pid or a different monitor reference, nothing to end + {noreply, State} + end; +handle_info(_Msg, State) -> + %% drain unexpected messages so they cannot crash the server + {noreply, State}. 
+ +%% + +%% ignore these functions because dialyzer doesn't like match spec use of '_' +-dialyzer({nowarn_function, end_spans/2}). +-dialyzer({nowarn_function, match_spec/2}). +-dialyzer({nowarn_function, end_span/3}). +-dialyzer({nowarn_function, select/1}). 
+ +%% End every span in the span table whose pid field is Pid, tagging each with a +%% 'process died' event and a finished_by_monitor attribute first. +%% TODO: need a `select_take' or `match_take' in ets +end_spans(Pid, Reason) -> + ReasonString = otel_utils:assert_to_binary(io_lib:format("~p", [Reason])), + DownAttributes = otel_span:process_attributes(#{finished_by_monitor => true}), + DownEvent = opentelemetry:event('process died', #{reason => ReasonString}), + Spans = select(Pid), + [begin + case ets:take(?SPAN_TAB, SpanId) of + [] -> + ok; + [Span] -> + end_span(Span, DownEvent, DownAttributes) + end + end || SpanId <- Spans], + ok. + +%% Ids of the spans in the span table whose pid field equals Pid. +select(Pid) -> + ets:select(?SPAN_TAB, match_spec(Pid, '$1')). + +match_spec(Pid, Return) -> + [{#span{span_id='$1', pid='$2', _='_'}, + [{'=:=', '$2', Pid}], + [Return]}]. + +%% Add the down event and attributes to the span and run its on-end processors. +end_span(Span=#span{attributes=Attributes, + events=Events, + on_end_processors=Processors}, DownEvent, DownAttributes) -> + Span1 = Span#span{attributes=otel_attributes:set(DownAttributes, Attributes), + events=otel_events:add([DownEvent], Events)}, + Processors(Span1). diff --git a/apps/opentelemetry/src/otel_span_sup.erl b/apps/opentelemetry/src/otel_span_sup.erl index 040a371c..ca8dad0e 100644 --- a/apps/opentelemetry/src/otel_span_sup.erl +++ b/apps/opentelemetry/src/otel_span_sup.erl @@ -45,6 +45,13 @@ init([Config]) -> type => worker, modules => [otel_span_sweeper]}, + Monitor = #{id => otel_span_monitor, + start => {otel_span_monitor, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => [otel_span_monitor]}, + SpanHandler = #{id => otel_span_ets, start => {otel_span_ets, start_link, [[]]}, restart => permanent, @@ -52,7 +59,7 @@ init([Config]) -> type => worker, modules => [otel_span_ets]}, - ChildSpecs = [SpanHandler, Sweeper], + ChildSpecs = [SpanHandler, Monitor, Sweeper], {ok, {SupFlags, ChildSpecs}}. 
%% internal functions diff --git a/apps/opentelemetry/src/otel_span_utils.erl b/apps/opentelemetry/src/otel_span_utils.erl index 3cf0891e..a35d3bd4 100644 --- a/apps/opentelemetry/src/otel_span_utils.erl +++ b/apps/opentelemetry/src/otel_span_utils.erl @@ -18,7 +18,7 @@ %%%------------------------------------------------------------------------- -module(otel_span_utils). --export([start_span/5, +-export([start_span/6, end_span/1, end_span/2]). @@ -27,8 +27,8 @@ -include("otel_span.hrl"). -spec start_span(otel_ctx:t(), opentelemetry:span_name(), otel_sampler:t(), otel_id_generator:t(), - otel_span:start_opts()) -> {opentelemetry:span_ctx(), opentelemetry:span() | undefined}. -start_span(Ctx, Name, Sampler, IdGenerator, Opts) -> + fun(), otel_span:start_opts()) -> {opentelemetry:span_ctx(), opentelemetry:span() | undefined}. +start_span(Ctx, Name, Sampler, IdGenerator, OnEndProcessors, Opts) -> SpanAttributeCountLimit = otel_span_limits:attribute_count_limit(), SpanAttributeValueLengthLimit= otel_span_limits:attribute_value_length_limit(), EventCountLimit = otel_span_limits:event_count_limit(), @@ -48,9 +48,12 @@ start_span(Ctx, Name, Sampler, IdGenerator, Opts) -> Kind = maps:get(kind, Opts, ?SPAN_KIND_INTERNAL), StartTime = maps:get(start_time, Opts, opentelemetry:timestamp()), - new_span(Ctx, Name, Sampler, IdGenerator, StartTime, Kind, Attributes, Events, Links). -new_span(Ctx, Name, Sampler, IdGeneratorModule, StartTime, Kind, Attributes, Events, Links) -> + case maps:get(monitor, Opts, false) of true -> otel_span_monitor:add(self()); false -> ok end, + + new_span(Ctx, Name, Sampler, IdGenerator, StartTime, Kind, Attributes, Events, Links, OnEndProcessors). 
+ +new_span(Ctx, Name, Sampler, IdGeneratorModule, StartTime, Kind, Attributes, Events, Links, OnEndProcessors) -> {NewSpanCtx, ParentSpanId} = new_span_ctx(Ctx, IdGeneratorModule), TraceId = NewSpanCtx#span_ctx.trace_id, @@ -70,7 +73,9 @@ new_span(Ctx, Name, Sampler, IdGeneratorModule, StartTime, Kind, Attributes, Eve events=Events, links=Links, trace_flags=TraceFlags, - is_recording=IsRecording}, + is_recording=IsRecording, + pid=self(), + on_end_processors=OnEndProcessors}, {NewSpanCtx#span_ctx{trace_flags=TraceFlags, tracestate=TraceState, diff --git a/apps/opentelemetry/src/otel_tracer_default.erl b/apps/opentelemetry/src/otel_tracer_default.erl index c983e30b..c669e680 100644 --- a/apps/opentelemetry/src/otel_tracer_default.erl +++ b/apps/opentelemetry/src/otel_tracer_default.erl @@ -29,12 +29,12 @@ %% @doc Starts an inactive Span and returns its SpanCtx. -spec start_span(otel_ctx:t(), opentelemetry:tracer(), opentelemetry:span_name(), otel_span:start_opts()) -> opentelemetry:span_ctx(). -start_span(Ctx, {_, #tracer{on_start_processors=Processors, +start_span(Ctx, {_, #tracer{on_start_processors=OnStartProcessors, on_end_processors=OnEndProcessors, sampler=Sampler, id_generator=IdGeneratorModule, instrumentation_scope=InstrumentationScope}}, Name, Opts) -> - SpanCtx = otel_span_ets:start_span(Ctx, Name, Sampler, IdGeneratorModule, Opts, Processors, InstrumentationScope), + SpanCtx = otel_span_ets:start_span(Ctx, Name, Sampler, IdGeneratorModule, Opts, OnStartProcessors, OnEndProcessors, InstrumentationScope), SpanCtx#span_ctx{span_sdk={otel_span_ets, OnEndProcessors}}. 
-spec with_span(otel_ctx:t(), opentelemetry:tracer(), opentelemetry:span_name(), diff --git a/apps/opentelemetry/test/opentelemetry_SUITE.erl b/apps/opentelemetry/test/opentelemetry_SUITE.erl index c41db277..722ad6d0 100644 --- a/apps/opentelemetry/test/opentelemetry_SUITE.erl +++ b/apps/opentelemetry/test/opentelemetry_SUITE.erl @@ -35,7 +35,7 @@ all() -> all_cases() -> [with_span, macros, child_spans, disabled_sdk, update_span_data, tracer_instrumentation_scope, tracer_previous_ctx, stop_temporary_app, - reset_after, attach_ctx, default_sampler, non_recording_ets_table, + reset_after, attach_ctx, parent_ctx, default_sampler, non_recording_ets_table, root_span_sampling_always_on, root_span_sampling_always_off, record_but_not_sample, record_exception_works, record_exception_with_message_works, propagator_configuration, propagator_configuration_with_os_env, force_flush, @@ -756,6 +756,41 @@ attach_ctx(Config) -> ok. +parent_ctx(Config) -> + Tid = ?config(tid, Config), + + Tracer = opentelemetry:get_tracer(), + + SpanCtx1 = otel_tracer:start_span(Tracer, <<"span-1">>, #{}), + ?set_current_span(SpanCtx1), + ?assertMatch(SpanCtx1, ?current_span_ctx), + + ParentCtx = otel_ctx:get_current(), + + Pid = self(), + + %% linked so a failed assertion in the child fails the test right away + erlang:spawn_link(fun() -> + SpanCtx2 = otel_tracer:start_span(ParentCtx, Tracer, <<"span-2">>, #{}), + otel_span:end_span(SpanCtx2), + + [Span2] = assert_all_exported(Tid, [SpanCtx2]), + + Pid ! {span2, Span2} + end), + + otel_span:end_span(SpanCtx1), + + [Span1] = assert_all_exported(Tid, [SpanCtx1]), + + receive + {span2, Span2} -> + ?assertEqual(Span1#span.span_id, Span2#span.parent_span_id) + after + 1000 -> + ct:fail(timeout) + end. + reset_after(Config) -> Tid = ?config(tid, Config), @@ -1101,7 +1136,9 @@ no_exporter(_Config) -> %% assert_all_exported(Tid, SpanCtxs) -> - [assert_exported(Tid, SpanCtx) || SpanCtx <- SpanCtxs]. + lists:flatmap(fun(SpanCtx) -> + assert_exported(Tid, SpanCtx) + end, SpanCtxs). 
assert_exported(Tid, #span_ctx{trace_id=TraceId, span_id=SpanId}) -> diff --git a/apps/opentelemetry/test/otel_span_monitor_SUITE.erl b/apps/opentelemetry/test/otel_span_monitor_SUITE.erl new file mode 100644 index 00000000..fbed6678 --- /dev/null +++ b/apps/opentelemetry/test/otel_span_monitor_SUITE.erl @@ -0,0 +1,116 @@ +%%% --------------------------------------------------------------------------- +%%% @doc Checks that a span left open by an exiting process is ended by the span monitor. +%%% @end +%%% --------------------------------------------------------------------------- +-module(otel_span_monitor_SUITE). + +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). + +-include("otel_test_utils.hrl"). +-include("otel_span.hrl"). +-include_lib("opentelemetry_api/include/opentelemetry.hrl"). +-include_lib("opentelemetry_api/include/otel_tracer.hrl"). + +-include("otel_span_ets.hrl"). + +all() -> + [{group, normal}, {group, abnormal}]. + +groups() -> + [{normal, [], [monitor_pid]}, + {abnormal, [], [monitor_pid]}]. + +init_per_suite(Config) -> + application:load(opentelemetry), + Config. + +end_per_suite(_Config) -> + application:unload(opentelemetry), + ok. + +init_per_group(ExitType, Config) -> + %% the group name doubles as the exit reason used by monitor_pid + [{exit_type, ExitType} | Config]. + +end_per_group(_, _Config) -> + ok. + +init_per_testcase(_, Config) -> + application:set_env(opentelemetry, processors, [{otel_batch_processor, #{exporter => {otel_exporter_pid, self()}, + scheduled_delay_ms => 1}}]), + {ok, _} = application:ensure_all_started(opentelemetry), + Config. + +end_per_testcase(_, _Config) -> + _ = application:stop(opentelemetry), + ok. 
+ +monitor_pid(Config) -> + process_flag(trap_exit, true), + ExitType = ?config(exit_type, Config), + + SpanName1 = <<"span-1">>, + SpanName2 = <<"span-2">>, + + Attr1 = <<"attr-1">>, + AttrValue1 = <<"attr-value-1">>, + + SpanCtx1 = ?start_span(SpanName1, #{monitor => true}), + ?set_current_span(SpanCtx1), + Ctx = otel_ctx:get_current(), + ?add_event('some event on span 1', #{a => 1}), + + Pid1 = self(), + %% span 2 is started in a linked process that exits without ending it + Pid2 = erlang:spawn_link(fun() -> + SpanCtx2 = ?start_span(Ctx, SpanName2, #{monitor => true}), + ?set_current_span(SpanCtx2), + ?assertMatch(SpanCtx2, ?current_span_ctx), + + ?add_event('some event on span 2', #{a => 2}), + ?set_attribute(Attr1, AttrValue1), + + erlang:exit(ExitType) + end), + + otel_span:end_span(SpanCtx1), + + %% span 2 must arrive tagged by the monitor, span 1 without those tags + receive + {'EXIT', Pid2, Reason} when Reason =:= ExitType -> + receive + {span, #span{name=SpanName2, + parent_span_id=ParentSpanId2, + pid=SpanPid2, + attributes=SpanAttributes2, + events=Events2}} -> + ?assertEqual(SpanCtx1#span_ctx.span_id, ParentSpanId2), + ?assertMatch([#event{name='process died'}, + #event{name='some event on span 2'}], otel_events:list(Events2)), + ?assertEqual(Pid2, SpanPid2), + ?assertEqual(#{Attr1 => AttrValue1, + finished_by_monitor => true}, otel_attributes:map(SpanAttributes2)), + receive + {span, #span{name=SpanName1, + pid=SpanPid1, + attributes=SpanAttributes1, + events=Events1}} -> + ?assertMatch([#event{name='some event on span 1'}], otel_events:list(Events1)), + ?assertEqual(Pid1, SpanPid1), + ?assertEqual(#{}, otel_attributes:map(SpanAttributes1)) + after + 1000 -> + ct:fail(span_1_timeout) + end + after + 1000 -> + ct:fail(span_2_timeout) + end, + ok + after + 1000 -> + ct:fail(monitor_timeout) + end. 
diff --git a/apps/opentelemetry_api/include/otel_tracer.hrl b/apps/opentelemetry_api/include/otel_tracer.hrl index ca61e1ea..aa4ea095 100644 --- a/apps/opentelemetry_api/include/otel_tracer.hrl +++ b/apps/opentelemetry_api/include/otel_tracer.hrl @@ -10,12 +10,18 @@ -define(start_span(SpanName, StartOpts), otel_tracer:start_span(?current_tracer, SpanName, StartOpts)). +-define(start_span(Ctx, SpanName, StartOpts), + otel_tracer:start_span(Ctx, ?current_tracer, SpanName, StartOpts)). + -define(with_span(SpanName, Fun), otel_tracer:with_span(?current_tracer, SpanName, #{}, Fun)). -define(with_span(SpanName, StartOpts, Fun), otel_tracer:with_span(?current_tracer, SpanName, StartOpts, Fun)). +-define(with_span(Ctx, SpanName, StartOpts, Fun), + otel_tracer:with_span(Ctx, ?current_tracer, SpanName, StartOpts, Fun)). + -define(set_current_span(SpanCtx), otel_tracer:set_current_span(SpanCtx)). diff --git a/apps/opentelemetry_api/src/otel_span.erl b/apps/opentelemetry_api/src/otel_span.erl index 649cc2a8..69517447 100644 --- a/apps/opentelemetry_api/src/otel_span.erl +++ b/apps/opentelemetry_api/src/otel_span.erl @@ -56,7 +56,8 @@ links => [opentelemetry:link()], is_recording => boolean(), start_time => opentelemetry:timestamp(), - kind => opentelemetry:span_kind()}. + kind => opentelemetry:span_kind(), + monitor => boolean()}. -export_type([start_opts/0]). @@ -64,12 +65,14 @@ validate_start_opts(Opts) when is_map(Opts) -> Attributes = maps:get(attributes, Opts, #{}), Links = maps:get(links, Opts, []), + Monitor = maps:get(monitor, Opts, false), Kind = maps:get(kind, Opts, ?SPAN_KIND_INTERNAL), StartTime = maps:get(start_time, Opts, opentelemetry:timestamp()), IsRecording = maps:get(is_recording, Opts, true), #{ attributes => process_attributes(Attributes), links => Links, + monitor => Monitor, kind => Kind, start_time => StartTime, is_recording => IsRecording