Skip to content

Commit 0c747ec

Browse files
committed
refactor(agent): add tracing to expensive calls
1 parent 1edcc34 commit 0c747ec

File tree

11 files changed

+124
-25
lines changed

11 files changed

+124
-25
lines changed

src/agent.rs

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ use {
7272
lazy_static::lazy_static,
7373
std::sync::Arc,
7474
tokio::sync::watch,
75+
tracing::instrument,
7576
};
7677

7778
pub mod config;
@@ -121,58 +122,59 @@ impl Agent {
121122
};
122123
}
123124

125+
#[instrument(skip(self))]
124126
async fn spawn(&self) -> Result<()> {
125127
// job handles
126-
let mut jhs = vec![];
128+
let mut handles = vec![];
127129

128130
// Create the Application State.
129131
let state = Arc::new(state::State::new(&self.config).await);
130132

131133
// Spawn the primary network Oracle.
132-
jhs.push(tokio::spawn(services::oracle(
134+
handles.extend(services::oracle(
133135
self.config.primary_network.clone(),
134136
network::Network::Primary,
135137
state.clone(),
136-
)));
138+
));
137139

138-
jhs.push(tokio::spawn(services::exporter(
140+
handles.extend(services::exporter(
139141
self.config.primary_network.clone(),
140142
network::Network::Primary,
141143
state.clone(),
142-
)));
144+
));
143145

144146
// Spawn the secondary network Oracle, if needed.
145147
if let Some(config) = &self.config.secondary_network {
146-
jhs.push(tokio::spawn(services::oracle(
148+
handles.extend(services::oracle(
147149
config.clone(),
148150
network::Network::Secondary,
149151
state.clone(),
150-
)));
152+
));
151153

152-
jhs.push(tokio::spawn(services::exporter(
154+
handles.extend(services::exporter(
153155
config.clone(),
154156
network::Network::Secondary,
155157
state.clone(),
156-
)));
158+
));
157159
}
158160

159161
// Create the Notifier task for the Pythd RPC.
160-
jhs.push(tokio::spawn(services::notifier(state.clone())));
162+
handles.push(tokio::spawn(services::notifier(state.clone())));
161163

162164
// Spawn the Pythd API Server
163-
jhs.push(tokio::spawn(rpc::run(
165+
handles.push(tokio::spawn(rpc::run(
164166
self.config.pythd_api_server.clone(),
165167
state.clone(),
166168
)));
167169

168170
// Spawn the metrics server
169-
jhs.push(tokio::spawn(metrics::spawn(
171+
handles.push(tokio::spawn(metrics::spawn(
170172
self.config.metrics_server.bind_address,
171173
)));
172174

173175
// Spawn the remote keypair loader endpoint for both networks
174-
jhs.append(
175-
&mut services::keypairs(
176+
handles.extend(
177+
services::keypairs(
176178
self.config.primary_network.rpc_url.clone(),
177179
self.config
178180
.secondary_network
@@ -185,7 +187,7 @@ impl Agent {
185187
);
186188

187189
// Wait for all tasks to complete
188-
join_all(jhs).await;
190+
join_all(handles).await;
189191

190192
Ok(())
191193
}

src/agent/pyth/rpc.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use {
5050
sync::Arc,
5151
},
5252
tokio::sync::mpsc,
53+
tracing::instrument,
5354
warp::{
5455
ws::{
5556
Message,
@@ -411,6 +412,7 @@ impl Default for Config {
411412
}
412413
}
413414

415+
#[instrument(skip_all)]
414416
pub async fn run<S>(config: Config, state: Arc<S>)
415417
where
416418
S: state::Prices,

src/agent/pyth/rpc/update_price.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,10 @@ use {
1212
Request,
1313
Value,
1414
},
15+
tracing::instrument,
1516
};
1617

18+
#[instrument(skip_all, fields(account))]
1719
pub async fn update_price<S>(
1820
state: &S,
1921
request: &Request<Method, Value>,
@@ -28,6 +30,8 @@ where
2830
.ok_or_else(|| anyhow!("Missing request parameters"))?,
2931
)?;
3032

33+
tracing::Span::current().record("account", params.account.to_string());
34+
3135
state
3236
.update_local_price(
3337
&params.account.parse::<solana_sdk::pubkey::Pubkey>()?,

src/agent/services/exporter.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@ use {
2323
},
2424
tokio::{
2525
sync::watch,
26+
task::JoinHandle,
2627
time::Interval,
2728
},
29+
tracing::instrument,
2830
};
2931

3032
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -118,6 +120,13 @@ struct NetworkStateQuerier {
118120
}
119121

120122
impl NetworkStateQuerier {
123+
#[instrument(
124+
skip(rpc_endpoint, rpc_timeout, query_interval),
125+
fields(
126+
rpc_timeout = rpc_timeout.as_millis(),
127+
query_interval = query_interval.period().as_millis(),
128+
)
129+
)]
121130
pub fn new(
122131
rpc_endpoint: &str,
123132
rpc_timeout: Duration,
@@ -140,6 +149,7 @@ impl NetworkStateQuerier {
140149
}
141150
}
142151

152+
#[instrument(skip(self))]
143153
async fn query_network_state(&mut self) -> Result<()> {
144154
// Fetch the blockhash and current slot in parallel
145155
let current_slot_future = self
@@ -160,12 +170,15 @@ impl NetworkStateQuerier {
160170
}
161171
}
162172

163-
pub async fn exporter<S>(config: network::Config, network: Network, state: Arc<S>)
173+
#[instrument(skip(config, state))]
174+
pub fn exporter<S>(config: network::Config, network: Network, state: Arc<S>) -> Vec<JoinHandle<()>>
164175
where
165176
S: Exporter,
166177
S: Transactions,
167178
S: Send + Sync + 'static,
168179
{
180+
let mut handles = Vec::new();
181+
169182
// Create and spawn the network state querier
170183
let (network_state_tx, network_state_rx) = watch::channel(Default::default());
171184
let mut network_state_querier = NetworkStateQuerier::new(
@@ -175,12 +188,23 @@ where
175188
network_state_tx,
176189
);
177190

178-
tokio::spawn(transaction_monitor::transaction_monitor(
191+
handles.push(tokio::spawn(transaction_monitor::transaction_monitor(
179192
config.clone(),
180193
state.clone(),
194+
)));
195+
196+
handles.push(tokio::spawn(exporter::exporter(
197+
config,
198+
network,
199+
state,
200+
network_state_rx,
201+
)));
202+
203+
handles.push(tokio::spawn(
204+
async move { network_state_querier.run().await },
181205
));
182-
tokio::spawn(exporter::exporter(config, network, state, network_state_rx));
183-
tokio::spawn(async move { network_state_querier.run().await });
206+
207+
handles
184208
}
185209

186210
mod exporter {
@@ -294,6 +318,7 @@ mod transaction_monitor {
294318
sync::Arc,
295319
time::Duration,
296320
},
321+
tracing::instrument,
297322
};
298323

299324
#[derive(Clone, Serialize, Deserialize, Debug)]
@@ -318,6 +343,7 @@ mod transaction_monitor {
318343
}
319344
}
320345

346+
#[instrument(skip(config, state))]
321347
pub async fn transaction_monitor<S>(config: network::Config, state: Arc<S>)
322348
where
323349
S: Transactions,

src/agent/services/notifier.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
use {
77
crate::agent::state::Prices,
88
std::sync::Arc,
9+
tracing::instrument,
910
};
1011

12+
#[instrument(skip(state))]
1113
pub async fn notifier<S>(state: Arc<S>)
1214
where
1315
S: Prices,

src/agent/services/oracle.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,30 +38,35 @@ use {
3838
Instant,
3939
},
4040
},
41+
tokio::task::JoinHandle,
4142
tokio_stream::StreamExt,
43+
tracing::instrument,
4244
};
4345

44-
pub async fn oracle<S>(config: Config, network: Network, state: Arc<S>)
46+
#[instrument(skip(config, state))]
47+
pub fn oracle<S>(config: Config, network: Network, state: Arc<S>) -> Vec<JoinHandle<()>>
4548
where
4649
S: Oracle,
4750
S: Send + Sync + 'static,
4851
{
52+
let mut handles = Vec::new();
53+
4954
let Ok(key_store) = KeyStore::new(config.key_store.clone()) else {
5055
tracing::warn!("Key store not available, Oracle won't start.");
51-
return;
56+
return handles;
5257
};
5358

54-
tokio::spawn(poller(
59+
handles.push(tokio::spawn(poller(
5560
config.clone(),
5661
network,
5762
state.clone(),
5863
key_store.mapping_key,
5964
key_store.publish_keypair,
6065
config.oracle.max_lookup_batch_size,
61-
));
66+
)));
6267

6368
if config.oracle.subscriber_enabled {
64-
tokio::spawn(async move {
69+
handles.push(tokio::spawn(async move {
6570
loop {
6671
let current_time = Instant::now();
6772
if let Err(ref err) = subscriber(
@@ -79,15 +84,18 @@ where
7984
}
8085
}
8186
}
82-
});
87+
}));
8388
}
89+
90+
handles
8491
}
8592

8693
/// When an account RPC Subscription update is receiveed.
8794
///
8895
/// We check if the account is one we're aware of and tracking, and if so, spawn
8996
/// a small background task that handles that update. We only do this for price
9097
/// accounts, all other accounts are handled below in the poller.
98+
#[instrument(skip(config, state))]
9199
async fn subscriber<S>(
92100
config: Config,
93101
network: Network,
@@ -144,6 +152,7 @@ where
144152
}
145153

146154
/// On poll lookup all Pyth Mapping/Product/Price accounts and sync.
155+
#[instrument(skip(config, state))]
147156
async fn poller<S>(
148157
config: Config,
149158
network: Network,

src/agent/state/api.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use {
5353
mpsc,
5454
RwLock,
5555
},
56+
tracing::instrument,
5657
};
5758

5859
// TODO: implement Display on PriceStatus and then just call PriceStatus::to_string
@@ -382,6 +383,10 @@ where
382383
.map_err(|_| anyhow!("failed to send update to local store"))
383384
}
384385

386+
#[instrument(skip(self, update), fields(update = match update {
387+
Update::ProductAccountUpdate { account_key, .. } => account_key,
388+
Update::PriceAccountUpdate { account_key, .. } => account_key,
389+
}.to_string()))]
385390
async fn update_global_price(&self, network: Network, update: &Update) -> Result<()> {
386391
GlobalStore::update(self, network, update)
387392
.await

0 commit comments

Comments
 (0)