diff --git a/Cargo.lock b/Cargo.lock index 7e16973900bb..f18639c48a85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5838,6 +5838,7 @@ dependencies = [ "papaya", "prometheus", "serde", + "tracing", ] [[package]] diff --git a/examples/Cargo.lock b/examples/Cargo.lock index 1bf1d80b2c58..1e90f339a9f4 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -3828,6 +3828,7 @@ dependencies = [ "papaya", "prometheus", "serde", + "tracing", ] [[package]] diff --git a/linera-base/src/tracing.rs b/linera-base/src/tracing.rs index fb545dcfdd31..09811bb0f8eb 100644 --- a/linera-base/src/tracing.rs +++ b/linera-base/src/tracing.rs @@ -12,8 +12,9 @@ use std::{ use is_terminal::IsTerminal as _; use tracing::Subscriber; +#[cfg(all(not(target_arch = "wasm32"), feature = "tempo"))] +use tracing_subscriber::filter::{filter_fn, FilterExt as _}; use tracing_subscriber::{ - filter::filter_fn, fmt::{ self, format::{FmtSpan, Format, Full}, @@ -59,20 +60,33 @@ pub fn init(log_name: &str) { /// ## Span Filtering for Performance /// /// By default, spans created by `#[instrument]` are logged to console AND sent -/// to OpenTelemetry. To disable console output for performance-critical functions -/// while keeping OpenTelemetry tracing, use: +/// to OpenTelemetry. In order to not spam stderr, you can set a low level, and use the +/// `telemetry_only` target: /// /// ```rust -/// use tracing::instrument; +/// use tracing::{instrument, Level}; +/// +/// // Always sent to telemetry; console output controlled by level +/// #[instrument(level = "trace", target = "telemetry_only")] +/// fn my_called_too_frequently_function() { +/// // Will be sent to OpenTelemetry regardless of RUST_LOG level +/// // Will only appear in console if RUST_LOG includes trace level +/// } /// -/// #[instrument(target = "telemetry_only")] -/// fn my_performance_critical_function() { -/// // This span will ONLY be sent to OpenTelemetry, not logged to console +/// // Higher level - more likely to appear in console +/// #[instrument(level = "info", target = "telemetry_only")] +/// fn my_important_function() { +/// // Will be sent to OpenTelemetry regardless of RUST_LOG level +/// // Will appear in console if RUST_LOG includes info level or higher /// } /// ``` /// -/// All explicit log calls (tracing::info!(), etc.) are always printed regardless -/// of span filtering. +/// **Key behaviors:** +/// - If span level >= RUST_LOG level: span goes to BOTH telemetry AND console (regardless of target) +/// - If span level < RUST_LOG level AND target = "telemetry_only": span goes to telemetry ONLY +/// - If span level < RUST_LOG level AND target != "telemetry_only": span is filtered out completely +/// - Default level for `telemetry_only` should be `trace` for minimal console noise +/// - All explicit log calls (tracing::info!(), etc.) are always printed regardless of span filtering pub async fn init_with_opentelemetry(log_name: &str) { #[cfg(feature = "tempo")] { @@ -102,28 +116,13 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) { let color_output = !std::env::var("NO_COLOR").is_ok_and(|x| !x.is_empty()) && std::io::stderr().is_terminal(); - // Create a filter that: - // 1. Allows all explicit events (tracing::info!(), etc.) - // 2. Allows spans from #[instrument] by default - // 3. Blocks spans ONLY if they have target = "telemetry_only" - let console_filter = filter_fn(|metadata| { - if metadata.is_span() { - // Block spans that explicitly request telemetry-only via target = "telemetry_only" - metadata.target() != "telemetry_only" - } else { - // Always allow explicit log events (tracing::info!(), etc.) - true - } - }); - let stderr_layer = prepare_formatted_layer( format.as_deref(), fmt::layer() .with_span_events(span_events.clone()) .with_writer(std::io::stderr) .with_ansi(color_output), - ) - .with_filter(console_filter.clone()); + ); let maybe_log_file_layer = open_log_file(log_name).map(|file_writer| { prepare_formatted_layer( @@ -133,7 +132,6 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) { .with_writer(Arc::new(file_writer)) .with_ansi(false), ) - .with_filter(console_filter.clone()) }); #[cfg(any(target_arch = "wasm32", not(feature = "tempo")))] @@ -173,7 +171,18 @@ fn init_internal(log_name: &str, with_opentelemetry: bool) { global::set_tracer_provider(tracer_provider.clone()); let tracer = tracer_provider.tracer("linera"); - let opentelemetry_layer = OpenTelemetryLayer::new(tracer); + + let telemetry_only_filter = + filter_fn(|metadata| metadata.is_span() && metadata.target() == "telemetry_only"); + + let otel_env_filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into()) + .from_env_lossy(); + + let opentelemetry_filter = otel_env_filter.or(telemetry_only_filter); + + let opentelemetry_layer = + OpenTelemetryLayer::new(tracer).with_filter(opentelemetry_filter); tracing_subscriber::registry() .with(env_filter) diff --git a/linera-chain/src/chain.rs b/linera-chain/src/chain.rs index 4689723d22be..2244eea2b5c0 100644 --- a/linera-chain/src/chain.rs +++ b/linera-chain/src/chain.rs @@ -35,6 +35,7 @@ use linera_views::{ views::{ClonableView, CryptoHashView, RootView, View}, }; use serde::{Deserialize, Serialize}; +use tracing::instrument; use crate::{ block::{Block, ConfirmedBlock}, @@ -391,6 +392,9 @@ where self.context().extra().chain_id() } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + ))] pub async fn query_application( &mut self, local_time: Timestamp, @@ -408,6 +412,10 @@ where .with_execution_context(ChainExecutionContext::Query) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + application_id = %application_id + ))] pub async fn describe_application( &mut self, application_id: ApplicationId, @@ -419,6 +427,11 @@ where .with_execution_context(ChainExecutionContext::DescribeApplication) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + target = %target, + height = %height + ))] pub async fn mark_messages_as_received( &mut self, target: &ChainId, @@ -506,6 +519,9 @@ where /// Verifies that this chain is up-to-date and all the messages executed ahead of time /// have been properly received by now. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] pub async fn validate_incoming_bundles(&self) -> Result<(), ChainError> { let chain_id = self.chain_id(); let pairs = self.inboxes.try_load_all_entries().await?; @@ -567,6 +583,11 @@ where /// round timeouts. /// /// Returns `true` if incoming `Subscribe` messages created new outbox entries. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + origin = %origin, + bundle_height = %bundle.height + ))] pub async fn receive_message_bundle( &mut self, origin: &ChainId, @@ -661,6 +682,9 @@ where } /// Removes the incoming message bundles in the block from the inboxes. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + ))] pub async fn remove_bundles_from_inboxes( &mut self, timestamp: Timestamp, @@ -750,6 +774,10 @@ where /// Executes a block: first the incoming messages, then the main operation. /// Does not update chain state other than the execution state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %block.chain_id, + block_height = %block.height + ))] #[expect(clippy::too_many_arguments)] async fn execute_block_inner( chain: &mut ExecutionStateView, @@ -859,6 +887,10 @@ where /// Executes a block: first the incoming messages, then the main operation. /// Does not update chain state other than the execution state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] pub async fn execute_block( &mut self, block: &ProposedBlock, @@ -919,6 +951,10 @@ where /// Applies an execution outcome to the chain, updating the outboxes, state hash and chain /// manager. This does not touch the execution state itself, which must be updated separately. /// Returns the set of event streams that were updated as a result of applying the block. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.inner().inner().header.height + ))] pub async fn apply_confirmed_block( &mut self, block: &ConfirmedBlock, @@ -953,6 +989,10 @@ where /// Adds a block to `preprocessed_blocks`, and updates the outboxes where possible. /// Returns the set of streams that were updated as a result of preprocessing the block. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.inner().inner().header.height + ))] pub async fn preprocess_block( &mut self, block: &ConfirmedBlock, @@ -979,6 +1019,10 @@ where } /// Verifies that the block is valid according to the chain's application permission settings. + #[instrument(target = "telemetry_only", skip_all, fields( + block_height = %block.height, + num_transactions = %block.transactions.len() + ))] fn check_app_permissions( app_permissions: &ApplicationPermissions, block: &ProposedBlock, @@ -1021,6 +1065,10 @@ where } /// Returns the hashes of all blocks we have in the given range. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + next_block_height = %self.tip_state.get().next_block_height + ))] pub async fn block_hashes( &self, range: impl RangeBounds, @@ -1066,6 +1114,10 @@ where /// Updates the outboxes with the messages sent in the block. /// /// Returns the set of all recipients. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.header.height + ))] async fn process_outgoing_messages( &mut self, block: &Block, @@ -1157,6 +1209,10 @@ where /// Updates the event streams with events emitted by the block if they form a contiguous /// sequence (might not be the case when preprocessing a block). /// Returns the set of updated event streams. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.header.height + ))] async fn process_emitted_events( &mut self, block: &Block, diff --git a/linera-core/src/chain_worker/actor.rs b/linera-core/src/chain_worker/actor.rs index 83c196954021..0170aabe6abb 100644 --- a/linera-core/src/chain_worker/actor.rs +++ b/linera-core/src/chain_worker/actor.rs @@ -244,7 +244,7 @@ where /// Runs the worker until there are no more incoming requests. #[instrument( skip_all, - fields(chain_id = format!("{:.8}", self.chain_id)), + fields(chain_id = format!("{:.8}", self.chain_id), long_lived_services = %self.config.long_lived_services), )] async fn handle_requests( self, @@ -276,9 +276,12 @@ where self.chain_id, service_runtime_endpoint, ) + .instrument(span.clone()) .await?; - Box::pin(worker.handle_request(request).instrument(span)).await; + Box::pin(worker.handle_request(request)) + .instrument(span) + .await; loop { futures::select! { @@ -287,7 +290,7 @@ where let Some((request, span)) = maybe_request else { break; // Request sender was dropped. }; - Box::pin(worker.handle_request(request).instrument(span)).await; + Box::pin(worker.handle_request(request)).instrument(span).await; } } } diff --git a/linera-core/src/chain_worker/state.rs b/linera-core/src/chain_worker/state.rs index cee41ec1acae..f2f616e39bd9 100644 --- a/linera-core/src/chain_worker/state.rs +++ b/linera-core/src/chain_worker/state.rs @@ -70,6 +70,9 @@ where StorageClient: Storage + Clone + Send + Sync + 'static, { /// Creates a new [`ChainWorkerState`] using the provided `storage` client. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %chain_id + ))] #[expect(clippy::too_many_arguments)] pub(super) async fn load( config: ChainWorkerConfig, @@ -103,7 +106,7 @@ where } /// Handles a request and applies it to the chain state. - #[instrument(skip_all)] + #[instrument(skip_all, fields(chain_id = %self.chain_id()))] pub(super) async fn handle_request( &mut self, request: ChainWorkerRequest, @@ -267,6 +270,10 @@ where } /// Returns the requested blob, if it belongs to the current locking block or pending proposal. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + blob_id = %blob_id + ))] async fn download_pending_blob(&self, blob_id: BlobId) -> Result { if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? { return Ok(blob); @@ -277,6 +284,9 @@ where /// Reads the blobs from the chain manager or from storage. Returns an error if any are /// missing. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn get_required_blobs( &self, required_blob_ids: impl IntoIterator, @@ -297,6 +307,9 @@ where } /// Tries to read the blobs from the chain manager or storage. Returns `None` if not found. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn maybe_get_required_blobs( &self, blob_ids: impl IntoIterator, @@ -365,6 +378,9 @@ where } /// Loads pending cross-chain requests, and adds `NewRound` notifications where appropriate. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn create_network_actions( &self, old_round: Option, @@ -402,6 +418,10 @@ where }) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + num_recipients = %heights_by_recipient.len() + ))] async fn create_cross_chain_requests( &self, heights_by_recipient: BTreeMap>, @@ -473,6 +493,10 @@ where /// Returns true if there are no more outgoing messages in flight up to the given /// block height. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height + ))] async fn all_messages_to_tracked_chains_delivered_up_to( &self, height: BlockHeight, @@ -499,6 +523,10 @@ where } /// Processes a leader timeout issued for this multi-owner chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %certificate.inner().height() + ))] async fn process_timeout( &mut self, certificate: TimeoutCertificate, @@ -537,6 +565,10 @@ where /// /// If they cannot be found, it creates an entry in `pending_proposed_blobs` so they can be /// submitted one by one. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %proposal.content.block.height + ))] async fn load_proposal_blobs( &mut self, proposal: &BlockProposal, @@ -581,6 +613,10 @@ where } /// Processes a validated block issued for this multi-owner chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %certificate.block().header.height + ))] async fn process_validated_block( &mut self, certificate: ValidatedBlockCertificate, @@ -638,6 +674,11 @@ where } /// Processes a confirmed block (aka a commit). + #[instrument(skip_all, fields( + chain_id = %certificate.block().header.chain_id, + height = %certificate.block().header.height, + block_hash = %certificate.hash(), + ))] async fn process_confirmed_block( &mut self, certificate: ConfirmedBlockCertificate, @@ -815,6 +856,7 @@ where computed: Box::new(verified_outcome), } ); + // Update the rest of the chain state. let updated_streams = chain .apply_confirmed_block(certificate.value(), local_time) @@ -876,7 +918,7 @@ where } /// Updates the chain's inboxes, receiving messages from a cross-chain update. - #[instrument(level = "trace", skip(self, bundles))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, bundles))] async fn process_cross_chain_update( &mut self, origin: ChainId, @@ -925,6 +967,11 @@ where } /// Handles the cross-chain request confirming that the recipient was updated. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + recipient = %recipient, + latest_height = %latest_height + ))] async fn confirm_updated_recipient( &mut self, recipient: ChainId, @@ -947,6 +994,10 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + num_trackers = %new_trackers.len() + ))] async fn update_received_certificate_trackers( &mut self, new_trackers: BTreeMap, @@ -958,6 +1009,11 @@ where } /// Attempts to vote for a leader timeout, if possible. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height, + round = %round + ))] async fn vote_for_leader_timeout( &mut self, height: BlockHeight, @@ -985,6 +1041,9 @@ where } /// Votes for falling back to a public chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn vote_for_fallback(&mut self) -> Result<(), WorkerError> { let chain = &mut self.chain; if let (epoch, Some(entry)) = ( @@ -1007,6 +1066,10 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + blob_id = %blob.id() + ))] async fn handle_pending_blob(&mut self, blob: Blob) -> Result { let mut was_expected = self .chain @@ -1040,6 +1103,10 @@ where /// Returns a stored [`Certificate`] for the chain's block at the requested [`BlockHeight`]. #[cfg(with_testing)] + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + height = %height + ))] async fn read_certificate( &mut self, height: BlockHeight, @@ -1058,6 +1125,10 @@ where } /// Queries an application's state on the chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + query_application_id = %query.application_id() + ))] async fn query_application(&mut self, query: Query) -> Result { self.ensure_is_active().await?; let local_time = self.storage.clock().current_time(); @@ -1069,6 +1140,10 @@ where } /// Returns an application's description. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + application_id = %application_id + ))] async fn describe_application( &mut self, application_id: ApplicationId, @@ -1079,6 +1154,10 @@ where } /// Executes a block without persisting any changes to the state. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] async fn stage_block_execution( &mut self, block: ProposedBlock, @@ -1111,6 +1190,10 @@ where } /// Validates and executes a block proposed to extend this chain. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %proposal.content.block.height + ))] async fn handle_block_proposal( &mut self, proposal: BlockProposal, @@ -1254,6 +1337,9 @@ where } /// Prepares a [`ChainInfoResponse`] for a [`ChainInfoQuery`]. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn prepare_chain_info_response( &mut self, query: ChainInfoQuery, @@ -1319,6 +1405,10 @@ where } /// Executes a block, caches the result, and returns the outcome. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id(), + block_height = %block.height + ))] async fn execute_block( &mut self, block: &ProposedBlock, @@ -1360,6 +1450,9 @@ where /// Stores the chain state in persistent storage. /// /// Waits until the [`ChainStateView`] is no longer shared before persisting the changes. + #[instrument(target = "telemetry_only", skip_all, fields( + chain_id = %self.chain_id() + ))] async fn save(&mut self) -> Result<(), WorkerError> { self.clear_shared_chain_view().await; self.chain.save().await?; diff --git a/linera-core/src/worker.rs b/linera-core/src/worker.rs index 0bbb06d47483..2915c0791539 100644 --- a/linera-core/src/worker.rs +++ b/linera-core/src/worker.rs @@ -462,7 +462,7 @@ where .ok_or(WorkerError::InvalidLiteCertificate)?, )) } - _ => return Err(WorkerError::InvalidLiteCertificate), + _ => Err(WorkerError::InvalidLiteCertificate), } } } @@ -555,7 +555,11 @@ where } /// Executes a [`Query`] for an application's state on a specific chain. - #[instrument(level = "trace", skip(self, chain_id, query))] + #[instrument( + level = "trace", + target = "telemetry_only", + skip(self, chain_id, query) + )] pub async fn query_application( &self, chain_id: ChainId, @@ -567,7 +571,11 @@ where .await } - #[instrument(level = "trace", skip(self, chain_id, application_id))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, chain_id, application_id), fields( + nickname = %self.nickname, + chain_id = %chain_id, + application_id = %application_id + ))] pub async fn describe_application( &self, chain_id: ChainId, @@ -585,7 +593,13 @@ where /// Processes a confirmed block (aka a commit). #[instrument( level = "trace", - skip(self, certificate, notify_when_messages_are_delivered) + target = "telemetry_only", + skip(self, certificate, notify_when_messages_are_delivered), + fields( + nickname = %self.nickname, + chain_id = %certificate.block().header.chain_id, + block_height = %certificate.block().header.height + ) )] async fn process_confirmed_block( &self, @@ -604,7 +618,11 @@ where } /// Processes a validated block issued from a multi-owner chain. - #[instrument(level = "trace", skip(self, certificate))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, certificate), fields( + nickname = %self.nickname, + chain_id = %certificate.block().header.chain_id, + block_height = %certificate.block().header.height + ))] async fn process_validated_block( &self, certificate: ValidatedBlockCertificate, @@ -620,7 +638,11 @@ where } /// Processes a leader timeout issued from a multi-owner chain. - #[instrument(level = "trace", skip(self, certificate))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, certificate), fields( + nickname = %self.nickname, + chain_id = %certificate.value().chain_id(), + height = %certificate.value().height() + ))] async fn process_timeout( &self, certificate: TimeoutCertificate, @@ -635,7 +657,12 @@ where .await } - #[instrument(level = "trace", skip(self, origin, recipient, bundles))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, origin, recipient, bundles), fields( + nickname = %self.nickname, + origin = %origin, + recipient = %recipient, + num_bundles = %bundles.len() + ))] async fn process_cross_chain_update( &self, origin: ChainId, @@ -653,7 +680,11 @@ where } /// Returns a stored [`ConfirmedBlockCertificate`] for a chain's block. - #[instrument(level = "trace", skip(self, chain_id, height))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, chain_id, height), fields( + nickname = %self.nickname, + chain_id = %chain_id, + height = %height + ))] #[cfg(with_testing)] pub async fn read_certificate( &self, @@ -671,7 +702,10 @@ where /// /// The returned view holds a lock on the chain state, which prevents the worker from changing /// the state of that chain. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] pub async fn chain_state_view( &self, chain_id: ChainId, @@ -682,7 +716,10 @@ where .await } - #[instrument(level = "trace", skip(self, request_builder))] + #[instrument(level = "trace", target = "telemetry_only", skip(self, request_builder), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] /// Sends a request to the [`ChainWorker`] for a [`ChainId`] and waits for the `Response`. async fn query_chain_worker( &self, @@ -705,7 +742,10 @@ where /// Retrieves an endpoint to a [`ChainWorkerActor`] from the cache, creating one and adding it /// to the cache if needed. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] async fn get_chain_worker_endpoint( &self, chain_id: ChainId, @@ -754,7 +794,10 @@ where /// and add it to the cache if needed. /// /// Returns [`None`] if the cache is full and no candidate for eviction was found. - #[instrument(level = "trace", skip(self))] + #[instrument(level = "trace", target = "telemetry_only", skip(self), fields( + nickname = %self.nickname, + chain_id = %chain_id + ))] #[expect(clippy::type_complexity)] fn try_get_chain_worker_endpoint( &self, @@ -1067,6 +1110,11 @@ where } /// Updates the received certificate trackers to at least the given values. + #[instrument(target = "telemetry_only", skip_all, fields( + nickname = %self.nickname, + chain_id = %chain_id, + num_trackers = %new_trackers.len() + ))] pub async fn update_received_certificate_trackers( &self, chain_id: ChainId, diff --git a/linera-service/src/proxy/grpc.rs b/linera-service/src/proxy/grpc.rs index b113b3d3d839..7377e70449c4 100644 --- a/linera-service/src/proxy/grpc.rs +++ b/linera-service/src/proxy/grpc.rs @@ -303,6 +303,7 @@ where } } + #[instrument(target = "telemetry_only", skip_all, fields(remote_addr = ?request.remote_addr(), chain_id = ?request.get_ref().chain_id()))] fn worker_client( &self, request: Request, @@ -374,7 +375,12 @@ where { type SubscribeStream = UnboundedReceiverStream>; - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_block_proposal") + )] async fn handle_block_proposal( &self, request: Request, @@ -386,7 +392,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_lite_certificate") + )] async fn handle_lite_certificate( &self, request: Request, @@ -398,7 +409,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_confirmed_certificate") + )] async fn handle_confirmed_certificate( &self, request: Request, @@ -410,7 +426,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_validated_certificate") + )] async fn handle_validated_certificate( &self, request: Request, @@ -422,7 +443,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_timeout_certificate") + )] async fn handle_timeout_certificate( &self, request: Request, @@ -434,7 +460,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_chain_info_query") + )] async fn handle_chain_info_query( &self, request: Request, @@ -446,7 +477,12 @@ where ) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "subscribe") + )] async fn subscribe( &self, request: Request, @@ -475,7 +511,12 @@ where Ok(Response::new(linera_version::VersionInfo::default().into())) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "get_network_description") + )] async fn get_network_description( &self, _request: Request<()>, @@ -490,7 +531,12 @@ where Ok(Response::new(description.into())) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "upload_blob") + )] async fn upload_blob(&self, request: Request) -> Result, Status> { let content: linera_sdk::linera_base_types::BlobContent = request.into_inner().try_into()?; @@ -503,7 +549,12 @@ where Ok(Response::new(id.try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_blob") + )] async fn download_blob( &self, request: Request, @@ -519,7 +570,12 @@ where Ok(Response::new(blob.into_content().try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_pending_blob") + )] async fn download_pending_blob( &self, request: Request, @@ -544,7 +600,12 @@ where } } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "handle_pending_blob") + )] async fn handle_pending_blob( &self, request: Request, @@ -569,7 +630,12 @@ where } } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificate") + )] async fn download_certificate( &self, request: Request, @@ -586,7 +652,12 @@ where Ok(Response::new(certificate.try_into()?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificates") + )] async fn download_certificates( &self, request: Request, @@ -632,7 +703,12 @@ where )?)) } - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "download_certificates_by_heights") + )] async fn download_certificates_by_heights( &self, request: Request, @@ -747,7 +823,9 @@ where })) } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN), fields( + method = "blob_last_used_by" + ))] async fn blob_last_used_by( &self, request: Request, @@ -767,7 +845,9 @@ where Ok(Response::new(last_used_by.into())) } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN), fields( + method = "blob_last_used_by_certificate" + ))] async fn blob_last_used_by_certificate( &self, request: Request, @@ -777,7 +857,9 @@ where self.download_certificate(request).await } - #[instrument(skip_all, err(level = Level::WARN))] + #[instrument(target = "telemetry_only", skip_all, err(level = Level::WARN), fields( + method = "missing_blob_ids" + ))] async fn missing_blob_ids( &self, request: Request, @@ -798,7 +880,12 @@ impl NotifierService for GrpcProxy where S: Storage + Clone + Send + Sync + 'static, { - #[instrument(skip_all, err(Display))] + #[instrument( + target = "telemetry_only", + skip_all, + err(Display), + fields(method = "notify") + )] async fn notify(&self, request: Request) -> Result, Status> { let notification = request.into_inner(); let chain_id = notification diff --git a/linera-storage/Cargo.toml b/linera-storage/Cargo.toml index 04ff5dd63209..02c34f4ac613 100644 --- a/linera-storage/Cargo.toml +++ b/linera-storage/Cargo.toml @@ -45,6 +45,7 @@ linera-views.workspace = true papaya.workspace = true prometheus.workspace = true serde.workspace = true +tracing.workspace = true [dev-dependencies] anyhow.workspace = true diff --git a/linera-storage/src/db_storage.rs b/linera-storage/src/db_storage.rs index e34562657c44..5c0267556eb2 100644 --- a/linera-storage/src/db_storage.rs +++ b/linera-storage/src/db_storage.rs @@ -28,6 +28,7 @@ use linera_views::{ ViewError, }; use serde::{Deserialize, Serialize}; +use tracing::instrument; #[cfg(with_testing)] use { futures::channel::oneshot::{self, Receiver}, @@ -544,6 +545,7 @@ where &self.clock } + #[instrument(level = "trace", target = "telemetry_only", skip_all, fields(chain_id = %chain_id))] async fn load_chain( &self, chain_id: ChainId, @@ -563,6 +565,7 @@ where ChainStateView::load(context).await } + #[instrument(level = "trace", target = "telemetry_only", skip_all, fields(blob_id = %blob_id))] async fn contains_blob(&self, blob_id: BlobId) -> Result { let store = self.database.open_shared(&[])?; let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?; @@ -572,6 +575,7 @@ where Ok(test) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_count = blob_ids.len()))] async fn missing_blobs(&self, blob_ids: &[BlobId]) -> Result, ViewError> { let store = self.database.open_shared(&[])?; let mut keys = Vec::new(); @@ -602,6 +606,7 @@ where Ok(test) } + #[instrument(target = "telemetry_only", skip_all, fields(hash = %hash))] async fn read_confirmed_block( &self, hash: CryptoHash, @@ -616,6 +621,7 @@ where Ok(value) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_id = %blob_id))] async fn read_blob(&self, blob_id: BlobId) -> Result, ViewError> { let store = self.database.open_shared(&[])?; let blob_key = bcs::to_bytes(&BaseKey::Blob(blob_id))?; @@ -682,6 +688,7 @@ where Ok(blob_states) } + #[instrument(target = "telemetry_only", skip_all, fields(blob_id = %blob.id()))] async fn write_blob(&self, blob: &Blob) -> Result<(), ViewError> { let mut batch = Batch::new(); batch.add_blob(blob)?; @@ -996,6 +1003,7 @@ where Ok(()) } + #[instrument(target = "telemetry_only", skip_all, fields(batch_size = batch.key_value_bytes.len()))] async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> { if batch.key_value_bytes.is_empty() { return Ok(());