Skip to content

Commit e199600

Browse files
committed
Removing worker group
1 parent ff9eeba commit e199600

File tree

4 files changed

+19
-65
lines changed

4 files changed

+19
-65
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

config/config.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,6 @@ key_store.pyth_price_store_program_key = "3m6sv6HGqEbuyLV84mD7rJn4MAC9LhUa1y1AUN
7373
# takes to fetch all symbols.
7474
# oracle.max_lookup_batch_size = 100
7575

76-
# Number of workers used to wait for the handle_price_account_update
77-
# oracle.handle_price_account_update_worker_poll_size = 25
78-
# Channel size used to wait for the handle_price_account_update
79-
# oracle.handle_price_account_update_channel_size = 1000
8076
# Minimum time for a subscriber to run
8177
# oracle.subscriber_finished_min_time = "30s"
8278
# Time to sleep if the subscriber do not run for more than the minimum time

src/agent/services/oracle.rs

Lines changed: 8 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,6 @@ where
7070
)));
7171

7272
if config.oracle.subscriber_enabled {
73-
let number_of_workers = config.oracle.handle_price_account_update_worker_poll_size;
74-
let (sender, receiver) =
75-
tokio::sync::mpsc::channel(config.oracle.handle_price_account_update_channel_size);
7673
let min_elapsed_time = config.oracle.subscriber_finished_min_time;
7774
let sleep_time = config.oracle.subscriber_finished_sleep_time;
7875

@@ -84,7 +81,6 @@ where
8481
network,
8582
state.clone(),
8683
key_store.pyth_oracle_program_key,
87-
sender.clone(),
8884
)
8985
.await
9086
{
@@ -96,22 +92,6 @@ where
9692
}
9793
}
9894
}));
99-
100-
let receiver = Arc::new(tokio::sync::Mutex::new(receiver));
101-
for _ in 0..number_of_workers {
102-
let receiver = receiver.clone();
103-
handles.push(tokio::spawn(async move {
104-
loop {
105-
let mut receiver = receiver.lock().await;
106-
if let Some(task) = receiver.recv().await {
107-
drop(receiver);
108-
if let Err(err) = task.await {
109-
tracing::error!(%err, "error running price update");
110-
}
111-
}
112-
}
113-
}));
114-
}
11595
}
11696

11797
handles
@@ -128,7 +108,6 @@ async fn subscriber<S>(
128108
network: Network,
129109
state: Arc<S>,
130110
program_key: Pubkey,
131-
sender: tokio::sync::mpsc::Sender<tokio::task::JoinHandle<()>>,
132111
) -> Result<()>
133112
where
134113
S: Oracle,
@@ -156,17 +135,14 @@ where
156135
Some(account) => {
157136
let pubkey: Pubkey = update.value.pubkey.as_str().try_into()?;
158137
let state = state.clone();
159-
sender
160-
.send(tokio::spawn(async move {
161-
if let Err(err) =
162-
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
163-
.await
164-
{
165-
tracing::error!(?err, "Failed to handle account update");
166-
}
167-
}))
168-
.await
169-
.context("sending handle_price_account_update task to worker")?;
138+
tokio::spawn(async move {
139+
if let Err(err) =
140+
Oracle::handle_price_account_update(&*state, network, &pubkey, &account)
141+
.await
142+
{
143+
tracing::error!(?err, "Failed to handle account update");
144+
}
145+
});
170146
}
171147

172148
None => {

src/agent/state/oracle.rs

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -138,14 +138,6 @@ pub struct Data {
138138
pub publisher_buffer_key: Option<Pubkey>,
139139
}
140140

141-
fn default_handle_price_account_update_channel_size() -> usize {
142-
1000
143-
}
144-
145-
fn default_handle_price_account_update_worker_poll_size() -> usize {
146-
25
147-
}
148-
149141
fn default_subscriber_finished_min_time() -> Duration {
150142
Duration::from_secs(30)
151143
}
@@ -176,35 +168,25 @@ pub struct Config {
176168
/// trading off overall time it takes to fetch all symbols.
177169
pub max_lookup_batch_size: usize,
178170

179-
/// Number of workers used to wait for the handle_price_account_update
180-
#[serde(default = "default_handle_price_account_update_worker_poll_size")]
181-
pub handle_price_account_update_worker_poll_size: usize,
182-
/// Channel size used to wait for the handle_price_account_update
183-
#[serde(default = "default_handle_price_account_update_channel_size")]
184-
pub handle_price_account_update_channel_size: usize,
185171
/// Minimum time for a subscriber to run
186172
#[serde(default = "default_subscriber_finished_min_time")]
187-
pub subscriber_finished_min_time: Duration,
173+
pub subscriber_finished_min_time: Duration,
188174
/// Time to sleep if the subscriber do not run for more than the minimum time
189175
#[serde(default = "default_subscriber_finished_sleep_time")]
190-
pub subscriber_finished_sleep_time: Duration,
176+
pub subscriber_finished_sleep_time: Duration,
191177
}
192178

193179
impl Default for Config {
194180
fn default() -> Self {
195181
Self {
196-
commitment: CommitmentLevel::Confirmed,
197-
poll_interval_duration: Duration::from_secs(5),
198-
subscriber_enabled: true,
199-
updates_channel_capacity: 10000,
200-
data_channel_capacity: 10000,
201-
max_lookup_batch_size: 100,
202-
handle_price_account_update_worker_poll_size:
203-
default_handle_price_account_update_worker_poll_size(),
204-
handle_price_account_update_channel_size:
205-
default_handle_price_account_update_channel_size(),
206-
subscriber_finished_min_time: default_subscriber_finished_min_time(),
207-
subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
182+
commitment: CommitmentLevel::Confirmed,
183+
poll_interval_duration: Duration::from_secs(5),
184+
subscriber_enabled: true,
185+
updates_channel_capacity: 10000,
186+
data_channel_capacity: 10000,
187+
max_lookup_batch_size: 100,
188+
subscriber_finished_min_time: default_subscriber_finished_min_time(),
189+
subscriber_finished_sleep_time: default_subscriber_finished_sleep_time(),
208190
}
209191
}
210192
}

0 commit comments

Comments
 (0)