Skip to content

Commit d142215

Browse files
committed
fix: metrics not routing to sources properly
1 parent 7dea485 commit d142215

File tree

6 files changed

+63
-106
lines changed

6 files changed

+63
-106
lines changed

lib/logflare/backends/source_sup.ex

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ defmodule Logflare.Backends.SourceSup do
2020
alias Logflare.Rules.Rule
2121
alias Logflare.Sources
2222
alias Logflare.Backends.AdaptorSupervisor
23-
alias Logflare.Backends.UserMonitoring
2423

2524
def child_spec(%Source{id: id} = arg) do
2625
%{

lib/logflare/backends/user_monitoring.ex

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ defmodule Logflare.Backends.UserMonitoring do
77
import Logflare.Utils.Guards
88
alias Logflare.Users
99
alias Logflare.Sources
10-
alias Logflare.Sources.Source
1110
alias Logflare.Logs
1211
alias Logflare.Logs.Processor
1312
alias Opentelemetry.Proto.Collector.Metrics.V1.ExportMetricsServiceRequest
@@ -43,15 +42,19 @@ defmodule Logflare.Backends.UserMonitoring do
4342

4443
def metrics do
4544
[
45+
sum("logflare.backends.ingest.ingested_bytes",
46+
keep: &keep_metric_function/1,
47+
description: "Amount of bytes ingested by backend for a source"
48+
),
49+
# sum("logflare.endpoints.query.scanned_bytes",
50+
# keep: &keep_metric_function/1,
51+
# description: "Amount of bytes scanned by a Logflare Endpoint"
52+
# ),
4653
counter("logflare.backends.ingest.ingested_count",
4754
measurement: :ingested_bytes,
4855
keep: &keep_metric_function/1,
4956
description: "Count of events ingested by backend for a source"
5057
),
51-
sum("logflare.backends.ingest.ingested_bytes",
52-
keep: &keep_metric_function/1,
53-
description: "Amount of bytes ingested by backend for a source"
54-
),
5558
sum("logflare.backends.ingest.egress.request_bytes",
5659
keep: &keep_metric_function/1,
5760
description:
@@ -68,7 +71,7 @@ defmodule Logflare.Backends.UserMonitoring do
6871
end
6972

7073
# take all metadata string keys and non-nested values
71-
defp extract_tags(metric, metadata) do
74+
defp extract_tags(_metric, metadata) do
7275
for {key, value} <- metadata,
7376
is_binary(key) and !is_list_or_map(value) and value != nil,
7477
into: %{} do
@@ -78,30 +81,27 @@ defmodule Logflare.Backends.UserMonitoring do
7881

7982
defp exporter_callback({:metrics, metrics}, config) do
8083
metrics
81-
|> Enum.group_by(fn metric ->
82-
metric
83-
|> Protobuf.encode()
84-
|> Protobuf.decode(Opentelemetry.Proto.Metrics.V1.Metric)
85-
|> Map.get(:data)
86-
|> Logflare.Logs.OtelMetric.handle_metric_data(%{})
87-
|> hd()
84+
85+
|> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
86+
|> Protobuf.encode()
87+
|> Protobuf.decode(ExportMetricsServiceRequest)
88+
|> Map.get(:resource_metrics)
89+
|> Logs.OtelMetric.handle_batch(%{})
90+
|> List.flatten()
91+
|> Enum.group_by(fn event ->
92+
event
8893
|> Map.get("attributes")
8994
|> Users.get_related_user_id()
9095
end)
91-
|> Enum.each(fn {user_id, user_metrics} ->
96+
|> Enum.each(fn {user_id, user_events} ->
9297
source =
9398
Sources.Cache.get_by(user_id: user_id, system_source_type: :metrics)
9499
|> Sources.Cache.preload_rules()
95100
|> Sources.refresh_source_metrics()
96101

97-
user_metrics
98-
|> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
99-
|> Protobuf.encode()
100-
|> Protobuf.decode(ExportMetricsServiceRequest)
101-
|> Map.get(:resource_metrics)
102-
|> Processor.ingest(Logs.OtelMetric, source)
103-
end)
102+
Processor.ingest(user_events, Logs.Raw, source)
104103

104+
end)
105105
:ok
106106
end
107107

@@ -135,12 +135,6 @@ defmodule Logflare.Backends.UserMonitoring do
135135
|> Sources.refresh_source_metrics()
136136
|> Sources.Cache.preload_rules()
137137

138-
defp get_system_source_metrics(user_id),
139-
do:
140-
Sources.Cache.get_by(user_id: user_id, system_source_type: :metrics)
141-
|> Sources.refresh_source_metrics()
142-
|> Sources.Cache.preload_rules()
143-
144138
defp format_message(event),
145139
do:
146140
:logger_formatter.format(event, %{single_line: true, template: [:msg]})

lib/logflare_web/templates/source/edit.html.eex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@
310310
<% end %>
311311

312312

313-
<% if @user.system_monitoring do %>
313+
<%= if @user.system_monitoring do %>
314314
<%= section_header("Monitoring Labels") %>
315315
<p>Set labels for this source, to be used for monitoring. Labels are used to group metrics together when system monitoring is enabled. Values of the labels will be determined at ingest time.</p>
316316
<%= form_for @changeset, Routes.source_path(@conn, :update, @source),fn f -> %>

mix.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ defmodule Logflare.Mixfile do
242242
{:opentelemetry_phoenix, "~> 2.0.0-rc.2"},
243243
{:opentelemetry_bandit, "~> 0.2.0-rc.1"},
244244
{:otel_metric_exporter,
245-
git: "https://github.com/supabase/elixir-otel-metric-exporter", ref: "61e2d37"},
245+
git: "https://github.com/supabase/elixir-otel-metric-exporter", ref: "e44bc0e"},
246246
{:live_monaco_editor, "~> 0.2"}
247247
]
248248
end

mix.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@
110110
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
111111
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"},
112112
"otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"},
113-
"otel_metric_exporter": {:git, "https://github.com/supabase/elixir-otel-metric-exporter", "61e2d371638e5ff7edad09639a0d9bc7a694d875", [ref: "61e2d37"]},
113+
"otel_metric_exporter": {:git, "https://github.com/supabase/elixir-otel-metric-exporter", "e44bc0e7f3013933df022cae6d0b9b90320f3d1d", [ref: "e44bc0e"]},
114114
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
115115
"pgoutput_decoder": {:hex, :pgoutput_decoder, "0.1.0", "d4ffae6e58a563f2e6de8a0495d9f9afbe2f4ac75d6805419cd4a0d05f414c00", [:mix], [], "hexpm", "4dbecbe4eb8de728178fd129ccba810bccafa9a8769c6714c8b7b22963081c27"},
116116
"phoenix": {:hex, :phoenix, "1.7.21", "14ca4f1071a5f65121217d6b57ac5712d1857e40a0833aff7a691b7870fc9a3b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "336dce4f86cba56fed312a7d280bf2282c720abb6074bdb1b61ec8095bdd0bc9"},

test/logflare/backends/user_monitoring_test.exs

Lines changed: 39 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ defmodule Logflare.Backends.UserMonitoringTest do
2828
[source: source, source_b: source_b, user: user]
2929
end
3030

31+
def start_otel_exporter(_context) do
32+
[spec] = UserMonitoring.get_otel_exporter()
33+
start_supervised!(spec)
34+
:ok
35+
end
36+
3137
describe "logs" do
3238
setup do
3339
:ok =
@@ -81,44 +87,16 @@ defmodule Logflare.Backends.UserMonitoringTest do
8187
end
8288
end
8389

84-
describe "metrics" do
85-
setup do
86-
insert(:plan)
87-
88-
user_1 = insert(:user)
89-
user_1 = Users.preload_defaults(user_1)
90-
91-
backend_1 =
92-
insert(:backend,
93-
user: user_1,
94-
type: :webhook,
95-
config: %{url: "http://test.com"},
96-
default_ingest?: false
97-
)
98-
99-
user_2 = insert(:user)
100-
user_2 = Users.preload_defaults(user_2)
101-
102-
backend_2 =
103-
insert(:backend,
104-
user: user_2,
105-
type: :webhook,
106-
config: %{url: "http://test.com"},
107-
default_ingest?: false
108-
)
109-
110-
{:ok, user_1: user_1, user_2: user_2, backend_1: backend_1, backend_2: backend_2}
111-
end
112-
end
11390

11491
describe "system monitoring labels" do
92+
setup :start_otel_exporter
11593
setup do
11694
start_supervised!(AllLogsLogged)
11795
insert(:plan)
11896
:ok
11997
end
12098

121-
test "applies to backend metrics" do
99+
test "backends.ingest.ingested_bytes and backends.ingest.ingested_count" do
122100
GoogleApi.BigQuery.V2.Api.Tabledata
123101
|> stub(:bigquery_tabledata_insert_all, fn _conn,
124102
_project_id,
@@ -175,74 +153,60 @@ defmodule Logflare.Backends.UserMonitoringTest do
175153
end
176154

177155
test "other users metrics" do
178-
GoogleApi.BigQuery.V2.Api.Tabledata
179-
|> stub(:bigquery_tabledata_insert_all, fn _conn,
180-
_project_id,
181-
_dataset_id,
182-
_table_name,
183-
_opts ->
184-
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
185-
end)
156+
pid = self()
186157

187158
user =
188159
insert(:user, system_monitoring: true)
189160

190161
other_user =
191162
insert(:user, system_monitoring: true)
192163

193-
%{id: source_id} = source = insert(:source, user: user, labels: "my_label=m.value")
164+
source = insert(:source, user: user, labels: "my_label=m.value")
194165

195-
%{id: other_source_id} =
196-
other_source = insert(:source, user: other_user, labels: "my_label=m.value")
166+
other_source = insert(:source, user: other_user, labels: "my_label=m.value")
197167

198168
metrics_source = insert(:source, user: user, system_source_type: :metrics)
199169
other_metrics_source = insert(:source, user: other_user, system_source_type: :metrics)
200-
start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
201170
start_supervised!({SourceSup, source}, id: :source)
202171
start_supervised!({SourceSup, other_source}, id: :other_source)
172+
start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
203173
start_supervised!({SourceSup, other_metrics_source}, id: :other_metrics_source)
174+
GoogleApi.BigQuery.V2.Api.Tabledata
175+
|> stub(:bigquery_tabledata_insert_all, fn _conn,
176+
_project_id,
177+
dataset_id,
178+
_table_name,
179+
opts ->
180+
if String.starts_with?(dataset_id, "#{other_user.id}") do
181+
send(pid, {:insert_all, opts[:body].rows})
182+
end
183+
{:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
184+
end)
185+
186+
:timer.sleep(500)
204187

205-
:timer.sleep(1000)
206188

207189
assert {:ok, _} = Backends.ingest_logs([%{"metadata" => %{"value" => "test"}}], source)
208190

209191
assert {:ok, _} =
210-
Backends.ingest_logs([%{"metadata" => %{"value" => "test"}}], other_source)
192+
Backends.ingest_logs([%{"metadata" => %{"value" => "different"}}], other_source)
193+
:timer.sleep(1500)
211194

212-
TestUtils.retry_assert(fn ->
213-
assert [_] = Backends.list_recent_logs_local(source)
214-
assert [_] = Backends.list_recent_logs_local(other_source)
195+
source_id = source.id
196+
other_source_id = other_source.id
197+
metrics_source_id = metrics_source.id
198+
other_metrics_source_id = other_metrics_source.id
199+
assert_receive {:insert_all, [%{json: %{"attributes" => _}} | _]= rows}, 5_000
215200

216-
assert [
217-
_ | _
218-
] = events = Backends.list_recent_logs_local(metrics_source)
201+
rows = for row <- rows, do: row.json
219202

220-
assert Enum.all?(
221-
events,
222-
&match?(
223-
%LogEvent{
224-
body: %{"attributes" => %{"my_label" => "test", "source_id" => ^source_id}}
225-
},
226-
&1
227-
)
228-
)
203+
assert Enum.all?(rows, &match?(%{"attributes" => [%{"my_label" => "different"}]}, &1))
204+
for row <- rows, attr <- row["attributes"] do
205+
assert attr["source_id"] in [other_source_id, other_metrics_source_id]
206+
refute attr["source_id"] in [source_id, metrics_source_id]
207+
refute attr["my_label"] == "test"
208+
end
229209

230-
assert [
231-
_ | _
232-
] = events = Backends.list_recent_logs_local(other_metrics_source)
233-
234-
assert Enum.all?(
235-
events,
236-
&match?(
237-
%LogEvent{
238-
body: %{
239-
"attributes" => %{"my_label" => "test", "source_id" => ^other_source_id}
240-
}
241-
},
242-
&1
243-
)
244-
)
245-
end)
246210
end
247211
end
248212
end

0 commit comments

Comments
 (0)