Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

63 changes: 36 additions & 27 deletions linera-base/src/tracing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ use std::{

use is_terminal::IsTerminal as _;
use tracing::Subscriber;
#[cfg(all(not(target_arch = "wasm32"), feature = "tempo"))]
use tracing_subscriber::filter::{filter_fn, FilterExt as _};
use tracing_subscriber::{
filter::filter_fn,
fmt::{
self,
format::{FmtSpan, Format, Full},
Expand Down Expand Up @@ -59,20 +60,33 @@ pub fn init(log_name: &str) {
/// ## Span Filtering for Performance
///
/// By default, spans created by `#[instrument]` are logged to console AND sent
/// to OpenTelemetry. To disable console output for performance-critical functions
/// while keeping OpenTelemetry tracing, use:
/// to OpenTelemetry. In order to not spam stderr, you can set a low level, and use the
/// `telemetry_only` target:
///
/// ```rust
/// use tracing::instrument;
/// use tracing::{instrument, Level};
///
/// // Always sent to telemetry; console output controlled by level
/// #[instrument(level = "trace", target = "telemetry_only")]
/// fn my_called_too_frequently_function() {
/// // Will be sent to OpenTelemetry regardless of RUST_LOG level
/// // Will only appear in console if RUST_LOG includes trace level
/// }
///
/// #[instrument(target = "telemetry_only")]
/// fn my_performance_critical_function() {
/// // This span will ONLY be sent to OpenTelemetry, not logged to console
/// // Higher level - more likely to appear in console
/// #[instrument(level = "info", target = "telemetry_only")]
/// fn my_important_function() {
/// // Will be sent to OpenTelemetry regardless of RUST_LOG level
/// // Will appear in console if RUST_LOG includes info level or higher
/// }
/// ```
///
/// All explicit log calls (tracing::info!(), etc.) are always printed regardless
/// of span filtering.
/// **Key behaviors:**
/// - If span level >= RUST_LOG level: span goes to BOTH telemetry AND console (regardless of target)
/// - If span level < RUST_LOG level AND target = "telemetry_only": span goes to telemetry ONLY
/// - If span level < RUST_LOG level AND target != "telemetry_only": span is filtered out completely
/// - Default level for `telemetry_only` should be `trace` for minimal console noise
/// - All explicit log calls (tracing::info!(), etc.) are always printed regardless of span filtering
pub async fn init_with_opentelemetry(log_name: &str) {
#[cfg(feature = "tempo")]
{
Expand Down Expand Up @@ -102,28 +116,13 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) {
let color_output =
!std::env::var("NO_COLOR").is_ok_and(|x| !x.is_empty()) && std::io::stderr().is_terminal();

// Create a filter that:
// 1. Allows all explicit events (tracing::info!(), etc.)
// 2. Allows spans from #[instrument] by default
// 3. Blocks spans ONLY if they have target = "telemetry_only"
let console_filter = filter_fn(|metadata| {
if metadata.is_span() {
// Block spans that explicitly request telemetry-only via target = "telemetry_only"
metadata.target() != "telemetry_only"
} else {
// Always allow explicit log events (tracing::info!(), etc.)
true
}
});

let stderr_layer = prepare_formatted_layer(
format.as_deref(),
fmt::layer()
.with_span_events(span_events.clone())
.with_writer(std::io::stderr)
.with_ansi(color_output),
)
.with_filter(console_filter.clone());
);

let maybe_log_file_layer = open_log_file(log_name).map(|file_writer| {
prepare_formatted_layer(
Expand All @@ -133,7 +132,6 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) {
.with_writer(Arc::new(file_writer))
.with_ansi(false),
)
.with_filter(console_filter.clone())
});

#[cfg(any(target_arch = "wasm32", not(feature = "tempo")))]
Expand Down Expand Up @@ -173,7 +171,18 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) {
global::set_tracer_provider(tracer_provider.clone());

let tracer = tracer_provider.tracer("linera");
let opentelemetry_layer = OpenTelemetryLayer::new(tracer);

let telemetry_only_filter =
filter_fn(|metadata| metadata.is_span() && metadata.target() == "telemetry_only");

let otel_env_filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy();

let opentelemetry_filter = otel_env_filter.or(telemetry_only_filter);

let opentelemetry_layer =
OpenTelemetryLayer::new(tracer).with_filter(opentelemetry_filter);

tracing_subscriber::registry()
.with(env_filter)
Expand Down
56 changes: 56 additions & 0 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use linera_views::{
views::{ClonableView, CryptoHashView, RootView, View},
};
use serde::{Deserialize, Serialize};
use tracing::instrument;

use crate::{
block::{Block, ConfirmedBlock},
Expand Down Expand Up @@ -391,6 +392,9 @@ where
self.context().extra().chain_id()
}

#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
))]
pub async fn query_application(
&mut self,
local_time: Timestamp,
Expand All @@ -408,6 +412,10 @@ where
.with_execution_context(ChainExecutionContext::Query)
}

#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
application_id = %application_id
))]
pub async fn describe_application(
&mut self,
application_id: ApplicationId,
Expand All @@ -419,6 +427,11 @@ where
.with_execution_context(ChainExecutionContext::DescribeApplication)
}

#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
target = %target,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should really decide how we label things. I've seen things like nickname , target, etc. and the are really NOT helping.

height = %height
))]
pub async fn mark_messages_as_received(
&mut self,
target: &ChainId,
Expand Down Expand Up @@ -506,6 +519,9 @@ where

/// Verifies that this chain is up-to-date and all the messages executed ahead of time
/// have been properly received by now.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id()
))]
pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> {
let chain_id = self.chain_id();
let pairs = self.inboxes.try_load_all_entries().await?;
Expand Down Expand Up @@ -567,6 +583,11 @@ where
/// round timeouts.
///
/// Returns `true` if incoming `Subscribe` messages created new outbox entries.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
origin = %origin,
bundle_height = %bundle.height
))]
pub async fn receive_message_bundle(
&mut self,
origin: &ChainId,
Expand Down Expand Up @@ -661,6 +682,9 @@ where
}

/// Removes the incoming message bundles in the block from the inboxes.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
))]
pub async fn remove_bundles_from_inboxes(
&mut self,
timestamp: Timestamp,
Expand Down Expand Up @@ -750,6 +774,10 @@ where

/// Executes a block: first the incoming messages, then the main operation.
/// Does not update chain state other than the execution state.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %block.chain_id,
block_height = %block.height
))]
#[expect(clippy::too_many_arguments)]
async fn execute_block_inner(
chain: &mut ExecutionStateView<C>,
Expand Down Expand Up @@ -859,6 +887,10 @@ where

/// Executes a block: first the incoming messages, then the main operation.
/// Does not update chain state other than the execution state.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
block_height = %block.height
))]
pub async fn execute_block(
&mut self,
block: &ProposedBlock,
Expand Down Expand Up @@ -919,6 +951,10 @@ where
/// Applies an execution outcome to the chain, updating the outboxes, state hash and chain
/// manager. This does not touch the execution state itself, which must be updated separately.
/// Returns the set of event streams that were updated as a result of applying the block.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
block_height = %block.inner().inner().header.height
))]
pub async fn apply_confirmed_block(
&mut self,
block: &ConfirmedBlock,
Expand Down Expand Up @@ -953,6 +989,10 @@ where

/// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible.
/// Returns the set of streams that were updated as a result of preprocessing the block.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
block_height = %block.inner().inner().header.height
))]
pub async fn preprocess_block(
&mut self,
block: &ConfirmedBlock,
Expand All @@ -979,6 +1019,10 @@ where
}

/// Verifies that the block is valid according to the chain's application permission settings.
#[instrument(target = "telemetry_only", skip_all, fields(
block_height = %block.height,
num_transactions = %block.transactions.len()
))]
fn check_app_permissions(
app_permissions: &ApplicationPermissions,
block: &ProposedBlock,
Expand Down Expand Up @@ -1021,6 +1065,10 @@ where
}

/// Returns the hashes of all blocks we have in the given range.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
next_block_height = %self.tip_state.get().next_block_height
))]
pub async fn block_hashes(
&self,
range: impl RangeBounds<BlockHeight>,
Expand Down Expand Up @@ -1066,6 +1114,10 @@ where
/// Updates the outboxes with the messages sent in the block.
///
/// Returns the set of all recipients.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
block_height = %block.header.height
))]
async fn process_outgoing_messages(
&mut self,
block: &Block,
Expand Down Expand Up @@ -1157,6 +1209,10 @@ where
/// Updates the event streams with events emitted by the block if they form a contiguous
/// sequence (might not be the case when preprocessing a block).
/// Returns the set of updated event streams.
#[instrument(target = "telemetry_only", skip_all, fields(
chain_id = %self.chain_id(),
block_height = %block.header.height
))]
async fn process_emitted_events(
&mut self,
block: &Block,
Expand Down
9 changes: 6 additions & 3 deletions linera-core/src/chain_worker/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ where
/// Runs the worker until there are no more incoming requests.
#[instrument(
skip_all,
fields(chain_id = format!("{:.8}", self.chain_id)),
fields(chain_id = format!("{:.8}", self.chain_id), long_lived_services = %self.config.long_lived_services),
)]
async fn handle_requests(
self,
Expand Down Expand Up @@ -276,9 +276,12 @@ where
self.chain_id,
service_runtime_endpoint,
)
.instrument(span.clone())
.await?;

Box::pin(worker.handle_request(request).instrument(span)).await;
Box::pin(worker.handle_request(request))
.instrument(span)
.await;

loop {
futures::select! {
Expand All @@ -287,7 +290,7 @@ where
let Some((request, span)) = maybe_request else {
break; // Request sender was dropped.
};
Box::pin(worker.handle_request(request).instrument(span)).await;
Box::pin(worker.handle_request(request)).instrument(span).await;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Small change but are you sure this is what we want? @Twey has more experience with these spans.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we instrument inside the pin, we reach the compilation recursion limit

}
}
}
Expand Down
Loading
Loading