diff --git a/VERSION b/VERSION
index 616b3aac6e..f38d4cf6a6 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.26.13
+1.26.14
diff --git a/docs/docs.logflare.com/docs/concepts/system-monitoring.md b/docs/docs.logflare.com/docs/concepts/system-monitoring.md
new file mode 100644
index 0000000000..86fb3f293f
--- /dev/null
+++ b/docs/docs.logflare.com/docs/concepts/system-monitoring.md
@@ -0,0 +1,130 @@
+---
+sidebar_position: 6
+---
+
+# System Monitoring
+
+System Monitoring collects metrics and logs about your sources, backends, and endpoints. Dedicated system sources are provisioned to capture this operational data:
+
+- **`system.metrics`** - OpenTelemetry metrics for ingestion, queries, and egress
+- **`system.logs`** - Application logs for your sources and backends
+
+System sources behave like regular sources. Query, search, and monitor them with standard Logflare tools. They appear as favorites by default.
+
+## Enabling System Monitoring
+
+1. Navigate to **Account Settings** at `/account/edit`
+2. Find the **"System monitoring"** section
+3. Check **"Enable system monitoring"**
+4. Click **"Update account"**
+
+Logflare creates the system sources and starts collecting data every 60 seconds. Disabling the setting stops data collection immediately.
+
+## System Sources
+
+### system.metrics
+
+Contains OpenTelemetry metrics as structured events. Each metric includes:
+
+- **`event_message`** - Metric name
+- **`attributes`** - Key-value pairs with metric dimensions and values
+- **`timestamp`** - When the metric was recorded
+
+Metrics are collected every 60 seconds.
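+
+For reference, a single metric event might look like the following sketch (an illustrative shape based on the fields above; exact attribute keys and values depend on the metric and your labels):
+
+```
+{
+  "event_message": "logflare.backends.ingest.ingested_bytes",
+  "attributes": {
+    "source_id": 123,
+    "backend_id": 456,
+    "environment": "production"
+  },
+  "timestamp": 1736966400000000
+}
+```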
+
+### system.logs
+
+Contains application logs related to your sources, backends, and endpoints.
+
+## Metrics Collected
+
+| Metric                                            | Description                                                                    | Metadata                                  |
+| ------------------------------------------------ | ---------------------------------------------------------------------------- | ---------------------------------------- |
+| `logflare.backends.ingest.ingested_bytes`        | Total bytes ingested per source. Tracks storage consumption.                  | `source_id`, `backend_id`, custom labels |
+| `logflare.backends.ingest.ingested_count`        | Count of events ingested per source. Tracks ingestion volume.                 | `source_id`, `backend_id`, custom labels |
+| `logflare.endpoints.query.total_bytes_processed` | Bytes processed when executing endpoint queries. Tracks query costs.          | `endpoint_id`, custom labels             |
+| `logflare.backends.ingest.egress.request_bytes`  | Bytes sent to external HTTP endpoints and webhooks. Tracks egress bandwidth.  | Backend-specific metadata                |
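+
+These metrics are emitted as `:telemetry` events before being exported. On a self-hosted deployment, a minimal sketch for inspecting the raw ingest event from an IEx session (the handler id is arbitrary; the event name and measurement key follow the emitting code):
+
+```elixir
+:telemetry.attach(
+  "debug-ingest-metrics",
+  [:logflare, :backends, :ingest],
+  fn _event, measurements, metadata, _config ->
+    # measurements: %{ingested_bytes: ...}; metadata carries source/backend ids and labels
+    IO.inspect({measurements, metadata}, label: "ingest metric")
+  end,
+  nil
+)
+```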
+
+## Custom Labels
+
+Add dimensions to metrics through labels on sources and endpoints. Labels appear in the `attributes` field of metric events.
+
+For endpoint labelling behavior, see [Query Tagging with Labels](/concepts/endpoints#query-tagging-with-labels).
+
+### Format
+
+Use comma-separated key-value pairs:
+
+```
+environment=production,region=us-east,team=backend
+```
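+
+Whitespace around keys, values, and separators is trimmed, and empty entries are dropped; entries that are not in `key=value` form are rejected with a validation error. For example, the following input:
+
+```
+ key1 = value1 , key2=value2,
+```
+
+is normalized to:
+
+```
+key1=value1,key2=value2
+```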
+
+### Ingest-time Field Extraction
+
+Extract label values from the event body using field paths. The `m.` prefix is shorthand for the top-level `metadata` field:
+
+```
+label_name=field.path
+label_name=m.field.path
+```
+
+Examples:
+
+- `environment=m.env` extracts `metadata.env`
+- `user_type=m.user.type` extracts `metadata.user.type`
+- `region=region` extracts top-level `region`
+
+Only string values are extracted. Nested maps and lists are excluded.
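+
+At ingest time, each label's field path is resolved against the event body with `get_in/2`. A minimal sketch of the resolution rules described above (illustrative, not the exact internal code):
+
+```elixir
+defmodule LabelPaths do
+  # "m."-prefixed specs resolve inside the event's "metadata" map
+  def path("m." <> rest), do: ["metadata" | String.split(rest, ".")]
+  def path(spec), do: String.split(spec, ".")
+end
+
+get_in(%{"metadata" => %{"user" => %{"type" => "admin"}}}, LabelPaths.path("m.user.type"))
+#=> "admin"
+```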
diff --git a/lib/logflare/application.ex b/lib/logflare/application.ex
index 3adad04d05..61878eac92 100644
--- a/lib/logflare/application.ex
+++ b/lib/logflare/application.ex
@@ -6,6 +6,7 @@ defmodule Logflare.Application do
alias Logflare.Networking
alias Logflare.Backends.Adaptor.BigQueryAdaptor
+ alias Logflare.Backends.UserMonitoring
alias Logflare.ContextCache
alias Logflare.Logs
alias Logflare.SingleTenant
@@ -78,6 +79,7 @@ defmodule Logflare.Application do
# set goth early in the supervision tree
Networking.pools() ++
conditional_children() ++
+ UserMonitoring.get_otel_exporter() ++
[
Logflare.ErlSysMon,
{PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors},
@@ -123,7 +125,7 @@ defmodule Logflare.Application do
else
:logger.add_primary_filter(
:user_log_intercetor,
- {&Logflare.Backends.UserMonitoring.log_interceptor/2, []}
+ {&UserMonitoring.log_interceptor/2, []}
)
end
end
diff --git a/lib/logflare/backends/adaptor/clickhouse_adaptor.ex b/lib/logflare/backends/adaptor/clickhouse_adaptor.ex
index f376ec16cc..52346b6ad3 100644
--- a/lib/logflare/backends/adaptor/clickhouse_adaptor.ex
+++ b/lib/logflare/backends/adaptor/clickhouse_adaptor.ex
@@ -39,8 +39,6 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do
defdelegate connection_pool_via(arg), to: ConnectionManager
- defguardp is_list_or_map(value) when is_list(value) or is_map(value)
-
@doc false
def child_spec(arg) do
%{
diff --git a/lib/logflare/backends/adaptor/http_based/egress_tracer.ex b/lib/logflare/backends/adaptor/http_based/egress_tracer.ex
index e560f53817..bea5749551 100644
--- a/lib/logflare/backends/adaptor/http_based/egress_tracer.ex
+++ b/lib/logflare/backends/adaptor/http_based/egress_tracer.ex
@@ -5,6 +5,7 @@ defmodule Logflare.Backends.Adaptor.HttpBased.EgressTracer do
Should be used as last middleware called before the `Tesla.Adapter`
"""
+ alias Logflare.Utils
require OpenTelemetry.Tracer
@behaviour Tesla.Middleware
@@ -40,6 +41,17 @@ defmodule Logflare.Backends.Adaptor.HttpBased.EgressTracer do
request_length: body_len + headers_len
] ++ meta_kw
+ str_meta =
+ for {k, v} <- meta_kw, is_binary(k), into: %{} do
+ {Utils.stringify(k), v}
+ end
+
+ :telemetry.execute(
+ [:logflare, :backends, :ingest, :egress],
+ %{request_bytes: body_len + headers_len},
+ str_meta
+ )
+
OpenTelemetry.Tracer.with_span :http_egress, %{attributes: attributes} do
Tesla.run(env, next)
end
diff --git a/lib/logflare/backends/source_sup.ex b/lib/logflare/backends/source_sup.ex
index ceda602102..7309c46af8 100644
--- a/lib/logflare/backends/source_sup.ex
+++ b/lib/logflare/backends/source_sup.ex
@@ -20,7 +20,6 @@ defmodule Logflare.Backends.SourceSup do
alias Logflare.Rules.Rule
alias Logflare.Sources
alias Logflare.Backends.AdaptorSupervisor
- alias Logflare.Backends.UserMonitoring
def child_spec(%Source{id: id} = arg) do
%{
@@ -55,8 +54,6 @@ defmodule Logflare.Backends.SourceSup do
|> Enum.map(&Backend.child_spec(source, &1))
|> Enum.uniq()
- otel_exporter = maybe_get_otel_exporter(source, user)
-
children =
[
{RateCounterServer, [source: source]},
@@ -68,17 +65,11 @@ defmodule Logflare.Backends.SourceSup do
{SlackHookServer, [source: source]},
{BillingWriter, [source: source]},
{SourceSupWorker, [source: source]}
- ] ++ otel_exporter ++ specs
+ ] ++ specs
Supervisor.init(children, strategy: :one_for_one)
end
- defp maybe_get_otel_exporter(%{system_source_type: :metrics} = source, user),
- do: UserMonitoring.get_otel_exporter(source, user)
-
- defp maybe_get_otel_exporter(_, _),
- do: []
-
@doc """
Checks if a rule child is started for a given source/rule.
Must be a backend rule.
diff --git a/lib/logflare/backends/user_monitoring.ex b/lib/logflare/backends/user_monitoring.ex
index 8d35d538ed..2661f7245b 100644
--- a/lib/logflare/backends/user_monitoring.ex
+++ b/lib/logflare/backends/user_monitoring.ex
@@ -4,18 +4,23 @@ defmodule Logflare.Backends.UserMonitoring do
"""
import Telemetry.Metrics
-
+ import Logflare.Utils.Guards
alias Logflare.Users
alias Logflare.Sources
- alias Logflare.Sources.Source
alias Logflare.Logs
alias Logflare.Logs.Processor
alias Opentelemetry.Proto.Collector.Metrics.V1.ExportMetricsServiceRequest
- def get_otel_exporter(source, user) do
+ def get_otel_exporter do
+ export_period =
+ case Application.get_env(:logflare, :env) do
+ :test -> 100
+ _ -> 60_000
+ end
+
otel_exporter_opts =
[
- metrics: metrics(source),
+ metrics: metrics(),
resource: %{
name: "Logflare",
service: %{
@@ -25,60 +30,80 @@ defmodule Logflare.Backends.UserMonitoring do
node: inspect(Node.self()),
cluster: Application.get_env(:logflare, :metadata)[:cluster]
},
- export_callback: generate_exporter_callback(source),
- name: :"#{source.name}-#{user.id}",
- otlp_endpoint: ""
+ export_callback: &exporter_callback/2,
+ extract_tags: &extract_tags/2,
+ name: :user_metrics_exporter,
+ otlp_endpoint: "",
+ export_period: export_period
]
[{OtelMetricExporter, otel_exporter_opts}]
end
- def metrics(reference) do
- keep_function = keep_metric_function(reference)
-
+ def metrics do
[
- sum("logflare.backends.ingest.event_count",
- tags: [:backend_id, :source_id],
- keep: keep_function,
+ sum("logflare.backends.ingest.ingested_bytes",
+ keep: &keep_metric_function/1,
+ description: "Amount of bytes ingested by backend for a source"
+ ),
+ sum("logflare.endpoints.query.total_bytes_processed",
+ keep: &keep_metric_function/1,
+ description: "Amount of bytes processed by a Logflare Endpoint"
+ ),
+ counter("logflare.backends.ingest.ingested_count",
+ measurement: :ingested_bytes,
+ keep: &keep_metric_function/1,
description: "Count of events ingested by backend for a source"
+ ),
+ sum("logflare.backends.ingest.egress.request_bytes",
+ keep: &keep_metric_function/1,
+ description:
+ "Amount of bytes egressed by backend for a source, currently only supports HTTP"
)
]
end
- def keep_metric_function(%Source{} = source) do
- fn metadata ->
- case Users.get_related_user_id(metadata) do
- nil -> false
- user_id -> user_id == source.user_id && user_monitoring?(user_id)
- end
+ def keep_metric_function(metadata) do
+ case Users.get_related_user_id(metadata) do
+ nil -> false
+ user_id -> Users.Cache.get(user_id).system_monitoring
end
end
- def keep_metric_function(:main_exporter) do
- fn metadata ->
- case Users.get_related_user_id(metadata) do
- nil -> true
- user_id -> !Users.Cache.get(user_id).system_monitoring
- end
+ # keep only string-keyed metadata entries whose values are scalar and non-nil
+ defp extract_tags(_metric, metadata) do
+ for {key, value} <- metadata,
+ is_binary(key) and !is_list_or_map(value) and value != nil,
+ into: %{} do
+ {key, value}
end
end
- defp user_monitoring?(user_id),
- do: Users.Cache.get(user_id).system_monitoring
-
- defp generate_exporter_callback(source) do
- fn {:metrics, metrics}, config ->
- refreshed_source = Sources.refresh_source_metrics(source)
-
- metrics
- |> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
- |> Protobuf.encode()
- |> Protobuf.decode(ExportMetricsServiceRequest)
- |> Map.get(:resource_metrics)
- |> Processor.ingest(Logs.OtelMetric, refreshed_source)
+ defp exporter_callback({:metrics, metrics}, config) do
+ metrics
+ |> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
+ |> Protobuf.encode()
+ |> Protobuf.decode(ExportMetricsServiceRequest)
+ |> Map.get(:resource_metrics)
+ |> Logs.OtelMetric.handle_batch(%{})
+ |> Enum.group_by(fn event ->
+ event
+ |> Map.get("attributes")
+ |> Users.get_related_user_id()
+ end)
+ |> Enum.each(fn {user_id, user_events} ->
+ with %Sources.Source{} = source <-
+ Sources.Cache.get_by(user_id: user_id, system_source_type: :metrics) do
+ source =
+ source
+ |> Sources.Cache.preload_rules()
+ |> Sources.refresh_source_metrics()
+
+ Processor.ingest(user_events, Logs.Raw, source)
+ end
+ end)
- :ok
- end
+ :ok
end
@doc """
@@ -89,7 +114,7 @@ defmodule Logflare.Backends.UserMonitoring do
with %{meta: meta} <- log_event,
user_id when is_integer(user_id) <- Users.get_related_user_id(meta),
%{system_monitoring: true} <- Users.Cache.get(user_id),
- %{} = source <- get_system_source(user_id) do
+ %{} = source <- get_system_source_logs(user_id) do
LogflareLogger.Formatter.format(
log_event.level,
format_message(log_event),
@@ -105,7 +130,7 @@ defmodule Logflare.Backends.UserMonitoring do
end
end
- defp get_system_source(user_id),
+ defp get_system_source_logs(user_id),
do:
Sources.Cache.get_by(user_id: user_id, system_source_type: :logs)
|> Sources.refresh_source_metrics()
diff --git a/lib/logflare/endpoints.ex b/lib/logflare/endpoints.ex
index 0514ca38b7..91f992202a 100644
--- a/lib/logflare/endpoints.ex
+++ b/lib/logflare/endpoints.ex
@@ -328,22 +328,31 @@ defmodule Logflare.Endpoints do
[:logflare, :endpoints, :run_query, :exec_query_on_backend],
%{endpoint_id: endpoint_query.id, language: query_language},
fn ->
- result =
- exec_query_on_backend(
- endpoint_query,
- transformed_query,
- declared_params,
- params,
- opts
- )
-
- total_rows =
- case result do
- {:ok, %{rows: rows}} -> length(rows)
- _ -> 0
- end
-
- {result, %{total_rows: total_rows}}
+ exec_query_on_backend(
+ endpoint_query,
+ transformed_query,
+ declared_params,
+ params,
+ opts
+ )
+ |> then(fn
+ {:ok, data} = result ->
+ measurements = %{
+ total_bytes_processed: Map.get(data, :total_bytes_processed, 0)
+ }
+
+ metadata =
+ Map.merge(endpoint_query.parsed_labels || %{}, %{
+ "endpoint_id" => endpoint_query.id
+ })
+
+ :telemetry.execute([:logflare, :endpoints, :query], measurements, metadata)
+
+ {result, %{}}
+
+ result ->
+ {result, %{}}
+ end)
end
)
end
diff --git a/lib/logflare/sources.ex b/lib/logflare/sources.ex
index 325451b4d3..d0eee6c82d 100644
--- a/lib/logflare/sources.ex
+++ b/lib/logflare/sources.ex
@@ -596,6 +596,37 @@ defmodule Logflare.Sources do
:ok
end
+ @doc """
+ Parses source labels from an event, for monitoring.
+
+
+ """
+ @spec get_labels_from_event(Source.t(), LogEvent.t()) :: map()
+ def get_labels_from_event(source, log_event) do
+ mapping = get_labels_mapping(source)
+
+ for {label, path} <- mapping, into: %{} do
+ {label, get_in(log_event.body, path)}
+ end
+ end
+
+ def get_labels_mapping(%{labels: nil}), do: %{}
+ def get_labels_mapping(%{labels: ""}), do: %{}
+
+ def get_labels_mapping(source) do
+ (source.labels || "")
+ |> String.split(",")
+ |> Enum.map(fn label ->
+ String.split(label, "=", parts: 2)
+ |> then(fn
+ [label, "m." <> path] -> {label, ["metadata" | String.split(path, ".")]}
+ [label, path] -> {label, String.split(path, ".")}
+ [label] -> {label, [label]}
+ end)
+ end)
+ |> Map.new()
+ end
+
defp source_idle?(source) do
metrics = get_source_metrics_for_ingest(source)
total_pending = calculate_total_pending(source)
diff --git a/lib/logflare/sources/source.ex b/lib/logflare/sources/source.ex
index 694adcea7b..0d6324a053 100644
--- a/lib/logflare/sources/source.ex
+++ b/lib/logflare/sources/source.ex
@@ -139,6 +139,7 @@ defmodule Logflare.Sources.Source do
field :bigquery_clustering_fields, :string
field :system_source, :boolean, default: false
field :system_source_type, Ecto.Enum, values: @system_source_types
+ field :labels, :string
field :default_ingest_backend_enabled?, :boolean,
source: :default_ingest_backend_enabled,
@@ -197,7 +198,8 @@ defmodule Logflare.Sources.Source do
:transform_copy_fields,
:disable_tailing,
:default_ingest_backend_enabled?,
- :bq_storage_write_api
+ :bq_storage_write_api,
+ :labels
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> default_validations(source)
@@ -226,7 +228,8 @@ defmodule Logflare.Sources.Source do
:transform_copy_fields,
:disable_tailing,
:default_ingest_backend_enabled?,
- :bq_storage_write_api
+ :bq_storage_write_api,
+ :labels
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
|> default_validations(source)
@@ -240,6 +243,7 @@ defmodule Logflare.Sources.Source do
|> unique_constraint(:public_token)
|> put_source_ttl_change()
|> validate_source_ttl(source)
+ |> normalize_and_validate_labels()
end
defp put_source_ttl_change(changeset) do
@@ -271,6 +275,45 @@ defmodule Logflare.Sources.Source do
end
end
+ defp normalize_and_validate_labels(changeset) do
+ {normalized, errors} =
+ case get_change(changeset, :labels) do
+ value when value in [nil, ""] ->
+ {[], []}
+
+ labels ->
+ get_normalized_and_errors(labels)
+ end
+
+ errors
+ |> Enum.uniq()
+ |> Enum.reduce(changeset, fn {k, v}, cs -> add_error(cs, k, v) end)
+ |> put_change(:labels, normalized |> Enum.reverse() |> Enum.join(","))
+ end
+
+ defp get_normalized_and_errors(labels) do
+ labels
+ |> String.split(",", trim: true)
+ |> Enum.reduce({[], []}, fn pair, {normalized, errors} ->
+ case pair |> String.trim() |> String.split("=") do
+ [k, v] when k != "" and v != "" ->
+ {["#{String.trim(k)}=#{String.trim(v)}" | normalized], errors}
+
+ [_, ""] ->
+ {normalized, [{:labels, "each label must have a non-empty value"} | errors]}
+
+ ["", _] ->
+ {normalized, [{:labels, "each label must have a non-empty key"} | errors]}
+
+ [_] ->
+ {normalized, [{:labels, "each label must be in key=value format"} | errors]}
+
+ _ ->
+ {normalized, [{:labels, "each label must have exactly one '=' sign"} | errors]}
+ end
+ end)
+ end
+
def generate_bq_table_id(%__MODULE__{} = source) do
default_project_id = Application.get_env(:logflare, Logflare.Google)[:project_id]
diff --git a/lib/logflare/sources/source/bigquery/pipeline.ex b/lib/logflare/sources/source/bigquery/pipeline.ex
index b97937b1d0..1e8a81b924 100644
--- a/lib/logflare/sources/source/bigquery/pipeline.ex
+++ b/lib/logflare/sources/source/bigquery/pipeline.ex
@@ -108,13 +108,34 @@ defmodule Logflare.Sources.Source.BigQuery.Pipeline do
def ack({queue, source_token}, successful, _failed) do
# TODO: re-queue failed
metrics = Sources.get_source_metrics_for_ingest(source_token)
- {_sid, bid, _tid} = queue
+ {sid, bid, _tid} = queue
- # delete immediately if not default backend or if avg rate is above 100
- if metrics.avg > 100 or bid != nil do
- for %{data: le} <- successful do
+ backend_metadata =
+ if bid do
+ Backends.Cache.get_backend(bid).metadata || %{}
+ else
+ %{}
+ end
+
+ source = Sources.Cache.get_by_id(sid)
+
+ for %{data: le} <- successful do
+ # delete immediately if not default backend or if avg rate is above 100
+ if metrics.avg > 100 or bid != nil do
IngestEventQueue.delete(queue, le)
end
+
+ # emit telemetry on event
+ event_labels = Sources.get_labels_from_event(source, le)
+
+ measurements = %{ingested_bytes: :erlang.external_size(le.body)}
+
+ metadata =
+ %{"source_id" => sid, "backend_id" => bid}
+ |> Map.merge(event_labels)
+ |> Map.merge(backend_metadata)
+
+ :telemetry.execute([:logflare, :backends, :ingest], measurements, metadata)
end
end
diff --git a/lib/logflare/sql/ast_utils.ex b/lib/logflare/sql/ast_utils.ex
index 7f3f70e275..d79ac09690 100644
--- a/lib/logflare/sql/ast_utils.ex
+++ b/lib/logflare/sql/ast_utils.ex
@@ -3,7 +3,7 @@ defmodule Logflare.Sql.AstUtils do
Utilities for traversing and transforming SQL ASTs.
"""
- defguardp is_list_or_map(value) when is_list(value) or is_map(value)
+ import Logflare.Utils.Guards
@doc """
Recursively transforms an AST using a provided transform function.
diff --git a/lib/logflare/users/users.ex b/lib/logflare/users/users.ex
index 285e80fec7..3008314582 100644
--- a/lib/logflare/users/users.ex
+++ b/lib/logflare/users/users.ex
@@ -186,10 +186,15 @@ defmodule Logflare.Users do
def get_related_user_id(map) do
case map do
%{user_id: user_id} -> %{user_id: user_id}
+ %{"user_id" => user_id} -> %{user_id: user_id}
%{source_id: source_id} -> Sources.Cache.get_by_id(source_id)
+ %{"source_id" => source_id} -> Sources.Cache.get_by_id(source_id)
%{source_token: token} -> Sources.Cache.get_source_by_token(token)
+ %{"source_token" => token} -> Sources.Cache.get_source_by_token(token)
%{backend_id: backend_id} -> Backends.Cache.get_backend(backend_id)
+ %{"backend_id" => backend_id} -> Backends.Cache.get_backend(backend_id)
%{endpoint_id: endpoint_id} -> Endpoints.Cache.get_endpoint_query(endpoint_id)
+ %{"endpoint_id" => endpoint_id} -> Endpoints.Cache.get_endpoint_query(endpoint_id)
_ -> nil
end
|> case do
diff --git a/lib/logflare/utils/guards.ex b/lib/logflare/utils/guards.ex
index e34cf6474e..4717534351 100644
--- a/lib/logflare/utils/guards.ex
+++ b/lib/logflare/utils/guards.ex
@@ -50,4 +50,9 @@ defmodule Logflare.Utils.Guards do
Guard that indicates if the value is a valid percentile aggregate.
"""
defguard is_percentile_aggregate(value) when value in [:p50, :p95, :p99]
+
+ @doc """
+ Guard that indicates if the value is a list or map.
+ """
+ defguard is_list_or_map(value) when is_list(value) or is_map(value)
end
diff --git a/lib/logflare_web/templates/source/edit.html.eex b/lib/logflare_web/templates/source/edit.html.eex
index 95bdcfd60f..29ccc0b7a7 100644
--- a/lib/logflare_web/templates/source/edit.html.eex
+++ b/lib/logflare_web/templates/source/edit.html.eex
@@ -312,6 +312,22 @@
<%= submit "Save", class: "btn btn-primary form-button" %>
<% end %>
+
+ <%= if @user.system_monitoring do %>
+ <%= section_header("Monitoring Labels") %>
+
+ Set labels for this source to group its metrics when system monitoring is enabled. Label values are resolved from each event at ingest time.
+ <%= form_for @changeset, Routes.source_path(@conn, :update, @source),fn f -> %>
+
+ <%= text_input f, :labels, class: "form-control form-control-margin" %>
+ <%= error_tag f, :labels %>
+
+ Comma-separated. For example: status=m.level,my_project=project.name.
+
+
+ <%= submit "Save", class: "btn btn-primary form-button" %>
+ <% end %>
+ <% end %>
+
<%= section_header("Search Tailing") %>
<%= form_for @changeset, Routes.source_path(@conn, :update, @source),fn f -> %>
diff --git a/lib/telemetry.ex b/lib/telemetry.ex
index 131c7e4295..4d07628ca4 100644
--- a/lib/telemetry.ex
+++ b/lib/telemetry.ex
@@ -4,8 +4,6 @@ defmodule Logflare.Telemetry do
import Telemetry.Metrics
import Logflare.Utils, only: [ets_info: 1]
- alias Logflare.Backends.UserMonitoring
-
def start_link(arg), do: Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
@caches [
@@ -202,16 +200,13 @@ defmodule Logflare.Telemetry do
)
]
- user_specific_metrics = UserMonitoring.metrics(:main_exporter)
-
Enum.concat([
phoenix_metrics,
database_metrics,
vm_metrics,
cache_metrics,
broadway_metrics,
- application_metrics,
- user_specific_metrics
+ application_metrics
])
end
diff --git a/mix.exs b/mix.exs
index 2fc2ae9292..cfd6aec9e9 100644
--- a/mix.exs
+++ b/mix.exs
@@ -242,7 +242,7 @@ defmodule Logflare.Mixfile do
{:opentelemetry_phoenix, "~> 2.0.0-rc.2"},
{:opentelemetry_bandit, "~> 0.2.0-rc.1"},
{:otel_metric_exporter,
- git: "https://github.com/supabase/elixir-otel-metric-exporter", ref: "61e2d37"},
+ git: "https://github.com/supabase/elixir-otel-metric-exporter", ref: "e44bc0e"},
{:live_monaco_editor, "~> 0.2"}
]
end
diff --git a/mix.lock b/mix.lock
index 3d697a3b54..cad854da84 100644
--- a/mix.lock
+++ b/mix.lock
@@ -110,7 +110,7 @@
"opentelemetry_semantic_conventions": {:hex, :opentelemetry_semantic_conventions, "1.27.0", "acd0194a94a1e57d63da982ee9f4a9f88834ae0b31b0bd850815fe9be4bbb45f", [:mix, :rebar3], [], "hexpm", "9681ccaa24fd3d810b4461581717661fd85ff7019b082c2dff89c7d5b1fc2864"},
"opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.1.2", "410ab4d76b0921f42dbccbe5a7c831b8125282850be649ee1f70050d3961118a", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.3", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.1", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "641ab469deb181957ac6d59bce6e1321d5fe2a56df444fc9c19afcad623ab253"},
"otel_http": {:hex, :otel_http, "0.2.0", "b17385986c7f1b862f5d577f72614ecaa29de40392b7618869999326b9a61d8a", [:rebar3], [], "hexpm", "f2beadf922c8cfeb0965488dd736c95cc6ea8b9efce89466b3904d317d7cc717"},
- "otel_metric_exporter": {:git, "https://github.com/supabase/elixir-otel-metric-exporter", "61e2d371638e5ff7edad09639a0d9bc7a694d875", [ref: "61e2d37"]},
+ "otel_metric_exporter": {:git, "https://github.com/supabase/elixir-otel-metric-exporter", "e44bc0e7f3013933df022cae6d0b9b90320f3d1d", [ref: "e44bc0e"]},
"parse_trans": {:hex, :parse_trans, "3.4.1", "6e6aa8167cb44cc8f39441d05193be6e6f4e7c2946cb2759f015f8c56b76e5ff", [:rebar3], [], "hexpm", "620a406ce75dada827b82e453c19cf06776be266f5a67cff34e1ef2cbb60e49a"},
"pgoutput_decoder": {:hex, :pgoutput_decoder, "0.1.0", "d4ffae6e58a563f2e6de8a0495d9f9afbe2f4ac75d6805419cd4a0d05f414c00", [:mix], [], "hexpm", "4dbecbe4eb8de728178fd129ccba810bccafa9a8769c6714c8b7b22963081c27"},
"phoenix": {:hex, :phoenix, "1.7.21", "14ca4f1071a5f65121217d6b57ac5712d1857e40a0833aff7a691b7870fc9a3b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "336dce4f86cba56fed312a7d280bf2282c720abb6074bdb1b61ec8095bdd0bc9"},
diff --git a/priv/repo/migrations/20251112085259_user_system_monitoring_labels.exs b/priv/repo/migrations/20251112085259_user_system_monitoring_labels.exs
new file mode 100644
index 0000000000..0a1cdb1a14
--- /dev/null
+++ b/priv/repo/migrations/20251112085259_user_system_monitoring_labels.exs
@@ -0,0 +1,9 @@
+defmodule Logflare.Repo.Migrations.UserSystemMonitoringLabels do
+ use Ecto.Migration
+
+ def change do
+ alter table(:sources) do
+ add :labels, :text
+ end
+ end
+end
diff --git a/test/logflare/alerting_test.exs b/test/logflare/alerting_test.exs
index 3dd2bad35b..41d33574dc 100644
--- a/test/logflare/alerting_test.exs
+++ b/test/logflare/alerting_test.exs
@@ -208,7 +208,7 @@ defmodule Logflare.AlertingTest do
{:ok, TestUtils.gen_bq_response([%{"testing" => "123"}])}
end)
- assert {:ok, %{rows: [%{"testing" => "123"}], total_bytes_processed: 1}} =
+ assert {:ok, %{rows: [%{"testing" => "123"}], total_bytes_processed: _}} =
Alerting.execute_alert_query(alert_query)
# no reservation set by user
@@ -227,7 +227,7 @@ defmodule Logflare.AlertingTest do
{:ok, TestUtils.gen_bq_response([%{"testing" => "123"}])}
end)
- assert {:ok, %{rows: [%{"testing" => "123"}], total_bytes_processed: 1}} =
+ assert {:ok, %{rows: [%{"testing" => "123"}], total_bytes_processed: _}} =
Alerting.execute_alert_query(alert_query)
assert_receive {:reservation, reservation}
diff --git a/test/logflare/backends/user_monitoring_test.exs b/test/logflare/backends/user_monitoring_test.exs
index 6c27addad2..b814ead356 100644
--- a/test/logflare/backends/user_monitoring_test.exs
+++ b/test/logflare/backends/user_monitoring_test.exs
@@ -11,6 +11,8 @@ defmodule Logflare.Backends.UserMonitoringTest do
alias Logflare.Backends.SourceSup
alias Logflare.Backends.UserMonitoring
alias Logflare.SystemMetrics.AllLogsLogged
+ alias Logflare.LogEvent
+ alias Logflare.Endpoints
def source_and_user(_context) do
start_supervised!(AllLogsLogged)
@@ -27,7 +29,25 @@ defmodule Logflare.Backends.UserMonitoringTest do
[source: source, source_b: source_b, user: user]
end
+ def start_otel_exporter(_context) do
+ [spec] = UserMonitoring.get_otel_exporter()
+ start_supervised!(spec)
+ :ok
+ end
+
describe "logs" do
+ setup do
+ :ok =
+ :logger.add_primary_filter(
+ :user_log_intercetor,
+ {&UserMonitoring.log_interceptor/2, []}
+ )
+
+ on_exit(fn ->
+ :logger.remove_primary_filter(:user_log_intercetor)
+ end)
+ end
+
setup :source_and_user
test "are routed to user's system source when monitoring is on", %{user: user, source: source} do
@@ -66,189 +86,251 @@ defmodule Logflare.Backends.UserMonitoringTest do
&match?(%{body: %{"event_message" => "user not monitoring"}}, &1)
)
end
+ end
+
+ describe "system monitoring labels" do
+ setup :start_otel_exporter
setup do
- :ok =
- :logger.add_primary_filter(
- :user_log_intercetor,
- {&UserMonitoring.log_interceptor/2, []}
- )
+ start_supervised!(AllLogsLogged)
+ insert(:plan)
+ :ok
+ end
- on_exit(fn ->
- :logger.remove_primary_filter(:user_log_intercetor)
+ test "backends.ingest.ingested_bytes and backends.ingest.ingested_count" do
+ GoogleApi.BigQuery.V2.Api.Tabledata
+ |> stub(:bigquery_tabledata_insert_all, fn _conn,
+ _project_id,
+ _dataset_id,
+ _table_name,
+ _opts ->
+ {:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
+ end)
+
+ user =
+ insert(:user, system_monitoring: true)
+
+ source = insert(:source, user: user, labels: "my_label=m.value")
+ metrics_source = insert(:source, user: user, system_source_type: :metrics)
+ start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
+ start_supervised!({SourceSup, source}, id: :source)
+
+ :timer.sleep(1000)
+
+ assert {:ok, _} = Backends.ingest_logs([%{"metadata" => %{"value" => "test"}}], source)
+
+ TestUtils.retry_assert(fn ->
+ assert [_] = Backends.list_recent_logs_local(source)
+
+ assert [
+ _ | _
+ ] = events = Backends.list_recent_logs_local(metrics_source)
+
+ assert Enum.any?(
+ events,
+ &match?(%LogEvent{body: %{"attributes" => %{"my_label" => "test"}}}, &1)
+ )
+
+ assert Enum.any?(
+ events,
+ &match?(
+ %LogEvent{
+ body: %{"event_message" => "logflare.backends.ingest.ingested_count"}
+ },
+ &1
+ )
+ )
+
+ assert Enum.any?(
+ events,
+ &match?(
+ %LogEvent{
+ body: %{"event_message" => "logflare.backends.ingest.ingested_bytes"}
+ },
+ &1
+ )
+ )
end)
end
- end
- describe "metrics" do
- setup do
- insert(:plan)
+ test "other users metrics" do
+ pid = self()
- user_1 = insert(:user)
- user_1 = Users.preload_defaults(user_1)
+ user =
+ insert(:user, system_monitoring: true)
- backend_1 =
- insert(:backend,
- user: user_1,
- type: :webhook,
- config: %{url: "http://test.com"},
- default_ingest?: false
- )
+ other_user =
+ insert(:user, system_monitoring: true)
- user_2 = insert(:user)
- user_2 = Users.preload_defaults(user_2)
+ source = insert(:source, user: user, labels: "my_label=m.value")
- backend_2 =
- insert(:backend,
- user: user_2,
- type: :webhook,
- config: %{url: "http://test.com"},
- default_ingest?: false
- )
+ other_source = insert(:source, user: other_user, labels: "my_label=m.value")
- {:ok, user_1: user_1, user_2: user_2, backend_1: backend_1, backend_2: backend_2}
- end
+ metrics_source = insert(:source, user: user, system_source_type: :metrics)
+ other_metrics_source = insert(:source, user: other_user, system_source_type: :metrics)
+ start_supervised!({SourceSup, source}, id: :source)
+ start_supervised!({SourceSup, other_source}, id: :other_source)
+ start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
+ start_supervised!({SourceSup, other_metrics_source}, id: :other_metrics_source)
- test "are routed to user's system source when flag is true", %{
- user_1: user,
- backend_1: %{id: backend_id}
- } do
- user |> Users.update_user_allowed(%{system_monitoring: true})
- metadata = %{backend_id: backend_id}
- keep_function = UserMonitoring.keep_metric_function(:main_exporter)
+ GoogleApi.BigQuery.V2.Api.Tabledata
+ |> stub(:bigquery_tabledata_insert_all, fn _conn,
+ _project_id,
+ dataset_id,
+ _table_name,
+ opts ->
+ if String.starts_with?(dataset_id, "#{other_user.id}") do
+ send(pid, {:insert_all, opts[:body].rows})
+ end
- # main exporter's metric keep function returns false
- refute keep_function.(metadata)
+ {:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
+ end)
- # user exporter keep user specific metrics
+ :timer.sleep(500)
- system_source =
- Sources.get_by(user_id: user.id, system_source_type: :metrics)
+ assert {:ok, _} = Backends.ingest_logs([%{"metadata" => %{"value" => "test"}}], source)
- start_supervised!({SourceSup, system_source})
+ assert {:ok, _} =
+ Backends.ingest_logs([%{"metadata" => %{"value" => "different"}}], other_source)
- :telemetry.execute([:logflare, :backends, :ingest], %{event_count: 456}, metadata)
+ :timer.sleep(1500)
- user_exporter_metrics =
- OtelMetricExporter.MetricStore.get_metrics(:"system.metrics-#{user.id}")
+ source_id = source.id
+ other_source_id = other_source.id
+ metrics_source_id = metrics_source.id
+ other_metrics_source_id = other_metrics_source.id
+ assert_receive {:insert_all, [%{json: %{"attributes" => _}} | _] = rows}, 5_000
- assert match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_id} => 456
- }
- },
- user_exporter_metrics
- )
+ rows = for row <- rows, do: row.json
+
+ assert Enum.all?(rows, &match?(%{"attributes" => [%{"my_label" => "different"}]}, &1))
+
+ for row <- rows, attr <- row["attributes"] do
+ assert attr["source_id"] in [other_source_id, other_metrics_source_id]
+ refute attr["source_id"] in [source_id, metrics_source_id]
+ refute attr["my_label"] == "test"
+ end
end
+ end
- test "stay on main exporter when flag is false", %{
- user_1: user,
- backend_1: %{id: backend_id}
- } do
- metadata = %{backend_id: backend_id}
- keep_function = UserMonitoring.keep_metric_function(:main_exporter)
+ describe "egress" do
+ setup :start_otel_exporter
+
+ setup do
+ start_supervised!(AllLogsLogged)
+ insert(:plan)
+ :ok
+ end
- # main exporter's metric keep function returns true
- assert keep_function.(metadata)
+ test "backends.ingest.egress.request_bytes includes backend metadata" do
+ pid = self()
- # if SourceSup is up, it stil won't ingest any metric
- system_source =
- user.id
- |> Sources.create_user_system_sources()
- |> Enum.find(&(&1.system_source_type == :metrics))
+ GoogleApi.BigQuery.V2.Api.Tabledata
+ |> stub(:bigquery_tabledata_insert_all, fn _, _, _, _, opts ->
+ send(pid, {:insert_all, opts[:body].rows})
+ {:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
+ end)
- start_supervised!({SourceSup, system_source})
+ user = insert(:user, system_monitoring: true)
+ metrics_source = insert(:source, user: user, system_source_type: :metrics)
- :telemetry.execute([:logflare, :backends, :ingest], %{value: 456}, metadata)
+ webhook_backend =
+ insert(:backend,
+ user: user,
+ type: :webhook,
+ config: %{url: "http://127.0.0.1:9999/webhook"},
+ metadata: %{"environment" => "test", "region" => "us-west"}
+ )
- user_exporter_metrics =
- OtelMetricExporter.MetricStore.get_metrics(:"system.metrics-#{user.id}")
+ source = insert(:source, user: user)
- refute match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_id} => 456
- }
- },
- user_exporter_metrics
- )
- end
+ start_supervised!({SourceSup, metrics_source}, id: :metrics_source)
+ start_supervised!({SourceSup, source}, id: :source)
- test "dont get mixed between users", %{
- user_1: user_1,
- user_2: user_2,
- backend_1: %{id: backend_1_id},
- backend_2: %{id: backend_2_id}
- } do
- # user 1 setup
- user_1 |> Users.update_user_allowed(%{system_monitoring: true})
+ {:ok, _} = Backends.update_source_backends(source, [webhook_backend])
+ Backends.Cache.get_backend(webhook_backend.id)
- system_source_1 =
- Sources.get_by(user_id: user_1.id, system_source_type: :metrics)
+ :timer.sleep(1000)
- start_supervised!({SourceSup, system_source_1}, id: {:source_sup, 1})
+ assert {:ok, _} = Backends.ingest_logs([%{"message" => "test webhook egress"}], source)
- # user 2 setup
+ :timer.sleep(2500)
- user_2 |> Users.update_user_allowed(%{system_monitoring: true})
+ assert_receive {:insert_all, [%{json: %{"attributes" => _}} | _] = rows}, 5_000
- system_source_2 =
- Sources.get_by(user_id: user_2.id, system_source_type: :metrics)
+ rows = for row <- rows, do: row.json
- start_supervised!({SourceSup, system_source_2}, id: {:source_sup, 2})
+ egress_row =
+ Enum.find(
+ rows,
+ &match?(%{"event_message" => "logflare.backends.ingest.egress.request_bytes"}, &1)
+ )
- # sending signals related to both users
+ assert egress_row, "Expected egress metric to be present"
- :telemetry.execute([:logflare, :backends, :ingest], %{event_count: 123}, %{
- backend_id: backend_1_id
- })
+ [attributes] = egress_row["attributes"]
- :telemetry.execute([:logflare, :backends, :ingest], %{event_count: 456}, %{
- backend_id: backend_2_id
- })
+ assert attributes["source_id"] == source.id
+ assert attributes["backend_id"] == webhook_backend.id
+ assert attributes["_backend_environment"] == "test"
+ assert attributes["_backend_region"] == "us-west"
+ end
+ end
- user_1_exporter_metrics =
- OtelMetricExporter.MetricStore.get_metrics(:"system.metrics-#{user_1.id}")
+ describe "endpoints" do
+ setup :start_otel_exporter
- assert match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_1_id} => 123
- }
- },
- user_1_exporter_metrics
- )
+ setup do
+ start_supervised!(AllLogsLogged)
+ insert(:plan)
+ :ok
+ end
- refute match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_2_id} => _
- }
- },
- user_1_exporter_metrics
- )
+ test "endpoints.query.total_bytes_processed" do
+ pid = self()
+
+ GoogleApi.BigQuery.V2.Api.Tabledata
+ |> stub(:bigquery_tabledata_insert_all, fn _conn,
+ _project_id,
+ _dataset_id,
+ _table_name,
+ opts ->
+ send(pid, {:insert_all, opts[:body].rows})
+ {:ok, %GoogleApi.BigQuery.V2.Model.TableDataInsertAllResponse{insertErrors: nil}}
+ end)
- user_2_exporter_metrics =
- OtelMetricExporter.MetricStore.get_metrics(:"system.metrics-#{user_2.id}")
+ expect(GoogleApi.BigQuery.V2.Api.Jobs, :bigquery_jobs_query, 1, fn _conn, _proj_id, _opts ->
+ {:ok, TestUtils.gen_bq_response([%{"result" => "1"}])}
+ end)
- assert match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_2_id} => 456
- }
- },
- user_2_exporter_metrics
- )
+ user = insert(:user, system_monitoring: true)
+ source = insert(:source, user: user, system_source_type: :metrics)
+ start_supervised!({SourceSup, source}, id: :source)
+ # execute a query on the endpoint
+ endpoint =
+ insert(:endpoint,
+ user: user,
+ query: "SELECT 1",
+ labels: "my_label=some_value",
+ parsed_labels: %{"my_label" => "some_value"}
+ )
- refute match?(
- %{
- {:sum, "logflare.backends.ingest.event_count"} => %{
- %{backend_id: ^backend_1_id} => _
- }
- },
- user_2_exporter_metrics
- )
+ assert {:ok, _} = Endpoints.run_query(endpoint)
+ :timer.sleep(1000)
+ endpoint_id = endpoint.id
+
+ assert_receive {:insert_all,
+ [
+ %{
+ json: %{
+ "attributes" => [
+ %{"my_label" => "some_value", "endpoint_id" => ^endpoint_id}
+ ]
+ }
+ }
+ | _
+ ]},
+ 5_000
end
end
end
diff --git a/test/logflare/bigquery/pipeline_test.exs b/test/logflare/bigquery/pipeline_test.exs
index e136284917..4c398b5b5f 100644
--- a/test/logflare/bigquery/pipeline_test.exs
+++ b/test/logflare/bigquery/pipeline_test.exs
@@ -13,6 +13,7 @@ defmodule Logflare.BigQuery.PipelineTest do
@pipeline_name :test_pipeline
describe "pipeline" do
setup do
+ insert(:plan)
user = insert(:user)
source = insert(:source, user_id: user.id)
{:ok, source: source}
diff --git a/test/logflare/sources_test.exs b/test/logflare/sources_test.exs
index 1c0ce0a7e8..06d1b5cc8e 100644
--- a/test/logflare/sources_test.exs
+++ b/test/logflare/sources_test.exs
@@ -520,4 +520,44 @@ defmodule Logflare.SourcesTest do
end)
end
end
+
+ describe "labels validation and normalization" do
+ setup do
+ insert(:plan)
+ user = insert(:user)
+ %{source: insert(:source, user_id: user.id)}
+ end
+
+ test "normalizes whitespace around commas and equals", %{source: source} do
+ for {input, expected} <- [
+ {"key1=value1 , key2=value2", "key1=value1,key2=value2"},
+ {"key1 = value1,key2= value2", "key1=value1,key2=value2"},
+ {" key1=value1 , key2=value2 ", "key1=value1,key2=value2"},
+ {"key1=value1,,key2=value2,", "key1=value1,key2=value2"}
+ ] do
+ assert %_{valid?: true} = changeset = Source.changeset(source, %{labels: input})
+ assert get_change(changeset, :labels) == expected
+ end
+ end
+
+ test "accepts valid formats and empty string", %{source: source} do
+ assert Source.changeset(source, %{labels: ""}).valid?
+ assert Source.changeset(source, %{labels: "status=m.level,project=name"}).valid?
+ end
+
+ test "rejects invalid label formats", %{source: source} do
+ invalid_cases = [
+ {"no_equals", "each label must be in key=value format"},
+ {"=value", "each label must have a non-empty key"},
+ {"key=", "each label must have a non-empty value"},
+ {"key=val=extra", "each label must have exactly one '=' sign"},
+ {"valid=ok,invalid", "each label must be in key=value format"}
+ ]
+
+ for {input, expected_error} <- invalid_cases do
+ assert %_{valid?: false} = changeset = Source.changeset(source, %{labels: input})
+ assert expected_error in errors_on(changeset).labels
+ end
+ end
+ end
end
diff --git a/test/logflare_web/live/alerts/alerts_live_test.exs b/test/logflare_web/live/alerts/alerts_live_test.exs
index 92a6cba514..83cad77456 100644
--- a/test/logflare_web/live/alerts/alerts_live_test.exs
+++ b/test/logflare_web/live/alerts/alerts_live_test.exs
@@ -304,7 +304,7 @@ defmodule LogflareWeb.AlertsLiveTest do
|> element("button", "Run query")
|> render_click() =~ "results-123"
- assert view |> render() =~ "1 byte processed"
+ assert view |> render() =~ ~r/1 .+ processed/
end
test "errors from BQ are dispalyed", %{conn: conn, alert_query: alert_query} do
diff --git a/test/logflare_web/live_views/endpoints_live_test.exs b/test/logflare_web/live_views/endpoints_live_test.exs
index 9f456d39cf..a7d93c6114 100644
--- a/test/logflare_web/live_views/endpoints_live_test.exs
+++ b/test/logflare_web/live_views/endpoints_live_test.exs
@@ -329,7 +329,7 @@ defmodule LogflareWeb.EndpointsLiveTest do
assert has_element?(view, "label", "Max limit")
assert has_element?(view, "label", "Enable authentication")
- assert view |> render() =~ "1 byte processed"
+ assert view |> render() =~ ~r/1 .+ processed/
assert view
|> element("form#endpoint")
diff --git a/test/logflare_web/live_views/query_live_test.exs b/test/logflare_web/live_views/query_live_test.exs
index 00a0a567d9..b5b2f78d55 100644
--- a/test/logflare_web/live_views/query_live_test.exs
+++ b/test/logflare_web/live_views/query_live_test.exs
@@ -30,7 +30,7 @@ defmodule LogflareWeb.QueryLiveTest do
|> element("form")
|> render_submit(%{}) =~ "Ran query successfully"
- assert view |> render() =~ "1 byte processed"
+ assert view |> render() =~ ~r/1 .+ processed/
assert_patch(view) =~ ~r/current_timestamp/
assert render(view) =~ "some-data"
diff --git a/test/support/test_utils.ex b/test/support/test_utils.ex
index 184c8c528f..2aba04b7bd 100644
--- a/test/support/test_utils.ex
+++ b/test/support/test_utils.ex
@@ -202,7 +202,7 @@ defmodule Logflare.TestUtils do
rows: rows,
schema: schema,
# Simple result length as test value
- totalBytesProcessed: length(rows) |> to_string(),
+ totalBytesProcessed: (length(rows) * 1024) |> to_string(),
totalRows: inspect(length(results))
}
end