Merged
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.26.13
1.26.14
77 changes: 77 additions & 0 deletions docs/docs.logflare.com/docs/concepts/system-monitoring.md
@@ -0,0 +1,77 @@
---
sidebar_position: 6
---

# System Monitoring

System Monitoring collects metrics and logs about your sources, backends, and endpoints. Dedicated system sources are provisioned to capture such operational data:

- **`system.metrics`** - OpenTelemetry metrics for ingestion, queries, and egress
- **`system.logs`** - Application logs for your sources and backends

System sources behave like regular sources. Query, search, and monitor them with standard Logflare tools. They appear as favorites by default.

## Enabling System Monitoring

1. Navigate to **Account Settings** at `/account/edit`
2. Find the **"System monitoring"** section
3. Check **"Enable system monitoring"**
4. Click **"Update account"**

Logflare creates the two system sources described below and starts collecting data every 60 seconds. Disabling stops data collection immediately.

## System Sources

### system.metrics

Contains OpenTelemetry metrics as structured events. Each metric includes:

- **`event_message`** - Metric name
- **`attributes`** - Key-value pairs with metric dimensions and values
- **`timestamp`** - When the metric was recorded

Metrics are collected every 60 seconds.
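
For illustration, a single metric event in `system.metrics` might take the following shape (field names from the list above; the concrete values shown here are hypothetical):

```
{
  "event_message": "logflare.backends.ingest.ingested_count",
  "timestamp": "2024-01-01T00:00:00Z",
  "attributes": {
    "source_id": "123",
    "backend_id": "456",
    "environment": "production"
  }
}
```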

### system.logs

Contains application logs related to your sources, backends, and endpoints.

## Metrics Collected

| Metric | Description | Metadata |
| ------------------------------------------------ | ---------------------------------------------------------------------------- | ---------------------------------------- |
| `logflare.backends.ingest.ingested_bytes` | Total bytes ingested per source. Tracks storage consumption. | `source_id`, `backend_id`, custom labels |
| `logflare.backends.ingest.ingested_count` | Count of events ingested per source. Tracks ingestion volume. | `source_id`, `backend_id`, custom labels |
| `logflare.endpoints.query.total_bytes_processed` | Bytes processed when executing endpoint queries. Tracks query costs. | `endpoint_id`, custom labels |
| `logflare.backends.ingest.egress.request_bytes` | Bytes sent to external HTTP endpoints and webhooks. Tracks egress bandwidth. | Backend-specific metadata |

## Custom Labels

Add dimensions to metrics through labels on sources and endpoints. Labels appear in the `attributes` field of metric events.

For endpoint labeling behavior, see [Query Tagging with Labels](/concepts/endpoints#query-tagging-with-labels).

### Format

Use comma-separated key-value pairs:

```
environment=production,region=us-east,team=backend
```
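
The label string is a flat list of `key=value` pairs. As a rough sketch of how such a string could be parsed into a map — `LabelExample.parse_labels/1` is a hypothetical helper for illustration, not part of Logflare's API:

```elixir
defmodule LabelExample do
  # Split "k1=v1,k2=v2" into a %{"k1" => "v1", "k2" => "v2"} map.
  def parse_labels(str) do
    str
    |> String.split(",", trim: true)
    |> Enum.map(fn pair ->
      # parts: 2 keeps any "=" inside the value intact
      [k, v] = String.split(pair, "=", parts: 2)
      {String.trim(k), String.trim(v)}
    end)
    |> Map.new()
  end
end

LabelExample.parse_labels("environment=production,region=us-east,team=backend")
# => %{"environment" => "production", "region" => "us-east", "team" => "backend"}
```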

### Ingest-time Field Extraction

Extract values from event metadata using field paths:

```
label_name=field.path
label_name=m.field.path
```

Examples:

- `environment=m.env` extracts `metadata.env`
- `user_type=m.user.type` extracts `metadata.user.type`
- `region=region` extracts top-level `region`

Only string values are extracted. Nested maps and lists are excluded.
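
The extraction rule above — `m.` resolving to `metadata`, and only string leaves being kept — can be sketched as follows. `ExtractExample.extract/2` is a hypothetical helper, not Logflare's actual implementation:

```elixir
defmodule ExtractExample do
  # Resolve a field path like "m.user.type" or "region" against an event map,
  # returning the value only when it is a string.
  def extract(event, path_spec) do
    path =
      case String.split(path_spec, ".") do
        # "m." is shorthand for the metadata field
        ["m" | rest] -> ["metadata" | rest]
        parts -> parts
      end

    case get_in(event, path) do
      value when is_binary(value) -> value
      # nested maps, lists, and non-string values are excluded
      _ -> nil
    end
  end
end

event = %{
  "metadata" => %{"env" => "prod", "user" => %{"type" => "admin"}},
  "region" => "us-east"
}

ExtractExample.extract(event, "m.env")        # => "prod"
ExtractExample.extract(event, "m.user.type")  # => "admin"
ExtractExample.extract(event, "region")       # => "us-east"
ExtractExample.extract(event, "m.user")       # => nil (nested map excluded)
```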
4 changes: 3 additions & 1 deletion lib/logflare/application.ex
@@ -6,6 +6,7 @@ defmodule Logflare.Application do

alias Logflare.Networking
alias Logflare.Backends.Adaptor.BigQueryAdaptor
alias Logflare.Backends.UserMonitoring
alias Logflare.ContextCache
alias Logflare.Logs
alias Logflare.SingleTenant
@@ -78,6 +79,7 @@ defmodule Logflare.Application do
# set goth early in the supervision tree
Networking.pools() ++
conditional_children() ++
UserMonitoring.get_otel_exporter() ++
[
Logflare.ErlSysMon,
{PartitionSupervisor, child_spec: Task.Supervisor, name: Logflare.TaskSupervisors},
@@ -123,7 +125,7 @@
else
:logger.add_primary_filter(
:user_log_intercetor,
{&Logflare.Backends.UserMonitoring.log_interceptor/2, []}
{&UserMonitoring.log_interceptor/2, []}
)
end
end
2 changes: 0 additions & 2 deletions lib/logflare/backends/adaptor/clickhouse_adaptor.ex
@@ -39,8 +39,6 @@ defmodule Logflare.Backends.Adaptor.ClickHouseAdaptor do

defdelegate connection_pool_via(arg), to: ConnectionManager

defguardp is_list_or_map(value) when is_list(value) or is_map(value)

@doc false
def child_spec(arg) do
%{
12 changes: 12 additions & 0 deletions lib/logflare/backends/adaptor/http_based/egress_tracer.ex
@@ -5,6 +5,7 @@ defmodule Logflare.Backends.Adaptor.HttpBased.EgressTracer do
Should be used as last middleware called before the `Tesla.Adapter`
"""

alias Logflare.Utils
require OpenTelemetry.Tracer

@behaviour Tesla.Middleware
@@ -40,6 +41,17 @@
request_length: body_len + headers_len
] ++ meta_kw

str_meta =
for {k, v} <- meta_kw, is_binary(k), into: %{} do
{Utils.stringify(k), v}
end

:telemetry.execute(
[:logflare, :backends, :ingest, :egress],
%{request_bytes: body_len + headers_len},
str_meta
)

OpenTelemetry.Tracer.with_span :http_egress, %{attributes: attributes} do
Tesla.run(env, next)
end
11 changes: 1 addition & 10 deletions lib/logflare/backends/source_sup.ex
@@ -20,7 +20,6 @@ defmodule Logflare.Backends.SourceSup do
alias Logflare.Rules.Rule
alias Logflare.Sources
alias Logflare.Backends.AdaptorSupervisor
alias Logflare.Backends.UserMonitoring

def child_spec(%Source{id: id} = arg) do
%{
@@ -55,8 +54,6 @@
|> Enum.map(&Backend.child_spec(source, &1))
|> Enum.uniq()

otel_exporter = maybe_get_otel_exporter(source, user)

children =
[
{RateCounterServer, [source: source]},
@@ -68,17 +65,11 @@
{SlackHookServer, [source: source]},
{BillingWriter, [source: source]},
{SourceSupWorker, [source: source]}
] ++ otel_exporter ++ specs
] ++ specs

Supervisor.init(children, strategy: :one_for_one)
end

defp maybe_get_otel_exporter(%{system_source_type: :metrics} = source, user),
do: UserMonitoring.get_otel_exporter(source, user)

defp maybe_get_otel_exporter(_, _),
do: []

@doc """
Checks if a rule child is started for a given source/rule.
Must be a backend rule.
109 changes: 67 additions & 42 deletions lib/logflare/backends/user_monitoring.ex
@@ -4,18 +4,23 @@ defmodule Logflare.Backends.UserMonitoring do
"""

import Telemetry.Metrics

import Logflare.Utils.Guards
alias Logflare.Users
alias Logflare.Sources
alias Logflare.Sources.Source
alias Logflare.Logs
alias Logflare.Logs.Processor
alias Opentelemetry.Proto.Collector.Metrics.V1.ExportMetricsServiceRequest

def get_otel_exporter(source, user) do
def get_otel_exporter do
export_period =
case Application.get_env(:logflare, :env) do
:test -> 100
_ -> 60_000
end

otel_exporter_opts =
[
metrics: metrics(source),
metrics: metrics(),
resource: %{
name: "Logflare",
service: %{
@@ -25,60 +30,80 @@
node: inspect(Node.self()),
cluster: Application.get_env(:logflare, :metadata)[:cluster]
},
export_callback: generate_exporter_callback(source),
name: :"#{source.name}-#{user.id}",
otlp_endpoint: ""
export_callback: &exporter_callback/2,
extract_tags: &extract_tags/2,
name: :user_metrics_exporter,
otlp_endpoint: "",
export_period: export_period
]

[{OtelMetricExporter, otel_exporter_opts}]
end

def metrics(reference) do
keep_function = keep_metric_function(reference)

def metrics do
[
sum("logflare.backends.ingest.event_count",
tags: [:backend_id, :source_id],
keep: keep_function,
sum("logflare.backends.ingest.ingested_bytes",
keep: &keep_metric_function/1,
description: "Amount of bytes ingested by backend for a source"
),
sum("logflare.endpoints.query.total_bytes_processed",
keep: &keep_metric_function/1,
description: "Amount of bytes processed by a Logflare Endpoint"
),
counter("logflare.backends.ingest.ingested_count",
measurement: :ingested_bytes,
keep: &keep_metric_function/1,
description: "Count of events ingested by backend for a source"
),
sum("logflare.backends.ingest.egress.request_bytes",
keep: &keep_metric_function/1,
description:
"Amount of bytes egressed by backend for a source, currently only supports HTTP"
)
]
end

def keep_metric_function(%Source{} = source) do
fn metadata ->
case Users.get_related_user_id(metadata) do
nil -> false
user_id -> user_id == source.user_id && user_monitoring?(user_id)
end
def keep_metric_function(metadata) do
case Users.get_related_user_id(metadata) do
nil -> false
user_id -> Users.Cache.get(user_id).system_monitoring
end
end

def keep_metric_function(:main_exporter) do
fn metadata ->
case Users.get_related_user_id(metadata) do
nil -> true
user_id -> !Users.Cache.get(user_id).system_monitoring
end
# take all metadata string keys and non-nested values
defp extract_tags(_metric, metadata) do
for {key, value} <- metadata,
is_binary(key) and !is_list_or_map(value) and value != nil,
into: %{} do
{key, value}
end
end

defp user_monitoring?(user_id),
do: Users.Cache.get(user_id).system_monitoring

defp generate_exporter_callback(source) do
fn {:metrics, metrics}, config ->
refreshed_source = Sources.refresh_source_metrics(source)

metrics
|> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
|> Protobuf.encode()
|> Protobuf.decode(ExportMetricsServiceRequest)
|> Map.get(:resource_metrics)
|> Processor.ingest(Logs.OtelMetric, refreshed_source)
defp exporter_callback({:metrics, metrics}, config) do
metrics
|> OtelMetricExporter.Protocol.build_metric_service_request(config.resource)
|> Protobuf.encode()
|> Protobuf.decode(ExportMetricsServiceRequest)
|> Map.get(:resource_metrics)
|> Logs.OtelMetric.handle_batch(%{})
|> Enum.group_by(fn event ->
event
|> Map.get("attributes")
|> Users.get_related_user_id()
end)
|> Enum.each(fn {user_id, user_events} ->
with %Sources.Source{} = source <-
Sources.Cache.get_by(user_id: user_id, system_source_type: :metrics) do
source =
source
|> Sources.Cache.preload_rules()
|> Sources.refresh_source_metrics()

Processor.ingest(user_events, Logs.Raw, source)
end
end)

:ok
end
:ok
end

@doc """
Expand All @@ -89,7 +114,7 @@ defmodule Logflare.Backends.UserMonitoring do
with %{meta: meta} <- log_event,
user_id when is_integer(user_id) <- Users.get_related_user_id(meta),
%{system_monitoring: true} <- Users.Cache.get(user_id),
%{} = source <- get_system_source(user_id) do
%{} = source <- get_system_source_logs(user_id) do
LogflareLogger.Formatter.format(
log_event.level,
format_message(log_event),
Expand All @@ -105,7 +130,7 @@ defmodule Logflare.Backends.UserMonitoring do
end
end

defp get_system_source(user_id),
defp get_system_source_logs(user_id),
do:
Sources.Cache.get_by(user_id: user_id, system_source_type: :logs)
|> Sources.refresh_source_metrics()
41 changes: 25 additions & 16 deletions lib/logflare/endpoints.ex
@@ -328,22 +328,31 @@ defmodule Logflare.Endpoints do
[:logflare, :endpoints, :run_query, :exec_query_on_backend],
%{endpoint_id: endpoint_query.id, language: query_language},
fn ->
result =
exec_query_on_backend(
endpoint_query,
transformed_query,
declared_params,
params,
opts
)

total_rows =
case result do
{:ok, %{rows: rows}} -> length(rows)
_ -> 0
end

{result, %{total_rows: total_rows}}
exec_query_on_backend(
endpoint_query,
transformed_query,
declared_params,
params,
opts
)
|> then(fn
{:ok, data} = result ->
measurements = %{
total_bytes_processed: Map.get(data, :total_bytes_processed, 0)
}

metadata =
Map.merge(endpoint_query.parsed_labels || %{}, %{
"endpoint_id" => endpoint_query.id
})

:telemetry.execute([:logflare, :endpoints, :query], measurements, metadata)

{result, %{}}

result ->
{result, %{}}
end)
end
)
end