
Commit 9c4be5d

ref(stresstest): Distribute workloads across usecases and orgs (#91)
Makes the stresstest more realistic by writing to multiple usecases and scopes. The name of each concurrent workload is used as the usecase, which mimics production behavior more closely. Additionally, each workload can specify a number of organizations (defaulting to 1), which is used as the scope. Internally, we create a project-level scope, since this is what the real usecases will also be doing, and we reuse the org ID as the project ID.
1 parent 604a66f · commit 9c4be5d

6 files changed (+87 / -20 lines)

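The core of the change is that every request now picks an organization at random from the workload's configured pool, and that organization ID doubles as the project ID for the scope. The following is a minimal, standalone sketch of that selection logic (hypothetical names, condensed from the diffs below; it only assumes the rand crate and is not the actual stresstest code):

    // Hedged sketch of the per-request organization selection; names are
    // illustrative and only the `rand` crate is assumed.
    use rand::RngCore;

    /// Picks a uniformly random organization in [0, organizations).
    /// With the default of 1, this always returns 0 (the old single-org behavior).
    fn next_organization_id(rng: &mut impl RngCore, organizations: u64) -> u64 {
        rng.next_u64() % organizations
    }

    fn main() {
        let mut rng = rand::thread_rng();
        // With organizations: 1000 (as in example.yaml), each request lands on a
        // random org; the commit then reuses this ID as the project ID for the scope.
        for _ in 0..5 {
            println!("org = {}", next_organization_id(&mut rng, 1000));
        }
    }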

stresstest/example.yaml

Lines changed: 1 addition & 0 deletions
@@ -5,6 +5,7 @@ duration: 5s
 workloads:
   - name: attachments
     mode: throughput
+    organizations: 1000
     file_sizes:
       p50: 50 KiB
       p99: 200 KiB

stresstest/src/config.rs

Lines changed: 6 additions & 0 deletions
@@ -19,13 +19,19 @@ pub struct Workload {
     pub name: String,
     #[serde(default)]
     pub concurrency: usize,
+    #[serde(default = "default_organizations")]
+    pub organizations: u64,
     #[serde(default)]
     pub mode: WorkloadMode,
     pub file_sizes: FileSizes,
     #[serde(default)]
     pub actions: Actions,
 }
 
+fn default_organizations() -> u64 {
+    1
+}
+
 #[derive(Debug, Deserialize)]
 pub struct FileSizes {
     pub p50: ByteSize,
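For illustration, a minimal standalone sketch of how the serde default above behaves (hypothetical, simplified from config.rs; it assumes the serde and serde_yaml crates and omits all other workload fields): omitting organizations from the YAML falls back to 1, preserving the previous single-organization behavior.

    // Hedged sketch of the `organizations` default, simplified from config.rs;
    // assumes the serde (with derive) and serde_yaml crates.
    use serde::Deserialize;

    #[derive(Debug, Deserialize)]
    struct Workload {
        name: String,
        #[serde(default = "default_organizations")]
        organizations: u64,
    }

    fn default_organizations() -> u64 {
        1
    }

    fn main() {
        // Without the field, the default of 1 keeps the old single-organization behavior.
        let w: Workload = serde_yaml::from_str("name: attachments").unwrap();
        println!("{} -> {} organization(s)", w.name, w.organizations);

        // With the field set, as in example.yaml, requests spread across 1000 organizations.
        let w: Workload = serde_yaml::from_str("name: attachments\norganizations: 1000").unwrap();
        println!("{} -> {} organization(s)", w.name, w.organizations);
    }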

stresstest/src/http.rs

Lines changed: 40 additions & 12 deletions
@@ -1,5 +1,7 @@
 //! Contains a remote implementation using HTTP to interact with objectstore.
 
+use std::collections::BTreeMap;
+
 use futures::StreamExt;
 use objectstore_client::{Client, ClientBuilder, GetResult};
 use tokio::io::AsyncReadExt;
@@ -10,23 +12,29 @@ use crate::workload::Payload;
 /// A remote implementation using HTTP to interact with objectstore.
 #[derive(Debug)]
 pub struct HttpRemote {
-    /// The Storage Client used to talk to our service.
-    pub client: Client,
+    remote: String,
+    builders: BTreeMap<String, ClientBuilder>,
 }
 
 impl HttpRemote {
     /// Creates a new `HttpRemote` instance with the given remote URL and a default client.
     pub fn new(remote: &str) -> Self {
-        let client = ClientBuilder::new(remote, "stresstest")
-            .unwrap()
-            .for_organization(12345);
-        Self { client }
+        Self {
+            remote: remote.to_owned(),
+            builders: BTreeMap::new(),
+        }
     }
 
-    pub(crate) async fn write(&self, payload: Payload) -> String {
+    pub(crate) async fn write(
+        &self,
+        usecase: &str,
+        organization_id: u64,
+        payload: Payload,
+    ) -> String {
+        let client = self.client(usecase, organization_id);
         let stream = ReaderStream::new(payload).boxed();
 
-        self.client
+        client
             .put_stream(stream)
             .compression(None)
             .send()
@@ -35,8 +43,15 @@ impl HttpRemote {
             .key
     }
 
-    pub(crate) async fn read(&self, key: &str, mut payload: Payload) {
-        let GetResult { stream, .. } = self.client.get(key).send().await.unwrap().unwrap();
+    pub(crate) async fn read(
+        &self,
+        usecase: &str,
+        organization_id: u64,
+        key: &str,
+        mut payload: Payload,
+    ) {
+        let client = self.client(usecase, organization_id);
+        let GetResult { stream, .. } = client.get(key).send().await.unwrap().unwrap();
         let mut reader = StreamReader::new(stream);
 
         // TODO: both of these are currently buffering in-memory. we should use streaming here as well.
@@ -50,7 +65,20 @@ impl HttpRemote {
         }
     }
 
-    pub(crate) async fn delete(&self, key: String) {
-        self.client.delete(&key).await.unwrap();
+    pub(crate) async fn delete(&self, usecase: &str, organization_id: u64, key: &str) {
+        let client = self.client(usecase, organization_id);
+        client.delete(key).await.unwrap();
+    }
+
+    /// Registers a new usecase that can be used by the workloads.
+    pub fn register_usecase(&mut self, usecase: &str) {
+        let builder = ClientBuilder::new(&self.remote, usecase).unwrap();
+        self.builders.insert(usecase.to_owned(), builder);
+    }
+
+    fn client(&self, usecase: &str, organization_id: u64) -> Client {
+        // NB: Reuse the organization ID as project ID to create unique projects. Right now, we do
+        // not benefit from simulating multiple projects per org.
+        self.builders[usecase].for_project(organization_id, organization_id)
     }
 }

stresstest/src/main.rs

Lines changed: 1 addition & 0 deletions
@@ -50,6 +50,7 @@ async fn main() -> anyhow::Result<()> {
         .map(|w| {
             Workload::builder(w.name)
                 .concurrency(w.concurrency)
+                .organizations(w.organizations)
                 .mode(w.mode)
                 .size_distribution(w.file_sizes.p50.0, w.file_sizes.p99.0)
                 .action_weights(w.actions.writes, w.actions.reads, w.actions.deletes)

stresstest/src/stresstest.rs

Lines changed: 21 additions & 6 deletions
@@ -19,8 +19,17 @@ use crate::workload::{Action, Workload, WorkloadMode};
 ///
 /// The function runs all workloads concurrently, then prints metrics and finally deletes all
 /// objects from the remote.
-pub async fn run(remote: HttpRemote, workloads: Vec<Workload>, duration: Duration) -> Result<()> {
+pub async fn run(
+    mut remote: HttpRemote,
+    workloads: Vec<Workload>,
+    duration: Duration,
+) -> Result<()> {
+    for workload in &workloads {
+        remote.register_usecase(&workload.name);
+    }
+
     let remote = Arc::new(remote);
+
     // run the workloads concurrently
     let tasks: Vec<_> = workloads
         .into_iter()
@@ -76,12 +85,12 @@ pub async fn run(remote: HttpRemote, workloads: Vec<Workload>, duratio
     let start = Instant::now();
     let cleanup_timing = Arc::new(Mutex::new(DDSketch::default()));
     futures::stream::iter(files_to_cleanup)
-        .for_each_concurrent(max_concurrency, |external_id| {
+        .for_each_concurrent(max_concurrency, |(usecase, organization_id, object_key)| {
             let remote = remote.clone();
             let cleanup_timing = cleanup_timing.clone();
             async move {
                 let start = Instant::now();
-                remote.delete(external_id).await;
+                remote.delete(&usecase, organization_id, &object_key).await;
                 cleanup_timing
                     .lock()
                     .unwrap()
@@ -147,12 +156,16 @@ async fn run_workload(
         tokio::time::sleep(Duration::from_millis(10)).await;
     };
 
+
     let task = async move {
         let start = Instant::now();
         match action {
             Action::Write(internal_id, payload) => {
                 let file_size = payload.len;
-                let external_id = remote.write(payload).await;
+                let usecase = workload.lock().unwrap().name.clone();
+                let organization_id = workload.lock().unwrap().next_organization_id();
+                let object_key = remote.write(&usecase, organization_id, payload).await;
+                let external_id = (usecase, organization_id, object_key);
                 workload.lock().unwrap().push_file(internal_id, external_id);
                 let mut metrics = metrics.lock().unwrap();
                 metrics.write_timing.add(start.elapsed().as_secs_f64());
@@ -161,14 +174,16 @@ async fn run_workload(
             }
             Action::Read(internal_id, external_id, payload) => {
                 let file_size = payload.len;
-                remote.read(&external_id, payload).await;
+                let (usecase, organization_id, object_key) = &external_id;
+                remote.read(usecase, *organization_id, object_key, payload).await;
                 workload.lock().unwrap().push_file(internal_id, external_id);
                 let mut metrics = metrics.lock().unwrap();
                 metrics.read_timing.add(start.elapsed().as_secs_f64());
                 metrics.bytes_read += file_size;
             }
             Action::Delete(external_id) => {
-                remote.delete(external_id).await;
+                let (usecase, organization_id, object_key) = &external_id;
+                remote.delete(usecase, *organization_id, object_key).await;
                 let mut metrics = metrics.lock().unwrap();
                 metrics.delete_timing.add(start.elapsed().as_secs_f64());
             }

stresstest/src/workload.rs

Lines changed: 18 additions & 2 deletions
@@ -33,6 +33,7 @@ pub enum WorkloadMode {
 pub struct WorkloadBuilder {
     name: String,
     concurrency: usize,
+    organizations: u64,
     mode: WorkloadMode,
     seed: u64,
 
@@ -51,6 +52,12 @@ impl WorkloadBuilder {
         self
     }
 
+    /// The number of organizations to distribute the workload across.
+    pub fn organizations(mut self, organizations: u64) -> Self {
+        self.organizations = organizations;
+        self
+    }
+
     /// The mode of the workload, either `weighted` or `throughput`.
     pub fn mode(mut self, mode: WorkloadMode) -> Self {
         self.mode = mode;
@@ -89,6 +96,7 @@
         Workload {
             name: self.name,
             concurrency: self.concurrency,
+            organizations: self.organizations,
             mode: self.mode,
 
             rng,
@@ -117,6 +125,8 @@ pub struct Workload {
     pub(crate) name: String,
     /// The maximum number of concurrent operations that can be performed within this workload.
     pub(crate) concurrency: usize,
+    /// The number of organizations to distribute the workload across.
+    pub(crate) organizations: u64,
     /// The target throughput for the workload, in bytes per second. Overrides concurrency.
     pub(crate) mode: WorkloadMode,
 
@@ -140,6 +150,7 @@
         WorkloadBuilder {
             name: name.into(),
            concurrency: available_parallelism().unwrap().get(),
+            organizations: 1,
            mode: WorkloadMode::default(),
            seed: rand::random(),
 
@@ -233,6 +244,10 @@
         }
     }
 
+    pub(crate) fn next_organization_id(&mut self) -> u64 {
+        self.rng.next_u64() % self.organizations
+    }
+
     /// Adds a file to the internal store, so it can be yielded for reads or deletes.
     ///
     /// This function has to be called for files when a write or read has completed.
@@ -262,8 +277,9 @@ impl fmt::Display for InternalId {
 
 /// Unique identifier for an object in the remote storage.
 ///
-/// These identifiers map to [`InternalId`]s.
-pub type ExternalId = String;
+/// The identifier consists of the usecase, organization ID and the object key. These key maps to
+/// [`InternalId`].
+pub type ExternalId = (String, u64, String);
 
 /// An action that can be performed by the workload.
 #[derive(Debug)]
