diff --git a/Cargo.lock b/Cargo.lock index 8eeb09ba1b5c..b7d8d9bb3f21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5191,6 +5191,9 @@ dependencies = [ "linera-base", "linera-kywasmtime", "linera-witty", + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", "port-selector", "prometheus", "proptest", @@ -5209,6 +5212,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "tracing-opentelemetry", "tracing-subscriber 0.3.19", "tracing-web", "trait-variant", @@ -5351,7 +5355,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", - "tonic", + "tonic 0.12.3", "tracing", "tracing-subscriber 0.3.19", "trait-set", @@ -5537,7 +5541,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-stream", - "tonic", + "tonic 0.12.3", "tonic-build", "tower-http 0.6.6", "tracing", @@ -5690,7 +5694,7 @@ dependencies = [ "thiserror 1.0.69", "tokio", "tokio-util", - "tonic", + "tonic 0.12.3", "tonic-build", "tonic-health", "tonic-reflection", @@ -5831,7 +5835,7 @@ dependencies = [ "tokio-stream", "tokio-util", "toml", - "tonic", + "tonic 0.12.3", "tonic-build", "tonic-health", "tonic-reflection", @@ -5887,6 +5891,7 @@ dependencies = [ "papaya", "prometheus", "serde", + "tracing", ] [[package]] @@ -5912,7 +5917,7 @@ dependencies = [ "test-strategy", "thiserror 1.0.69", "tokio", - "tonic", + "tonic 0.12.3", "tonic-build", "tracing", "tracing-subscriber 0.3.19", @@ -6884,6 +6889,82 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aaf416e4cb72756655126f7dd7bb0af49c674f4c1b9903e80c009e0c37e552e6" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.14", + "tracing", +] + +[[package]] +name = "opentelemetry-http" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f6639e842a97dbea8886e3439710ae463120091e2e064518ba8e716e6ac36d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry", + "reqwest 0.12.23", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbee664a43e07615731afc539ca60c6d9f1a9425e25ca09c57bc36c87c55852b" +dependencies = [ + "http 1.3.1", + "opentelemetry", + "opentelemetry-http", + "opentelemetry-proto", + "opentelemetry_sdk", + "prost", + "reqwest 0.12.23", + "thiserror 2.0.14", + "tokio", + "tonic 0.13.1", + "tracing", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e046fd7660710fe5a05e8748e70d9058dc15c94ba914e7c4faa7c728f0e8ddc" +dependencies = [ + "opentelemetry", + "opentelemetry_sdk", + "prost", + "tonic 0.13.1", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11f644aa9e5e31d11896e024305d7e3c98a88884d9f8919dbf37a9991bc47a4b" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry", + "percent-encoding", + "rand 0.9.2", + "serde_json", + "thiserror 2.0.14", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -8035,7 +8116,9 @@ checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "base64 0.22.1", "bytes", + "futures-channel", "futures-core", + "futures-util", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -10276,6 +10359,34 @@ dependencies = [ "webpki-roots 0.26.11", ] +[[package]] +name = "tonic" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7e581ba15a835f4d9ea06c55ab1bd4dce26fc53752c69a04aac00703bfb49ba9" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-timeout 0.5.2", + "hyper-util", + "percent-encoding", + "pin-project", + "prost", + "rustls-native-certs 0.8.1", + "tokio", + "tokio-rustls 0.26.2", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "tonic-build" version = "0.12.3" @@ -10300,7 +10411,7 @@ dependencies = [ "prost", "tokio", "tokio-stream", - "tonic", + "tonic 0.12.3", ] [[package]] @@ -10313,7 +10424,7 @@ dependencies = [ "prost-types", "tokio", "tokio-stream", - "tonic", + "tonic 0.12.3", ] [[package]] @@ -10329,7 +10440,7 @@ dependencies = [ "http-body-util", "pin-project", "tokio-stream", - "tonic", + "tonic 0.12.3", "tower-http 0.5.2", "tower-layer", "tower-service", @@ -10353,7 +10464,7 @@ dependencies = [ "js-sys", "pin-project", "thiserror 1.0.69", - "tonic", + "tonic 0.12.3", "tower-service", "wasm-bindgen", "wasm-bindgen-futures", @@ -10389,7 +10500,9 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.10.0", "pin-project-lite", + "slab", "sync_wrapper 1.0.2", "tokio", "tokio-util", @@ -10499,6 +10612,35 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddcf5959f39507d0d04d6413119c04f33b623f4f951ebcbdddddfad2d0623a9c" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.19", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 7114f247f480..1df68522ad9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -157,6 +157,14 @@ num-format = "0.4.4" num-traits = "0.2.18" octocrab = "0.42.1" oneshot = "0.1.6" +opentelemetry = { version = "0.30.0", features = ["trace"] } +opentelemetry-http = "0.30.0" +opentelemetry-otlp = { version = "0.30.0", features = [ + "grpc-tonic", + "trace", + "tls-roots", +] } +opentelemetry_sdk = { version = "0.30.0", features = ["trace", "rt-tokio"] } papaya = "0.1.5" pathdiff = "0.2.1" port-selector = "0.1.6" @@ -250,6 +258,7 @@ tonic-web-wasm-client = "0.6.0" tower = "0.4.13" tower-http = "0.6.6" tracing = { version = "0.1.40", features = ["release_max_level_debug"] } +tracing-opentelemetry = "0.31.0" tracing-subscriber = { version = "0.3.18", default-features = false, features = [ "env-filter", ] } diff --git a/docker/Dockerfile b/docker/Dockerfile index b233b01346c2..b479cf3cf5a7 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -25,7 +25,7 @@ ARG binaries= ARG copy=${binaries:+_copy} ARG build_flag=--release ARG build_folder=release -ARG build_features=scylladb,metrics,memory-profiling +ARG build_features=scylladb,metrics,memory-profiling,tempo ARG rustflags="-C force-frame-pointers=yes" FROM rust:1.74-slim-bookworm AS builder diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 09c86cf1e7a4..9e141c6907f8 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3901,6 +3901,7 @@ dependencies = [ "papaya", "prometheus", "serde", + "tracing", ] [[package]] diff --git a/kubernetes/linera-validator/Chart.lock b/kubernetes/linera-validator/Chart.lock index c4d2e00ef72c..f0f67683ccaa 100644 --- a/kubernetes/linera-validator/Chart.lock +++ b/kubernetes/linera-validator/Chart.lock @@ -5,5 +5,8 @@ dependencies: - name: loki-stack repository: https://grafana.github.io/helm-charts version: 2.8.9 -digest: sha256:da3a8808fb9479bdd183b307e70d7855794739cafc1796047bde8febc78d539c -generated: "2023-11-30T17:19:14.544457977Z" +- name: pyroscope + repository: https://grafana.github.io/helm-charts + version: 1.14.2 +digest: sha256:7fe611b57ddb6d72aa31bac87568fdb8e531e988e2ce4067b931d3026332f027 +generated: "2025-09-01T16:56:59.19795-03:00" diff --git a/kubernetes/linera-validator/Chart.yaml b/kubernetes/linera-validator/Chart.yaml index 9ad89e557b26..301d59f669c4 100644 --- a/kubernetes/linera-validator/Chart.yaml +++ b/kubernetes/linera-validator/Chart.yaml @@ -32,3 +32,7 @@ dependencies: - name: loki-stack version: "2.8.9" repository: "https://grafana.github.io/helm-charts" + + - name: pyroscope + version: "1.14.2" + repository: "https://grafana.github.io/helm-charts" diff --git a/kubernetes/linera-validator/charts/pyroscope-1.14.2.tgz b/kubernetes/linera-validator/charts/pyroscope-1.14.2.tgz new file mode 100644 index 000000000000..7d2b8fa12c8e Binary files /dev/null and b/kubernetes/linera-validator/charts/pyroscope-1.14.2.tgz differ diff --git a/kubernetes/linera-validator/grafana-dashboards/profiling/cpu.json b/kubernetes/linera-validator/grafana-dashboards/profiling/cpu.json new file mode 100644 index 000000000000..018ccd0442c2 --- /dev/null +++ b/kubernetes/linera-validator/grafana-dashboards/profiling/cpu.json @@ -0,0 +1,432 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 1, + "targets": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "groupBy": [], + "labelSelector": "{service_name=\"ebpf/default/proxy/\"}", + "profileTypeId": "process_cpu:cpu:nanoseconds:cpu:nanoseconds", + "queryType": "profile", + "refId": "A" + } + ], + "title": "CPU Profile Flamegraph - Proxies", + "type": "flamegraph" + }, + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 12 + }, + "id": 4, + "targets": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "groupBy": [], + "labelSelector": "{service_name=\"ebpf/default/shards/\"}", + "profileTypeId": "process_cpu:cpu:nanoseconds:cpu:nanoseconds", + "queryType": "profile", + "refId": "A" + } + ], + "title": "CPU Profile Flamegraph - Shards", + "type": "flamegraph" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 24 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "rate(pyroscope_distributor_received_samples_count[5m])", + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Profile Ingestion Rate", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 24 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "disableTextWrap": false, + "editorMode": "code", + "expr": "rate(pyroscope_distributor_received_decompressed_bytes_sum[5m])", + "fullMetaSearch": false, + "includeNullMetadata": true, + "legendFormat": "Bytes/sec", + "range": true, + "refId": "A", + "useBackend": false + } + ], + "title": "Profile Data Volume", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "bytes" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 0, + "y": 30 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "pyroscope_distributor_received_compressed_bytes_sum - pyroscope_distributor_received_compressed_bytes_sum offset 1h", + "legendFormat": "Compressed Data (1h)", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "pyroscope_distributor_received_decompressed_bytes_sum - pyroscope_distributor_received_decompressed_bytes_sum offset 1h", + "legendFormat": "Decompressed Data (1h)", + "range": true, + "refId": "B" + } + ], + "title": "Data Compression Efficiency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "yellow", + "value": 1000 + }, + { + "color": "red", + "value": 5000 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 12, + "x": 12, + "y": 30 + }, + "id": 6, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "pluginVersion": "10.1.1", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": "pyroscope_distributor_ingester_clients", + "range": true, + "refId": "A" + } + ], + "title": "Active Ingester Clients", + "type": "stat" + } + ], + "refresh": "30s", + "schemaVersion": 38, + "style": "dark", + "tags": [ + "linera", + "profiling", + "cpu" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "CPU", + "uid": "ee74af8d-0448-45cd-bdbc-f9108b3378b7", + "version": 2, + "weekStart": "" +} diff --git a/kubernetes/linera-validator/grafana-dashboards/profiling/jemalloc-memory.json b/kubernetes/linera-validator/grafana-dashboards/profiling/jemalloc-memory.json new file mode 100644 index 000000000000..c67f579228d0 --- /dev/null +++ b/kubernetes/linera-validator/grafana-dashboards/profiling/jemalloc-memory.json @@ -0,0 +1,146 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "datasource", + "uid": "grafana" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "links": [], + "liveNow": false, + "panels": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "displayMode": "flamegraph" + }, + "targets": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "P02E4190217B50628" + }, + "groupBy": [], + "labelSelector": "{service_name=\"memory/default/shards/\"}", + "profileTypeId": "memory:inuse_space:bytes:space:bytes", + "queryType": "profile", + "refId": "A" + } + ], + "title": "Memory Allocation Flamegraph - Server", + "type": "flamegraph" + }, + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "${datasource}" + }, + "gridPos": { + "h": 12, + "w": 24, + "x": 0, + "y": 12 + }, + "id": 2, + "options": { + "displayMode": "flamegraph" + }, + "targets": [ + { + "datasource": { + "type": "grafana-pyroscope-datasource", + "uid": "${datasource}" + }, + "groupBy": [], + "labelSelector": "{service_name=\"memory/default/proxy/\"}", + "profileTypeId": "memory:inuse_space:bytes:space:bytes", + "queryType": "profile", + "refId": "A" + } + ], + "title": "Memory Allocation Flamegraph - Proxy", + "type": "flamegraph" + } + ], + "refresh": "30s", + "schemaVersion": 38, + "style": "dark", + "tags": [ + "linera", + "memory", + "profiling" + ], + "templating": { + "list": [ + { + "current": { + "selected": false, + "text": "Pyroscope", + "value": "P02E4190217B50628" + }, + "hide": 0, + "includeAll": false, + "label": "Pyroscope Datasource", + "multi": false, + "name": "datasource", + "options": [], + "query": "grafana-pyroscope-datasource", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + }, + { + "current": { + "selected": false, + "text": "Prometheus", + "value": "prometheus" + }, + "hide": 0, + "includeAll": false, + "label": "Prometheus Datasource", + "multi": false, + "name": "prometheus_datasource", + "options": [], + "query": "prometheus", + "queryValue": "", + "refresh": 1, + "regex": "", + "skipUrlSync": false, + "type": "datasource" + } + ] + }, + "time": { + "from": "now-1h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Memory", + "uid": "linera-memory-profiling", + "version": 1, + "weekStart": "" +} diff --git a/kubernetes/linera-validator/helmfile.yaml b/kubernetes/linera-validator/helmfile.yaml index beeb057951eb..83f8fa7620c7 100644 --- a/kubernetes/linera-validator/helmfile.yaml +++ b/kubernetes/linera-validator/helmfile.yaml @@ -154,3 +154,10 @@ releases: set: - name: crds.enabled value: "true" + - name: tempo + version: 1.23.3 + namespace: tempo + chart: grafana/tempo + timeout: 900 + values: + - {{ env "LINERA_HELMFILE_VALUES_LINERA_CORE" | default "values-local.yaml.gotmpl" }} diff --git a/kubernetes/linera-validator/templates/grafana-pyroscope-dashboard-config.yaml b/kubernetes/linera-validator/templates/grafana-pyroscope-dashboard-config.yaml new file mode 100644 index 000000000000..b553a4d713e3 --- /dev/null +++ b/kubernetes/linera-validator/templates/grafana-pyroscope-dashboard-config.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: grafana-pyroscope-dashboard-config + labels: + grafana_dashboard: "1" + annotations: + grafana_folder: "Profiling" +data: + cpu_profiling.json: {{ .Files.Get "grafana-dashboards/profiling/cpu.json" | quote }} + memory_profiling.json: {{ .Files.Get "grafana-dashboards/profiling/jemalloc-memory.json" | quote }} \ No newline at end of file diff --git a/linera-base/Cargo.toml b/linera-base/Cargo.toml index 3293bca140ee..dcdd9cd97088 100644 --- a/linera-base/Cargo.toml +++ b/linera-base/Cargo.toml @@ -18,6 +18,12 @@ workspace = true metrics = ["prometheus"] reqwest = ["dep:reqwest"] revm = [] +tempo = [ + "opentelemetry", + "opentelemetry-otlp", + "opentelemetry_sdk", + "tracing-opentelemetry", +] test = ["test-strategy", "proptest"] web = [ "getrandom/js", @@ -80,6 +86,10 @@ tracing-web = { optional = true, workspace = true } [target.'cfg(not(target_arch = "wasm32"))'.dependencies] chrono.workspace = true +opentelemetry = { workspace = true, optional = true } +opentelemetry-otlp = { workspace = true, optional = true } +opentelemetry_sdk = { workspace = true, optional = true } +tracing-opentelemetry = { workspace = true, optional = true } rand = { workspace = true, features = ["getrandom", "std", "std_rng"] } tokio = { workspace = true, features = [ "process", diff --git a/linera-base/src/tracing.rs b/linera-base/src/tracing.rs index 25358caa7986..09811bb0f8eb 100644 --- a/linera-base/src/tracing.rs +++ b/linera-base/src/tracing.rs @@ -12,6 +12,8 @@ use std::{ use is_terminal::IsTerminal as _; use tracing::Subscriber; +#[cfg(all(not(target_arch = "wasm32"), feature = "tempo"))] +use tracing_subscriber::filter::{filter_fn, FilterExt as _}; use tracing_subscriber::{ fmt::{ self, @@ -23,6 +25,16 @@ use tracing_subscriber::{ registry::LookupSpan, util::SubscriberInitExt, }; +#[cfg(all(not(target_arch = "wasm32"), feature = "tempo"))] +use { + opentelemetry::{global, trace::TracerProvider}, + opentelemetry_otlp::{SpanExporter, WithExportConfig}, + opentelemetry_sdk::{ + trace::{self as sdktrace, SdkTracerProvider}, + Resource, + }, + tracing_opentelemetry::OpenTelemetryLayer, +}; /// Initializes tracing in a standard way. /// @@ -34,6 +46,64 @@ use tracing_subscriber::{ /// store log files. If it is set, a file named `log_name` with the `log` extension is /// created in the directory. pub fn init(log_name: &str) { + init_internal(log_name, false); +} + +/// Initializes tracing with full OpenTelemetry support. +/// +/// **IMPORTANT**: This function must be called from within a Tokio runtime context +/// as it initializes OpenTelemetry background tasks for span batching and export. +/// +/// This sets up complete tracing with OpenTelemetry integration, including the +/// OpenTelemetry layer in the subscriber to export spans to Tempo. +/// +/// ## Span Filtering for Performance +/// +/// By default, spans created by `#[instrument]` are logged to console AND sent +/// to OpenTelemetry. In order to not spam stderr, you can set a low level, and use the +/// `telemetry_only` target: +/// +/// ```rust +/// use tracing::{instrument, Level}; +/// +/// // Always sent to telemetry; console output controlled by level +/// #[instrument(level = "trace", target = "telemetry_only")] +/// fn my_called_too_frequently_function() { +/// // Will be sent to OpenTelemetry regardless of RUST_LOG level +/// // Will only appear in console if RUST_LOG includes trace level +/// } +/// +/// // Higher level - more likely to appear in console +/// #[instrument(level = "info", target = "telemetry_only")] +/// fn my_important_function() { +/// // Will be sent to OpenTelemetry regardless of RUST_LOG level +/// // Will appear in console if RUST_LOG includes info level or higher +/// } +/// ``` +/// +/// **Key behaviors:** +/// - If span level >= RUST_LOG level: span goes to BOTH telemetry AND console (regardless of target) +/// - If span level < RUST_LOG level AND target = "telemetry_only": span goes to telemetry ONLY +/// - If span level < RUST_LOG level AND target != "telemetry_only": span is filtered out completely +/// - Default level for `telemetry_only` should be `trace` for minimal console noise +/// - All explicit log calls (tracing::info!(), etc.) are always printed regardless of span filtering +pub async fn init_with_opentelemetry(log_name: &str) { + #[cfg(feature = "tempo")] + { + init_internal(log_name, true); + } + + #[cfg(not(feature = "tempo"))] + { + tracing::warn!( + "OpenTelemetry initialization requested but 'tempo' feature is not enabled. \ + Initializing standard tracing without OpenTelemetry support." + ); + init_internal(log_name, false); + } +} + +fn init_internal(log_name: &str, with_opentelemetry: bool) { let env_filter = tracing_subscriber::EnvFilter::builder() .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) .from_env_lossy(); @@ -43,7 +113,6 @@ pub fn init(log_name: &str) { .map_or(FmtSpan::NONE, |s| fmt_span_from_str(&s)); let format = std::env::var("RUST_LOG_FORMAT").ok(); - let color_output = !std::env::var("NO_COLOR").is_ok_and(|x| !x.is_empty()) && std::io::stderr().is_terminal(); @@ -65,11 +134,70 @@ pub fn init(log_name: &str) { ) }); - tracing_subscriber::registry() - .with(env_filter) - .with(maybe_log_file_layer) - .with(stderr_layer) - .init(); + #[cfg(any(target_arch = "wasm32", not(feature = "tempo")))] + { + let _ = with_opentelemetry; + tracing_subscriber::registry() + .with(env_filter) + .with(maybe_log_file_layer) + .with(stderr_layer) + .init(); + } + + #[cfg(all(not(target_arch = "wasm32"), feature = "tempo"))] + { + if with_opentelemetry { + // Initialize OpenTelemetry within async context + let otlp_endpoint = std::env::var("OTEL_EXPORTER_OTLP_ENDPOINT") + .unwrap_or_else(|_| "http://tempo.tempo.svc.cluster.local:4317".to_string()); + + let exporter = SpanExporter::builder() + .with_tonic() + .with_endpoint(otlp_endpoint) + .build() + .expect("Failed to create OTLP exporter"); + + let resource = Resource::builder() + .with_service_name(log_name.to_string()) + .build(); + + let tracer_provider = SdkTracerProvider::builder() + .with_resource(resource) + .with_batch_exporter(exporter) + .with_sampler(sdktrace::Sampler::AlwaysOn) + .build(); + + // Set the global tracer provider + global::set_tracer_provider(tracer_provider.clone()); + + let tracer = tracer_provider.tracer("linera"); + + let telemetry_only_filter = + filter_fn(|metadata| metadata.is_span() && metadata.target() == "telemetry_only"); + + let otel_env_filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + .from_env_lossy(); + + let opentelemetry_filter = otel_env_filter.or(telemetry_only_filter); + + let opentelemetry_layer = + OpenTelemetryLayer::new(tracer).with_filter(opentelemetry_filter); + + tracing_subscriber::registry() + .with(env_filter) + .with(maybe_log_file_layer) + .with(stderr_layer) + .with(opentelemetry_layer) + .init(); + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(maybe_log_file_layer) + .with(stderr_layer) + .init(); + } + } } /// Opens a log file for writing. diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index a644e13cb73a..ca56d67924a3 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -35,6 +35,7 @@ use linera_views::{ views::{ClonableView, CryptoHashView, RootView, View}, }; use serde::{Deserialize, Serialize}; +use tracing::instrument; use crate::{ block::{Block, ConfirmedBlock}, @@ -388,6 +389,9 @@ where self.context().extra().chain_id() } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + ))] pub async fn query_application( &mut self, local_time: Timestamp, @@ -405,6 +409,10 @@ where .with_execution_context(ChainExecutionContext::Query) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + application_id = %application_id + ))] pub async fn describe_application( &mut self, application_id: ApplicationId, @@ -416,6 +424,11 @@ where .with_execution_context(ChainExecutionContext::DescribeApplication) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + target = %target, + height = %height + ))] pub async fn mark_messages_as_received( &mut self, target: &ChainId, @@ -503,6 +516,9 @@ where /// Verifies that this chain is up-to-date and all the messages executed ahead of time /// have been properly received by now. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> { let chain_id = self.chain_id(); let pairs = self.inboxes.try_load_all_entries().await?; @@ -564,6 +580,11 @@ where /// round timeouts. /// /// Returns `true` if incoming `Subscribe` messages created new outbox entries. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + origin = %origin, + bundle_height = %bundle.height + ))] pub async fn receive_message_bundle( &mut self, origin: &ChainId, @@ -658,6 +679,9 @@ where } /// Removes the incoming message bundles in the block from the inboxes. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + ))] pub async fn remove_bundles_from_inboxes( &mut self, timestamp: Timestamp, @@ -747,6 +771,10 @@ where /// Executes a block: first the incoming messages, then the main operation. /// Does not update chain state other than the execution state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %block.chain_id, + block_height = %block.height + ))] #[expect(clippy::too_many_arguments)] async fn execute_block_inner( chain: &mut ExecutionStateView, @@ -856,6 +884,10 @@ where /// Executes a block: first the incoming messages, then the main operation. /// Does not update chain state other than the execution state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] pub async fn execute_block( &mut self, block: &ProposedBlock, @@ -915,6 +947,11 @@ where /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain /// manager. This does not touch the execution state itself, which must be updated separately. + /// Returns the set of event streams that were updated as a result of applying the block. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.inner().inner().header.height + ))] pub async fn apply_confirmed_block( &mut self, block: &ConfirmedBlock, @@ -947,6 +984,10 @@ where } /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.inner().inner().header.height + ))] pub async fn preprocess_block(&mut self, block: &ConfirmedBlock) -> Result<(), ChainError> { let hash = block.inner().hash(); let block = block.inner().inner(); @@ -969,6 +1010,10 @@ where } /// Verifies that the block is valid according to the chain's application permission settings. + #[instrument(target = "telemetry_only", skip_all, fields( + block_height = %block.height, + num_transactions = %block.transactions.len() + ))] fn check_app_permissions( app_permissions: &ApplicationPermissions, block: &ProposedBlock, @@ -1011,6 +1056,10 @@ where } /// Returns the hashes of all blocks we have in the given range. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + next_block_height = %self.tip_state.get().next_block_height + ))] pub async fn block_hashes( &self, range: impl RangeBounds, @@ -1056,6 +1105,10 @@ where /// Updates the outboxes with the messages sent in the block. /// /// Returns the set of all recipients. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.header.height + ))] async fn process_outgoing_messages( &mut self, block: &Block, diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index 83c196954021..0170aabe6abb 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -244,7 +244,7 @@ where /// Runs the worker until there are no more incoming requests. #[instrument( skip_all, - fields(chain_id = format!("{:.8}", self.chain_id)), + fields(chain_id = format!("{:.8}", self.chain_id), long_lived_services = %self.config.long_lived_services), )] async fn handle_requests( self, @@ -276,9 +276,12 @@ where self.chain_id, service_runtime_endpoint, ) + .instrument(span.clone()) .await?; - Box::pin(worker.handle_request(request).instrument(span)).await; + Box::pin(worker.handle_request(request)) + .instrument(span) + .await; loop { futures::select! { @@ -287,7 +290,7 @@ where let Some((request, span)) = maybe_request else { break; // Request sender was dropped. }; - Box::pin(worker.handle_request(request).instrument(span)).await; + Box::pin(worker.handle_request(request)).instrument(span).await; } } } diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index 0d0ba688011f..36a18ade8fd7 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -70,6 +70,9 @@ where StorageClient: Storage + Clone + Send + Sync + 'static, { /// Creates a new [`ChainWorkerState`] using the provided `storage` client. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %chain_id + ))] #[expect(clippy::too_many_arguments)] pub async fn load( config: ChainWorkerConfig, @@ -103,7 +106,7 @@ where } /// Handles a request and applies it to the chain state. - #[instrument(skip_all)] + #[instrument(skip_all, fields(chain_id = %self.chain_id()))] pub async fn handle_request(&mut self, request: ChainWorkerRequest) { tracing::trace!("Handling chain worker request: {request:?}"); // TODO(#2237): Spawn concurrent tasks for read-only operations @@ -264,6 +267,10 @@ where } /// Returns the requested blob, if it belongs to the current locking block or pending proposal. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + blob_id = %blob_id + ))] pub(super) async fn download_pending_blob(&self, blob_id: BlobId) -> Result { if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { return Ok(blob); @@ -274,6 +281,9 @@ where /// Reads the blobs from the chain manager or from storage. Returns an error if any are /// missing. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn get_required_blobs( &self, required_blob_ids: impl IntoIterator, @@ -294,6 +304,9 @@ where } /// Tries to read the blobs from the chain manager or storage. Returns `None` if not found. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn maybe_get_required_blobs( &self, blob_ids: impl IntoIterator, @@ -362,6 +375,9 @@ where } /// Loads pending cross-chain requests, and adds `NewRound` notifications where appropriate. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn create_network_actions( &self, old_round: Option, @@ -399,6 +415,10 @@ where }) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + num_recipients = %heights_by_recipient.len() + ))] async fn create_cross_chain_requests( &self, heights_by_recipient: BTreeMap>, @@ -470,6 +490,10 @@ where /// Returns true if there are no more outgoing messages in flight up to the given /// block height. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height + ))] pub async fn all_messages_to_tracked_chains_delivered_up_to( &self, height: BlockHeight, @@ -496,6 +520,10 @@ where } /// Processes a leader timeout issued for this multi-owner chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %certificate.inner().height() + ))] pub(super) async fn process_timeout( &mut self, certificate: TimeoutCertificate, @@ -534,6 +562,10 @@ where /// /// If they cannot be found, it creates an entry in `pending_proposed_blobs` so they can be /// submitted one by one. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %proposal.content.block.height + ))] pub(super) async fn load_proposal_blobs( &mut self, proposal: &BlockProposal, @@ -578,6 +610,10 @@ where } /// Processes a validated block issued for this multi-owner chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %certificate.block().header.height + ))] pub(super) async fn process_validated_block( &mut self, certificate: ValidatedBlockCertificate, @@ -635,6 +671,11 @@ where } /// Processes a confirmed block (aka a commit). + #[instrument(skip_all, fields( + chain_id = %certificate.block().header.chain_id, + height = %certificate.block().header.height, + block_hash = %certificate.hash(), + ))] pub(super) async fn process_confirmed_block( &mut self, certificate: ConfirmedBlockCertificate, @@ -802,6 +843,7 @@ where computed: Box::new(verified_outcome), } ); + // Update the rest of the chain state. chain .apply_confirmed_block(certificate.value(), local_time) @@ -866,7 +908,7 @@ where } /// Updates the chain's inboxes, receiving messages from a cross-chain update. - #[instrument(level = "trace", skip(self, bundles))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, bundles))] pub(super) async fn process_cross_chain_update( &mut self, origin: ChainId, @@ -915,6 +957,11 @@ where } /// Handles the cross-chain request confirming that the recipient was updated. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + recipient = %recipient, + latest_height = %latest_height + ))] pub(super) async fn confirm_updated_recipient( &mut self, recipient: ChainId, @@ -937,6 +984,10 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + num_trackers = %new_trackers.len() + ))] pub async fn update_received_certificate_trackers( &mut self, new_trackers: BTreeMap, @@ -948,6 +999,11 @@ where } /// Attempts to vote for a leader timeout, if possible. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height, + round = %round + ))] pub(super) async fn vote_for_leader_timeout( &mut self, height: BlockHeight, @@ -975,6 +1031,9 @@ where } /// Votes for falling back to a public chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] pub(super) async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { let chain = &mut self.chain; if let (epoch, Some(entry)) = ( @@ -997,6 +1056,10 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + blob_id = %blob.id() + ))] pub(super) async fn handle_pending_blob( &mut self, blob: Blob, @@ -1033,6 +1096,10 @@ where /// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`]. #[cfg(with_testing)] + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height + ))] pub(super) async fn read_certificate( &mut self, height: BlockHeight, @@ -1051,6 +1118,10 @@ where } /// Queries an application's state on the chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + query_application_id = %query.application_id() + ))] pub(super) async fn query_application( &mut self, query: Query, @@ -1065,6 +1136,10 @@ where } /// Returns an application's description. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + application_id = %application_id + ))] pub(super) async fn describe_application( &mut self, application_id: ApplicationId, @@ -1075,6 +1150,10 @@ where } /// Executes a block without persisting any changes to the state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] pub(super) async fn stage_block_execution( &mut self, block: ProposedBlock, @@ -1107,6 +1186,10 @@ where } /// Validates and executes a block proposed to extend this chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %proposal.content.block.height + ))] pub(super) async fn handle_block_proposal( &mut self, proposal: BlockProposal, @@ -1250,6 +1333,9 @@ where } /// Prepares a [`ChainInfoResponse`] for a [`ChainInfoQuery`]. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] pub(super) async fn prepare_chain_info_response( &mut self, query: ChainInfoQuery, @@ -1315,6 +1401,10 @@ where } /// Executes a block, caches the result, and returns the outcome. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] async fn execute_block( &mut self, block: &ProposedBlock, @@ -1356,6 +1446,9 @@ where /// Stores the chain state in persistent storage. /// /// Waits until the [`ChainStateView`] is no longer shared before persisting the changes. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn save(&mut self) -> Result<(), WorkerError> { self.clear_shared_chain_view().await; self.chain.save().await?; diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 2bf504e81723..5e5b66d868c9 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -457,7 +457,7 @@ where .ok_or(WorkerError::InvalidLiteCertificate)?, )) } - _ => return Err(WorkerError::InvalidLiteCertificate), + _ => Err(WorkerError::InvalidLiteCertificate), } } } @@ -550,7 +550,11 @@ where } /// Executes a [`Query`] for an application's state on a specific chain. - #[instrument(level = "trace", skip(self, chain_id, query))] + #[instrument( + level = "trace", + target = "telemetry_only", + skip(self, chain_id, query) + )] pub async fn query_application( &self, chain_id: ChainId, @@ -562,7 +566,11 @@ where .await } - #[instrument(level = "trace", skip(self, chain_id, application_id))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, chain_id, application_id), fields( + nickname = %self.nickname, + chain_id = %chain_id, + application_id = %application_id + ))] pub async fn describe_application( &self, chain_id: ChainId, @@ -580,7 +588,13 @@ where /// Processes a confirmed block (aka a commit). #[instrument( level = "trace", - skip(self, certificate, notify_when_messages_are_delivered) + target = "telemetry_only", + skip(self, certificate, notify_when_messages_are_delivered), + fields( + nickname = %self.nickname, + chain_id = %certificate.block().header.chain_id, + block_height = %certificate.block().header.height + ) )] async fn process_confirmed_block( &self, @@ -599,7 +613,11 @@ where } /// Processes a validated block issued from a multi-owner chain. - #[instrument(level = "trace", skip(self, certificate))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, certificate), fields( + nickname = %self.nickname, + chain_id = %certificate.block().header.chain_id, + block_height = %certificate.block().header.height + ))] async fn process_validated_block( &self, certificate: ValidatedBlockCertificate, @@ -615,7 +633,11 @@ where } /// Processes a leader timeout issued from a multi-owner chain. - #[instrument(level = "trace", skip(self, certificate))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, certificate), fields( + nickname = %self.nickname, + chain_id = %certificate.value().chain_id(), + height = %certificate.value().height() + ))] async fn process_timeout( &self, certificate: TimeoutCertificate, @@ -630,7 +652,12 @@ where .await } - #[instrument(level = "trace", skip(self, origin, recipient, bundles))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, origin, recipient, bundles), fields( + nickname = %self.nickname, + origin = %origin, + recipient = %recipient, + num_bundles = %bundles.len() + ))] async fn process_cross_chain_update( &self, origin: ChainId, @@ -648,7 +675,11 @@ where } /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block. - #[instrument(level = "trace", skip(self, chain_id, height))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, chain_id, height), fields( + nickname = %self.nickname, + chain_id = %chain_id, + height = %height + ))] #[cfg(with_testing)] pub async fn read_certificate( &self, @@ -666,7 +697,10 @@ where /// /// The returned view holds a lock on the chain state, which prevents the worker from changing /// the state of that chain. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] pub async fn chain_state_view( &self, chain_id: ChainId, @@ -677,7 +711,10 @@ where .await } - #[instrument(level = "trace", skip(self, request_builder))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. async fn query_chain_worker( &self, @@ -700,7 +737,10 @@ where /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it /// to the cache if needed. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] async fn get_chain_worker_endpoint( &self, chain_id: ChainId, @@ -749,7 +789,10 @@ where /// and add it to the cache if needed. /// /// Returns [`None`] if the cache is full and no candidate for eviction was found. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] #[expect(clippy::type_complexity)] fn try_get_chain_worker_endpoint( &self, @@ -1062,6 +1105,11 @@ where } /// Updates the received certificate trackers to at least the given values. + #[instrument(target = "telemetry_only", skip_all, fields( + nickname = %self.nickname, + chain_id = %chain_id, + num_trackers = %new_trackers.len() + ))] pub async fn update_received_certificate_trackers( &self, chain_id: ChainId, diff --git a/linera-service/src/proxy/grpc.rs b/linera-service/src/proxy/grpc.rs index 9663e5c811ef..03aca53202b9 100644 --- a/linera-service/src/proxy/grpc.rs +++ b/linera-service/src/proxy/grpc.rs @@ -302,6 +302,7 @@ where } #[allow(clippy::result_large_err)] + #[instrument(target = "telemetry_only", skip_all, fields(remote_addr = ?request.remote_addr(), chain_id = ?request.get_ref().chain_id()))] fn worker_client( &self, request: Request, @@ -373,7 +374,12 @@ where { type SubscribeStream = UnboundedReceiverStream>; - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_block_proposal") + )] async fn handle_block_proposal( &self, request: Request, @@ -385,7 +391,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_lite_certificate") + )] async fn handle_lite_certificate( &self, request: Request, @@ -397,7 +408,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_confirmed_certificate") + )] async fn handle_confirmed_certificate( &self, request: Request, @@ -409,7 +425,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_validated_certificate") + )] async fn handle_validated_certificate( &self, request: Request, @@ -421,7 +442,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_timeout_certificate") + )] async fn handle_timeout_certificate( &self, request: Request, @@ -433,7 +459,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_chain_info_query") + )] async fn handle_chain_info_query( &self, request: Request, @@ -445,7 +476,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "subscribe") + )] async fn subscribe( &self, request: Request, @@ -474,7 +510,12 @@ where Ok(Response::new(linera_version::VersionInfo::default().into())) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "get_network_description") + )] async fn get_network_description( &self, _request: Request<()>, @@ -489,7 +530,12 @@ where Ok(Response::new(description.into())) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "upload_blob") + )] async fn upload_blob(&self, request: Request) -> Result, Status> { let content: linera_sdk::linera_base_types::BlobContent = request.into_inner().try_into()?; @@ -502,7 +548,12 @@ where Ok(Response::new(id.try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_blob") + )] async fn download_blob( &self, request: Request, @@ -518,7 +569,12 @@ where Ok(Response::new(blob.into_content().try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_pending_blob") + )] async fn download_pending_blob( &self, request: Request, @@ -543,7 +599,12 @@ where } } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_pending_blob") + )] async fn handle_pending_blob( &self, request: Request, @@ -568,7 +629,12 @@ where } } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificate") + )] async fn download_certificate( &self, request: Request, @@ -585,7 +651,12 @@ where Ok(Response::new(certificate.try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificates") + )] async fn download_certificates( &self, request: Request, @@ -631,7 +702,12 @@ where )?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificates_by_heights") + )] async fn download_certificates_by_heights( &self, request: Request, @@ -746,7 +822,9 @@ where })) } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN), fields( + method = "blob_last_used_by" + ))] async fn blob_last_used_by( &self, request: Request, @@ -766,7 +844,7 @@ where Ok(Response::new(last_used_by.into())) } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN))] async fn missing_blob_ids( &self, request: Request, @@ -781,7 +859,9 @@ where Ok(Response::new(missing_blob_ids.try_into()?)) } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN), fields( + method = "blob_last_used_by_certificate" + ))] async fn blob_last_used_by_certificate( &self, request: Request, @@ -797,7 +877,12 @@ impl NotifierService for GrpcProxy where S: Storage + Clone + Send + Sync + 'static, { - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "notify") + )] async fn notify(&self, request: Request) -> Result, Status> { let notification = request.into_inner(); let chain_id = notification diff --git a/linera-service/src/proxy/main.rs b/linera-service/src/proxy/main.rs index 825bfebcd956..0c0a54148068 100644 --- a/linera-service/src/proxy/main.rs +++ b/linera-service/src/proxy/main.rs @@ -459,11 +459,6 @@ where fn main() -> Result<()> { let options = ::parse(); - let server_config: ValidatorServerConfig = - util::read_json(&options.config_path).expect("Fail to read server config"); - let public_key = &server_config.validator.public_key; - - linera_base::tracing::init(&format!("validator-{public_key}-proxy")); let mut runtime = if options.tokio_threads == Some(1) { tokio::runtime::Builder::new_current_thread() @@ -486,6 +481,12 @@ fn main() -> Result<()> { impl ProxyOptions { async fn run(&self) -> Result<()> { + let server_config: ValidatorServerConfig = + util::read_json(&self.config_path).expect("Fail to read server config"); + let public_key = &server_config.validator.public_key; + linera_base::tracing::init_with_opentelemetry(&format!("validator-{public_key}-proxy")) + .await; + let store_config = self .storage_config .add_common_storage_options(&self.common_storage_options)?; diff --git a/linera-service/src/server.rs b/linera-service/src/server.rs index fbccf7683aa5..f7641883bd42 100644 --- a/linera-service/src/server.rs +++ b/linera-service/src/server.rs @@ -432,8 +432,6 @@ enum ServerCommand { fn main() { let options = ::parse(); - linera_base::tracing::init(&log_file_name_for(&options.command)); - let mut runtime = if options.tokio_threads == Some(1) { tokio::runtime::Builder::new_current_thread() } else { @@ -481,6 +479,8 @@ fn log_file_name_for(command: &ServerCommand) -> Cow<'static, str> { } async fn run(options: ServerOptions) { + linera_base::tracing::init_with_opentelemetry(&log_file_name_for(&options.command)).await; + match options.command { ServerCommand::Run { server_config_path, diff --git a/linera-storage/Cargo.toml b/linera-storage/Cargo.toml index 04ff5dd63209..02c34f4ac613 100644 --- a/linera-storage/Cargo.toml +++ b/linera-storage/Cargo.toml @@ -45,6 +45,7 @@ linera-views.workspace = true papaya.workspace = true prometheus.workspace = true serde.workspace = true +tracing.workspace = true [dev-dependencies] anyhow.workspace = true diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index e34562657c44..5c0267556eb2 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -28,6 +28,7 @@ use linera_views::{ ViewError, }; use serde::{Deserialize, Serialize}; +use tracing::instrument; #[cfg(with_testing)] use { futures::channel::oneshot::{self, Receiver}, @@ -544,6 +545,7 @@ where &self.clock } + #[instrument(level = "trace", target = "telemetry_only", skip_all, fields(chain_id = %chain_id))] async fn load_chain( &self, chain_id: ChainId, @@ -563,6 +565,7 @@ where ChainStateView::load(context).await } + #[instrument(level = "trace", target = "telemetry_only", skip_all, fields(blob_id = %blob_id))] async fn contains_blob(&self, blob_id: BlobId) -> Result { let store = self.database.open_shared(&[])?; let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?; @@ -572,6 +575,7 @@ where Ok(test) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_count = blob_ids.len()))] async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result, ViewError> { let store = self.database.open_shared(&[])?; let mut keys = Vec::new(); @@ -602,6 +606,7 @@ where Ok(test) } + #[instrument(target = "telemetry_only", skip_all, fields(hash = %hash))] async fn read_confirmed_block( &self, hash: CryptoHash, @@ -616,6 +621,7 @@ where Ok(value) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_id = %blob_id))] async fn read_blob(&self, blob_id: BlobId) -> Result, ViewError> { let store = self.database.open_shared(&[])?; let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?; @@ -682,6 +688,7 @@ where Ok(blob_states) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_id = %blob.id()))] async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> { let mut batch = Batch::new(); batch.add_blob(blob)?; @@ -996,6 +1003,7 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields(batch_size = batch.key_value_bytes.len()))] async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> { if batch.key_value_bytes.is_empty() { return Ok(()); diff --git a/linera-views/src/backends/rocks_db.rs b/linera-views/src/backends/rocks_db.rs index ce9ca3172549..1d3dfcd18c94 100644 --- a/linera-views/src/backends/rocks_db.rs +++ b/linera-views/src/backends/rocks_db.rs @@ -14,7 +14,7 @@ use std::{ }; use linera_base::ensure; -use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle}; +use rocksdb::{BlockBasedOptions, Cache, DBCompactionStyle, SliceTransform}; use serde::{Deserialize, Serialize}; use sysinfo::{CpuRefreshKind, MemoryRefreshKind, RefreshKind, System}; use tempfile::TempDir; @@ -166,25 +166,38 @@ impl RocksDbStoreExecutor { Ok(entries.into_iter().collect::>()?) } + fn get_find_prefix_iterator(&self, prefix: &[u8]) -> rocksdb::DBRawIteratorWithThreadMode { + // Configure ReadOptions optimized for SSDs and iterator performance + let mut read_opts = rocksdb::ReadOptions::default(); + // Enable async I/O for better concurrency + read_opts.set_async_io(true); + + // Set precise upper bound to minimize key traversal + let upper_bound = get_upper_bound_option(prefix); + if let Some(upper_bound) = upper_bound { + read_opts.set_iterate_upper_bound(upper_bound); + } + + let mut iter = self.db.raw_iterator_opt(read_opts); + iter.seek(prefix); + iter + } + fn find_keys_by_prefix_internal( &self, key_prefix: Vec, ) -> Result>, RocksDbStoreInternalError> { check_key_size(&key_prefix)?; + let mut prefix = self.start_key.clone(); prefix.extend(key_prefix); let len = prefix.len(); - let mut iter = self.db.raw_iterator(); + + let mut iter = self.get_find_prefix_iterator(&prefix); let mut keys = Vec::new(); - iter.seek(&prefix); - let mut next_key = iter.key(); - while let Some(key) = next_key { - if !key.starts_with(&prefix) { - break; - } + while let Some(key) = iter.key() { keys.push(key[len..].to_vec()); iter.next(); - next_key = iter.key(); } Ok(keys) } @@ -198,20 +211,13 @@ impl RocksDbStoreExecutor { let mut prefix = self.start_key.clone(); prefix.extend(key_prefix); let len = prefix.len(); - let mut iter = self.db.raw_iterator(); + + let mut iter = self.get_find_prefix_iterator(&prefix); let mut key_values = Vec::new(); - iter.seek(&prefix); - let mut next_key = iter.key(); - while let Some(key) = next_key { - if !key.starts_with(&prefix) { - break; - } - if let Some(value) = iter.value() { - let key_value = (key[len..].to_vec(), value.to_vec()); - key_values.push(key_value); - } + while let Some((key, value)) = iter.item() { + let key_value = (key[len..].to_vec(), value.to_vec()); + key_values.push(key_value); iter.next(); - next_key = iter.key(); } Ok(key_values) } @@ -373,8 +379,32 @@ impl RocksDbStoreInternal { total_ram / 4, HYPER_CLOCK_CACHE_BLOCK_SIZE, )); + + // Configure bloom filters for prefix iteration optimization + block_options.set_bloom_filter(10.0, false); + block_options.set_whole_key_filtering(false); + + // 32KB blocks instead of default 4KB - reduces iterator seeks + block_options.set_block_size(32 * 1024); + // Use latest format for better compression and performance + block_options.set_format_version(5); + options.set_block_based_table_factory(&block_options); + // Configure prefix extraction for bloom filter optimization + // Use 8 bytes: ROOT_KEY_DOMAIN (1 byte) + BCS variant (1-2 bytes) + identifier start (4-5 bytes) + let prefix_extractor = SliceTransform::create_fixed_prefix(8); + options.set_prefix_extractor(prefix_extractor); + + // 12.5% of memtable size for bloom filter + options.set_memtable_prefix_bloom_ratio(0.125); + // Skip bloom filter for memtable when key exists + options.set_optimize_filters_for_hits(true); + // Use memory-mapped files for faster reads + options.set_allow_mmap_reads(true); + // Don't use random access pattern since we do prefix scans + options.set_advise_random_on_open(false); + let db = DB::open(&options, path_buf)?; let executor = RocksDbStoreExecutor { db: Arc::new(db),