Skip to content

Commit 3e3b809

Browse files
committed
feat: endpoint query metrics
1 parent d142215 commit 3e3b809

File tree

4 files changed

+102
-34
lines changed

4 files changed

+102
-34
lines changed

lib/logflare/backends/user_monitoring.ex

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ defmodule Logflare.Backends.UserMonitoring do
4646
keep: &keep_metric_function/1,
4747
description: "Amount of bytes ingested by backend for a source"
4848
),
49-
# sum("logflare.endpoints.query.scanned_bytes",
50-
# keep: &keep_metric_function/1,
51-
# description: "Amount of bytes scanned by a Logflare Endpoint"
52-
# ),
49+
sum("logflare.endpoints.query.total_bytes_processed",
50+
keep: &keep_metric_function/1,
51+
description: "Amount of bytes processed by a Logflare Endpoint"
52+
),
5353
counter("logflare.backends.ingest.ingested_count",
5454
measurement: :ingested_bytes,
5555
keep: &keep_metric_function/1,
@@ -81,7 +81,6 @@ defmodule Logflare.Backends.UserMonitoring do
8181

8282
defp exporter_callback({:metrics, metrics}, config) do
8383
metrics
84-
8584
|> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
8685
|> Protobuf.encode()
8786
|> Protobuf.decode(ExportMetricsServiceRequest)
@@ -99,9 +98,9 @@ defmodule Logflare.Backends.UserMonitoring do
9998
|> Sources.Cache.preload_rules()
10099
|> Sources.refresh_source_metrics()
101100

102-
Processor.ingest(user_events, Logs.Raw, source)
103-
101+
Processor.ingest(user_events, Logs.Raw, source)
104102
end)
103+
105104
:ok
106105
end
107106

lib/logflare/endpoints.ex

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -302,22 +302,31 @@ defmodule Logflare.Endpoints do
302302
[:logflare, :endpoints, :run_query, :exec_query_on_backend],
303303
%{endpoint_id: endpoint_query.id, language: query_language},
304304
fn ->
305-
result =
306-
exec_query_on_backend(
307-
endpoint_query,
308-
transformed_query,
309-
declared_params,
310-
params,
311-
opts
312-
)
313-
314-
total_rows =
315-
case result do
316-
{:ok, %{rows: rows}} -> length(rows)
317-
_ -> 0
318-
end
319-
320-
{result, %{total_rows: total_rows}}
305+
exec_query_on_backend(
306+
endpoint_query,
307+
transformed_query,
308+
declared_params,
309+
params,
310+
opts
311+
)
312+
|> then(fn
313+
{:ok, data} = result ->
314+
measurements = %{
315+
total_bytes_processed: Map.get(data, :total_bytes_processed, 0)
316+
}
317+
318+
metadata =
319+
Map.merge(endpoint_query.parsed_labels || %{}, %{
320+
"endpoint_id" => endpoint_query.id
321+
})
322+
323+
:telemetry.execute([:logflare, :endpoints, :query], measurements, metadata)
324+
325+
{result, %{}}
326+
327+
result ->
328+
{result, %{}}
329+
end)
321330
end
322331
)
323332
end

test/logflare/backends/user_monitoring_test.exs

Lines changed: 70 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ defmodule Logflare.Backends.UserMonitoringTest do
1212
alias Logflare.Backends.UserMonitoring
1313
alias Logflare.SystemMetrics.AllLogsLogged
1414
alias Logflare.LogEvent
15+
alias Logflare.Endpoints
1516

1617
def source_and_user(_context) do
1718
start_supervised!(AllLogsLogged)
@@ -87,16 +88,16 @@ defmodule Logflare.Backends.UserMonitoringTest do
8788
end
8889
end
8990

90-
9191
describe "system monitoring labels" do
9292
setup :start_otel_exporter
93+
9394
setup do
9495
start_supervised!(AllLogsLogged)
9596
insert(:plan)
9697
:ok
9798
end
9899

99-
test "backends.ingest.ingested_bytes and backends.ingest.ingested_count" do
100+
test "backends.ingest.ingested_bytes and backends.ingest.ingested_count" do
100101
GoogleApi.BigQuery.V2.Api.Tabledata
101102
|> stub(:bigquery_tabledata_insert_all, fn _conn,
102103
_project_id,
@@ -171,42 +172,101 @@ test "backends.ingest.ingested_bytes and backends.ingest.ingested_count" do
171172
start_supervised!({SourceSup, other_source}, id: :other_source)
172173
start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
173174
start_supervised!({SourceSup, other_metrics_source}, id: :other_metrics_source)
175+
174176
GoogleApi.BigQuery.V2.Api.Tabledata
175177
|> stub(:bigquery_tabledata_insert_all, fn _conn,
176178
_project_id,
177179
dataset_id,
178180
_table_name,
179181
opts ->
180-
if String.starts_with?(dataset_id, "#{other_user.id}") do
181-
send(pid, {:insert_all, opts[:body].rows})
182-
end
183-
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
182+
if String.starts_with?(dataset_id, "#{other_user.id}") do
183+
send(pid, {:insert_all, opts[:body].rows})
184+
end
185+
186+
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
184187
end)
185188

186189
:timer.sleep(500)
187190

188-
189191
assert {:ok, _} = Backends.ingest_logs([%{"metadata" => %{"value" => "test"}}], source)
190192

191193
assert {:ok, _} =
192194
Backends.ingest_logs([%{"metadata" => %{"value" => "different"}}], other_source)
193-
:timer.sleep(1500)
195+
196+
:timer.sleep(1500)
194197

195198
source_id = source.id
196199
other_source_id = other_source.id
197200
metrics_source_id = metrics_source.id
198201
other_metrics_source_id = other_metrics_source.id
199-
assert_receive {:insert_all, [%{json: %{"attributes" => _}} | _]= rows}, 5_000
202+
assert_receive {:insert_all, [%{json: %{"attributes" => _}} | _] = rows}, 5_000
200203

201204
rows = for row <- rows, do: row.json
202205

203206
assert Enum.all?(rows, &match?(%{"attributes" => [%{"my_label" => "different"}]}, &1))
207+
204208
for row <- rows, attr <- row["attributes"] do
205209
assert attr["source_id"] in [other_source_id, other_metrics_source_id]
206210
refute attr["source_id"] in [source_id, metrics_source_id]
207-
refute attr["my_label"] == "test"
211+
refute attr["my_label"] == "test"
208212
end
213+
end
214+
end
215+
216+
describe "endpoints" do
217+
setup :start_otel_exporter
218+
219+
setup do
220+
start_supervised!(AllLogsLogged)
221+
insert(:plan)
222+
:ok
223+
end
209224

225+
test "endpoints.query.total_processed_bytes" do
226+
pid = self()
227+
228+
GoogleApi.BigQuery.V2.Api.Tabledata
229+
|> stub(:bigquery_tabledata_insert_all, fn _conn,
230+
_project_id,
231+
dataset_id,
232+
_table_name,
233+
opts ->
234+
send(pid, {:insert_all, opts[:body].rows})
235+
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
236+
end)
237+
238+
expect(GoogleApi.BigQuery.V2.Api.Jobs, :bigquery_jobs_query, 1, fn _conn, _proj_id, opts ->
239+
{:ok, TestUtils.gen_bq_response([%{"result" => "1"}])}
240+
end)
241+
242+
user = insert(:user, system_monitoring: true)
243+
source = insert(:source, user: user, system_source_type: :metrics)
244+
start_supervised!({SourceSup, source}, id: :source)
245+
# execute a query on the endpoint
246+
endpoint =
247+
insert(:endpoint,
248+
user: user,
249+
query: "SELECT 1",
250+
labels: "my_label=some_value",
251+
parsed_labels: %{"my_label" => "some_value"}
252+
)
253+
254+
assert {:ok, _} = Endpoints.run_query(endpoint)
255+
:timer.sleep(1000)
256+
endpoint_id = endpoint.id
257+
258+
assert_receive {:insert_all,
259+
[
260+
%{
261+
json: %{
262+
"attributes" => [
263+
%{"my_label" => "some_value", "endpoint_id" => ^endpoint_id}
264+
]
265+
}
266+
}
267+
| _
268+
]},
269+
5_000
210270
end
211271
end
212272
end

test/support/test_utils.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ defmodule Logflare.TestUtils do
202202
rows: rows,
203203
schema: schema,
204204
# Simple result length as test value
205-
totalBytesProcessed: length(rows) |> to_string(),
205+
totalBytesProcessed: (length(rows) * 1024) |> to_string(),
206206
totalRows: inspect(length(results))
207207
}
208208
end

0 commit comments

Comments
 (0)