Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/executors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,19 @@ impl Executor for ConstantVUsExecutor {
tokio::sync::mpsc::channel(self.config.max_vus as usize);
let active_vus = Arc::new(AtomicI64::new(0));
// start all VUs

let (timeout_tx, _) = tokio::sync::broadcast::channel::<()>(self.config.max_vus as usize);
// sleep for duration and send timeout signal
{
let timeout_duration = self.config.duration;
let timeout_tx = timeout_tx.clone();
tokio::spawn(async move {
tokio::time::sleep(timeout_duration).await;
// send timeout signal to all VUs
let _ = timeout_tx.send(());
});
}

for _ in 0..self.config.max_vus {
let mut requests_guard = requests.lock().await;
let request = Arc::from(requests_guard.generate_request());
Expand All @@ -80,6 +93,7 @@ impl Executor for ConstantVUsExecutor {
responses_tx.clone(),
end_tx.clone(),
stop_sender.clone(),
Some(timeout_tx.subscribe()),
)
.await;
active_vus.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Expand All @@ -105,7 +119,7 @@ impl Executor for ConstantVUsExecutor {
let request = Arc::from(requests_guard.generate_request());
drop(requests_guard);
active_vus.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
start_vu(self.backend.clone(), request, responses_tx.clone(), end_tx.clone(), stop_sender.clone()).await;
start_vu(self.backend.clone(), request, responses_tx.clone(), end_tx.clone(), stop_sender.clone(), Some(timeout_tx.subscribe())).await;
}
}
}=>{}
Expand All @@ -119,13 +133,23 @@ async fn start_vu(
responses_tx: UnboundedSender<TextGenerationAggregatedResponse>,
end_tx: Sender<bool>,
stop_sender: broadcast::Sender<()>,
mut timeout_stop_receiver: Option<broadcast::Receiver<()>>,
) -> JoinHandle<()> {
let mut stop_receiver = stop_sender.subscribe();
tokio::spawn(async move {
tokio::select! {
_ = stop_receiver.recv() => {
let _ = end_tx.send(true).await;
},
_ = async {
if let Some(ref mut receiver) = timeout_stop_receiver {
let _ = receiver.recv().await;
} else {
std::future::pending::<()>().await; // wait forever if no timeout receiver is provided
}
} => {
let _ = end_tx.send(true).await;
},
_ = async{
let (tx, mut rx): (Sender<TextGenerationAggregatedResponse>, Receiver<TextGenerationAggregatedResponse>) = tokio::sync::mpsc::channel(1);
trace!("VU started with request: {:?}", request);
Expand Down Expand Up @@ -213,7 +237,7 @@ impl Executor for ConstantArrivalRateExecutor {
if active_vus_thread.load(std::sync::atomic::Ordering::SeqCst) < max_vus as i64 {
let mut requests_guard = requests.lock().await;
let request = Arc::from(requests_guard.generate_request());
start_vu(backend.clone(), request.clone(), responses_tx.clone(), end_tx.clone(),stop_sender.clone()).await;
start_vu(backend.clone(), request.clone(), responses_tx.clone(), end_tx.clone(), stop_sender.clone(), None).await;
active_vus_thread.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
} else {
warn!("Max VUs reached, skipping request");
Expand Down