|
27 | 27 | -export([start_link/3,
|
28 | 28 | collect/1,
|
29 | 29 | checkpoint_generation/1,
|
30 |
| - shutdown/1]). |
| 30 | + shutdown/1, |
| 31 | + collect_/4]). |
31 | 32 |
|
32 | 33 | -export([init/1,
|
33 | 34 | handle_call/3,
|
@@ -67,7 +68,7 @@ start_link(ReaderId, ProviderSup, Config) ->
|
67 | 68 | gen_server:start_link(?MODULE, [ReaderId, ProviderSup, Config], []).
|
68 | 69 |
|
69 | 70 | collect(ReaderPid) ->
|
70 |
| - ReaderPid ! collect. |
| 71 | + gen_server:call(ReaderPid, collect). |
71 | 72 |
|
72 | 73 | shutdown(ReaderPid) ->
|
73 | 74 | gen_server:call(ReaderPid, shutdown).
|
@@ -140,64 +141,63 @@ handle_continue(register_with_server, State=#state{provider_sup=ProviderSup,
|
140 | 141 |
|
141 | 142 | handle_call(shutdown, _From, State) ->
|
142 | 143 | {reply, ok, State};
|
143 |
| -handle_call(_, _From, State) -> |
144 |
| - {noreply, State}. |
145 |
| - |
146 |
| -handle_cast(_, State) -> |
147 |
| - {noreply, State}. |
148 | 144 |
|
149 |
| -%% eqwalizer:fixme get an unbound record error until the fixme for state record is resolved |
150 |
| -handle_info(collect, State=#state{exporter=undefined, |
151 |
| - export_interval_ms=ExporterIntervalMs, |
152 |
| - tref=TRef}) when TRef =/= undefined andalso |
153 |
| - ExporterIntervalMs =/= undefined -> |
154 |
| - erlang:cancel_timer(TRef, [{async, true}]), |
155 |
| - NewTRef = erlang:send_after(ExporterIntervalMs, self(), collect), |
156 |
| - {noreply, State#state{tref=NewTRef}}; |
157 |
| -handle_info(collect, State=#state{id=ReaderId, |
158 |
| - exporter={ExporterModule, Config}, |
159 |
| - export_interval_ms=undefined, |
160 |
| - tref=undefined, |
| 145 | +handle_call(collect, _From, State=#state{id=ReaderId, |
| 146 | + exporter=Exporter, |
161 | 147 | callbacks_tab=CallbacksTab,
|
162 | 148 | streams_tab=StreamsTab,
|
163 | 149 | metrics_tab=MetricsTab,
|
164 | 150 | exemplars_tab=ExemplarsTab,
|
165 | 151 | resource=Resource,
|
166 | 152 | producers=Producers
|
167 | 153 | }) ->
|
| 154 | + TRef = update_timer(State#state.tref, State#state.export_interval_ms), |
| 155 | + Reply = collect_and_export(ReaderId, Exporter, CallbacksTab, ViewAggregationTab, MetricsTab, Resource), |
| 156 | + {reply, Reply, State#state{tref=TRef}}; |
| 157 | + |
| 158 | +handle_call(collect, _From, State=#state{id=ReaderId, |
| 159 | + exporter={ExporterModule, Config}, |
| 160 | + export_interval_ms=ExporterIntervalMs, |
| 161 | + tref=TRef, |
| 162 | + callbacks_tab=CallbacksTab, |
| 163 | + streams_tab=StreamsTab, |
| 164 | + metrics_tab=MetricsTab, |
| 165 | + exemplars_tab=ExemplarsTab, |
| 166 | + resource=Resource, |
| 167 | + producers=Producers |
| 168 | + }) -> |
| 169 | + TRef = update_timer(State#state.tref, State#state.export_interval_ms), |
| 170 | + |
168 | 171 | Metrics = run_collection(CallbacksTab, StreamsTab, MetricsTab, ExemplarsTab, ReaderId, Producers),
|
169 | 172 |
|
170 |
| - otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config), |
| 173 | + Reply = otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config), |
171 | 174 |
|
172 |
| - {noreply, State}; |
173 |
| -handle_info(collect, State=#state{id=ReaderId, |
174 |
| - exporter={ExporterModule, Config}, |
175 |
| - export_interval_ms=ExporterIntervalMs, |
176 |
| - tref=TRef, |
177 |
| - callbacks_tab=CallbacksTab, |
178 |
| - streams_tab=StreamsTab, |
179 |
| - metrics_tab=MetricsTab, |
180 |
| - exemplars_tab=ExemplarsTab, |
181 |
| - resource=Resource, |
182 |
| - producers=Producers |
183 |
| - }) when TRef =/= undefined andalso |
184 |
| - ExporterIntervalMs =/= undefined -> |
185 |
| - erlang:cancel_timer(TRef, [{async, true}]), |
186 |
| - NewTRef = erlang:send_after(ExporterIntervalMs, self(), collect), |
| 175 | + {reply, Reply, State#state{tref=TRef}}; |
187 | 176 |
|
188 |
| - Metrics = run_collection(CallbacksTab, StreamsTab, MetricsTab, ExemplarsTab, ReaderId, Producers), |
| 177 | +%% no exporter, do nothing at all |
| 178 | +handle_call(collect, _From, State) -> |
| 179 | + {reply, ok, State}; |
189 | 180 |
|
190 |
| - otel_exporter:export_metrics(ExporterModule, Metrics, Resource, Config), |
| 181 | +handle_call(_, _From, State) -> |
| 182 | + {noreply, State}. |
191 | 183 |
|
192 |
| - {noreply, State#state{tref=NewTRef}}; |
193 |
| -%% no tref or exporter, do nothing at all |
| 184 | +handle_info(collect, State) -> |
| 185 | + {reply, _, NewState} = handle_call(collect, undefined, State), |
| 186 | + {noreply, NewState}; |
194 | 187 | handle_info(_, State) ->
|
195 | 188 | {noreply, State}.
|
196 | 189 |
|
| 190 | +handle_cast(_, State) -> |
| 191 | + {noreply, State}. |
| 192 | + |
197 | 193 | code_change(State) ->
|
198 | 194 | {ok, State}.
|
199 | 195 |
|
200 |
| -%% |
| 196 | +update_timer(undefined, undefined) -> |
| 197 | + undefined; |
| 198 | +update_timer(TRef, ExporterIntervalMs) -> |
| 199 | + erlang:cancel_timer(TRef, [{async, true}]), |
| 200 | + erlang:send_after(ExporterIntervalMs, self(), collect). |
201 | 201 |
|
202 | 202 | run_collection(CallbacksTab, StreamsTab, MetricsTab, ExemplarsTab, ReaderId, Producers) ->
|
203 | 203 | %% collect from view aggregations table and then export
|
|
0 commit comments