Skip to content

Commit 78a5f14

Browse files
[STREAM-594] Generate Unique Namespace for Tests (#519)
* Generate Unique Namespace for Tests * Improve Comments * Remove Unused Import
1 parent a83eaa8 commit 78a5f14

File tree

7 files changed

+116
-53
lines changed

7 files changed

+116
-53
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ contains a `Makefile` with frequently run development commands:
4444

4545
- `devenv sync` Run setup tasks to create and configure your development
4646
environment.
47-
- `make test` Run rust tests.
47+
- `make unit-test` Run rust tests.
4848
- `make integration-test` Run end to end tests.
4949
- `make format` Format rust code with `cargo fmt` and `cargo clippy`.
5050

benches/store_bench.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use taskbroker::{
77
store::inflight_activation::{
88
InflightActivationStatus, InflightActivationStore, InflightActivationStoreConfig,
99
},
10-
test_utils::{generate_temp_filename, make_activations},
10+
test_utils::{
11+
generate_temp_filename, generate_unique_namespace, make_activations_with_namespace,
12+
},
1113
};
1214
use tokio::task::JoinSet;
1315

@@ -36,7 +38,9 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) {
3638
.unwrap(),
3739
);
3840

39-
for chunk in make_activations(num_activations).chunks(1024) {
41+
let namespace = generate_unique_namespace();
42+
43+
for chunk in make_activations_with_namespace(namespace.clone(), num_activations).chunks(1024) {
4044
store.store(chunk.to_vec()).await.unwrap();
4145
}
4246

@@ -48,11 +52,13 @@ async fn get_pending_activations(num_activations: u32, num_workers: u32) {
4852
let mut join_set = JoinSet::new();
4953
for _ in 0..num_workers {
5054
let store = store.clone();
55+
let ns = namespace.clone();
56+
5157
join_set.spawn(async move {
5258
let mut num_activations_processed = 0;
5359

5460
while store
55-
.get_pending_activation(Some("namespace"))
61+
.get_pending_activation(Some(&ns))
5662
.await
5763
.unwrap()
5864
.is_some()
@@ -81,6 +87,7 @@ async fn set_status(num_activations: u32, num_workers: u32) {
8187
} else {
8288
generate_temp_filename()
8389
};
90+
8491
let store = Arc::new(
8592
InflightActivationStore::new(
8693
&url,
@@ -95,7 +102,9 @@ async fn set_status(num_activations: u32, num_workers: u32) {
95102
.unwrap(),
96103
);
97104

98-
for chunk in make_activations(num_activations).chunks(1024) {
105+
let namespace = generate_unique_namespace();
106+
107+
for chunk in make_activations_with_namespace(namespace, num_activations).chunks(1024) {
99108
store.store(chunk.to_vec()).await.unwrap();
100109
}
101110

src/kafka/deserialize_activation.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,9 @@ mod tests {
107107
use rdkafka::{Timestamp, message::OwnedMessage};
108108
use sentry_protos::taskbroker::v1::TaskActivation;
109109

110-
use crate::store::inflight_activation::InflightActivationStatus;
110+
use crate::{
111+
store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace,
112+
};
111113

112114
use super::{Config, DeserializeActivationConfig, new};
113115

@@ -121,7 +123,7 @@ mod tests {
121123
#[allow(deprecated)]
122124
let activation = TaskActivation {
123125
id: "id_0".into(),
124-
namespace: "namespace".into(),
126+
namespace: generate_unique_namespace(),
125127
taskname: "taskname".into(),
126128
parameters: "{}".into(),
127129
headers: HashMap::new(),
@@ -165,7 +167,7 @@ mod tests {
165167
#[allow(deprecated)]
166168
let activation = TaskActivation {
167169
id: "id_0".into(),
168-
namespace: "namespace".into(),
170+
namespace: generate_unique_namespace(),
169171
taskname: "taskname".into(),
170172
parameters: "{}".into(),
171173
headers: HashMap::new(),
@@ -210,7 +212,7 @@ mod tests {
210212
#[allow(deprecated)]
211213
let activation = TaskActivation {
212214
id: "id_0".into(),
213-
namespace: "namespace".into(),
215+
namespace: generate_unique_namespace(),
214216
taskname: "taskname".into(),
215217
parameters: "{}".into(),
216218
headers: HashMap::new(),
@@ -255,7 +257,7 @@ mod tests {
255257
#[allow(deprecated)]
256258
let activation = TaskActivation {
257259
id: "id_0".into(),
258-
namespace: "namespace".into(),
260+
namespace: generate_unique_namespace(),
259261
taskname: "taskname".into(),
260262
parameters: "{}".into(),
261263
headers: HashMap::new(),
@@ -301,7 +303,7 @@ mod tests {
301303
#[allow(deprecated)]
302304
let activation = TaskActivation {
303305
id: "id_0".into(),
304-
namespace: "namespace".into(),
306+
namespace: generate_unique_namespace(),
305307
taskname: "taskname".into(),
306308
parameters: "{}".into(),
307309
headers: HashMap::new(),

src/kafka/inflight_activation_batcher.rs

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,9 @@ mod tests {
224224
use sentry_protos::taskbroker::v1::{OnAttemptsExceeded, TaskActivation};
225225
use std::sync::Arc;
226226

227-
use crate::store::inflight_activation::InflightActivationStatus;
227+
use crate::{
228+
store::inflight_activation::InflightActivationStatus, test_utils::generate_unique_namespace,
229+
};
228230

229231
#[tokio::test]
230232
async fn test_drop_task_due_to_killswitch() {
@@ -244,11 +246,13 @@ demoted_namespaces:
244246
runtime_config,
245247
);
246248

249+
let namespace = generate_unique_namespace();
250+
247251
let inflight_activation_0 = InflightActivation {
248252
id: "0".to_string(),
249253
activation: TaskActivation {
250254
id: "0".to_string(),
251-
namespace: "namespace".to_string(),
255+
namespace: namespace.clone(),
252256
taskname: "task_to_be_filtered".to_string(),
253257
parameters: "{}".to_string(),
254258
headers: HashMap::new(),
@@ -270,7 +274,7 @@ demoted_namespaces:
270274
delay_until: None,
271275
processing_deadline: None,
272276
at_most_once: false,
273-
namespace: "namespace".to_string(),
277+
namespace: namespace.clone(),
274278
taskname: "task_to_be_filtered".to_string(),
275279
on_attempts_exceeded: OnAttemptsExceeded::Discard,
276280
};
@@ -290,11 +294,13 @@ demoted_namespaces:
290294
runtime_config,
291295
);
292296

297+
let namespace = generate_unique_namespace();
298+
293299
let inflight_activation_0 = InflightActivation {
294300
id: "0".to_string(),
295301
activation: TaskActivation {
296302
id: "0".to_string(),
297-
namespace: "namespace".to_string(),
303+
namespace: namespace.clone(),
298304
taskname: "task_to_be_filtered".to_string(),
299305
parameters: "{}".to_string(),
300306
headers: HashMap::new(),
@@ -316,7 +322,7 @@ demoted_namespaces:
316322
delay_until: None,
317323
processing_deadline: None,
318324
at_most_once: false,
319-
namespace: "namespace".to_string(),
325+
namespace: namespace.clone(),
320326
taskname: "task_to_be_filtered".to_string(),
321327
on_attempts_exceeded: OnAttemptsExceeded::Discard,
322328
};
@@ -339,11 +345,13 @@ demoted_namespaces:
339345
runtime_config,
340346
);
341347

348+
let namespace = generate_unique_namespace();
349+
342350
let inflight_activation_0 = InflightActivation {
343351
id: "0".to_string(),
344352
activation: TaskActivation {
345353
id: "0".to_string(),
346-
namespace: "namespace".to_string(),
354+
namespace: namespace.clone(),
347355
taskname: "taskname".to_string(),
348356
parameters: "{}".to_string(),
349357
headers: HashMap::new(),
@@ -365,7 +373,7 @@ demoted_namespaces:
365373
delay_until: None,
366374
processing_deadline: None,
367375
at_most_once: false,
368-
namespace: "namespace".to_string(),
376+
namespace: namespace.clone(),
369377
taskname: "taskname".to_string(),
370378
on_attempts_exceeded: OnAttemptsExceeded::Discard,
371379
};
@@ -390,11 +398,13 @@ demoted_namespaces:
390398
runtime_config,
391399
);
392400

401+
let namespace = generate_unique_namespace();
402+
393403
let inflight_activation_0 = InflightActivation {
394404
id: "0".to_string(),
395405
activation: TaskActivation {
396406
id: "0".to_string(),
397-
namespace: "namespace".to_string(),
407+
namespace: namespace.clone(),
398408
taskname: "taskname".to_string(),
399409
parameters: "{}".to_string(),
400410
headers: HashMap::new(),
@@ -416,7 +426,7 @@ demoted_namespaces:
416426
delay_until: None,
417427
processing_deadline: None,
418428
at_most_once: false,
419-
namespace: "namespace".to_string(),
429+
namespace: namespace.clone(),
420430
taskname: "taskname".to_string(),
421431
on_attempts_exceeded: OnAttemptsExceeded::Discard,
422432
};
@@ -425,7 +435,7 @@ demoted_namespaces:
425435
id: "1".to_string(),
426436
activation: TaskActivation {
427437
id: "1".to_string(),
428-
namespace: "namespace".to_string(),
438+
namespace: namespace.clone(),
429439
taskname: "taskname".to_string(),
430440
parameters: "{}".to_string(),
431441
headers: HashMap::new(),
@@ -447,7 +457,7 @@ demoted_namespaces:
447457
delay_until: None,
448458
processing_deadline: None,
449459
at_most_once: false,
450-
namespace: "namespace".to_string(),
460+
namespace: namespace.clone(),
451461
taskname: "taskname".to_string(),
452462
on_attempts_exceeded: OnAttemptsExceeded::Discard,
453463
};

0 commit comments

Comments
 (0)