|
26 | 26 |
|
27 | 27 | -export([start_link/3,
|
28 | 28 | collect/1,
|
29 |
| - shutdown/1]). |
| 29 | + shutdown/1, |
| 30 | + collect_/4]). |
30 | 31 |
|
31 | 32 | -export([init/1,
|
32 | 33 | handle_call/3,
|
@@ -64,7 +65,7 @@ start_link(ReaderId, ProviderSup, Config) ->
|
64 | 65 | gen_server:start_link(?MODULE, [ReaderId, ProviderSup, Config], []).
|
65 | 66 |
|
66 | 67 | collect(ReaderPid) ->
|
67 |
| - ReaderPid ! collect. |
| 68 | + gen_server:call(ReaderPid, collect). |
68 | 69 |
|
69 | 70 | shutdown(ReaderPid) ->
|
70 | 71 | gen_server:call(ReaderPid, shutdown).
|
@@ -111,62 +112,44 @@ handle_continue(register_with_server, State=#state{provider_sup=ProviderSup,
|
111 | 112 |
|
112 | 113 | handle_call(shutdown, _From, State) ->
|
113 | 114 | {reply, ok, State};
|
114 |
| -handle_call(_, _From, State) -> |
115 |
| - {noreply, State}. |
116 |
| - |
117 |
| -handle_cast(_, State) -> |
118 |
| - {noreply, State}. |
119 |
| - |
120 |
| -handle_info(collect, State=#state{exporter=undefined, |
121 |
| - export_interval_ms=ExporterIntervalMs, |
122 |
| - tref=TRef}) when TRef =/= undefined andalso |
123 |
| - ExporterIntervalMs =/= undefined -> |
124 |
| - erlang:cancel_timer(TRef, [{async, true}]), |
125 |
| - NewTRef = erlang:send_after(ExporterIntervalMs, self(), collect), |
126 |
| - {noreply, State#state{tref=NewTRef}}; |
127 |
| -handle_info(collect, State=#state{id=ReaderId, |
128 |
| - exporter={ExporterModule, Config}, |
129 |
| - export_interval_ms=undefined, |
130 |
| - tref=undefined, |
| 115 | +handle_call(collect, _From, State=#state{id=ReaderId, |
| 116 | + exporter=Exporter, |
131 | 117 | callbacks_tab=CallbacksTab,
|
132 | 118 | view_aggregation_tab=ViewAggregationTab,
|
133 | 119 | metrics_tab=MetricsTab,
|
134 | 120 | resource=Resource
|
135 | 121 | }) ->
|
136 |
| - %% collect from view aggregations table and then export |
137 |
| - Metrics = collect_(CallbacksTab, ViewAggregationTab, MetricsTab, ReaderId), |
138 |
| - |
139 |
| - otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config), |
140 |
| - |
141 |
| - {noreply, State}; |
142 |
| -handle_info(collect, State=#state{id=ReaderId, |
143 |
| - exporter={ExporterModule, Config}, |
144 |
| - export_interval_ms=ExporterIntervalMs, |
145 |
| - tref=TRef, |
146 |
| - callbacks_tab=CallbacksTab, |
147 |
| - view_aggregation_tab=ViewAggregationTab, |
148 |
| - metrics_tab=MetricsTab, |
149 |
| - resource=Resource |
150 |
| - }) when TRef =/= undefined andalso |
151 |
| - ExporterIntervalMs =/= undefined -> |
152 |
| - erlang:cancel_timer(TRef, [{async, true}]), |
153 |
| - NewTRef = erlang:send_after(ExporterIntervalMs, self(), collect), |
154 |
| - |
155 |
| - %% collect from view aggregations table and then export |
156 |
| - Metrics = collect_(CallbacksTab, ViewAggregationTab, MetricsTab, ReaderId), |
157 |
| - |
158 |
| - |
159 |
| - otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config), |
| 122 | + TRef = update_timer(State#state.tref, State#state.export_interval_ms), |
| 123 | + Reply = collect_and_export(ReaderId, Exporter, CallbacksTab, ViewAggregationTab, MetricsTab, Resource), |
| 124 | + {reply, Reply, State#state{tref=TRef}}; |
| 125 | +handle_call(_, _From, State) -> |
| 126 | + {noreply, State}. |
160 | 127 |
|
161 |
| - {noreply, State#state{tref=NewTRef}}; |
162 |
| -%% no tref or exporter, do nothing at all |
| 128 | +handle_info(collect, State) -> |
| 129 | + {reply, _, NewState} = handle_call(collect, undefined, State), |
| 130 | + {noreply, NewState}; |
163 | 131 | handle_info(_, State) ->
|
164 | 132 | {noreply, State}.
|
165 | 133 |
|
| 134 | +handle_cast(_, State) -> |
| 135 | + {noreply, State}. |
| 136 | + |
166 | 137 | code_change(State) ->
|
167 | 138 | {ok, State}.
|
168 | 139 |
|
169 | 140 | %%
|
| 141 | +collect_and_export(_ReaderId, undefined, _CallbacksTab, _ViewAggregationTab, _MetricsTab, _Resource) -> |
| 142 | + ok; |
| 143 | +collect_and_export(ReaderId, {ExporterModule, Config}, CallbacksTab, ViewAggregationTab, MetricsTab, Resource) -> |
| 144 | + %% collect from view aggregations table and then export |
| 145 | + Metrics = collect_(CallbacksTab, ViewAggregationTab, MetricsTab, ReaderId), |
| 146 | + otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config). |
| 147 | + |
| 148 | +update_timer(undefined, undefined) -> |
| 149 | + undefined; |
| 150 | +update_timer(TRef, ExporterIntervalMs) -> |
| 151 | + erlang:cancel_timer(TRef, [{async, true}]), |
| 152 | + erlang:send_after(ExporterIntervalMs, self(), collect). |
170 | 153 |
|
171 | 154 | -spec collect_(any(), ets:table(), any(), reference()) -> [any()].
|
172 | 155 | collect_(CallbacksTab, ViewAggregationTab, MetricsTab, ReaderId) ->
|
|
0 commit comments