Skip to content

Commit 15ed000

Browse files
authored
Merge pull request #530 from tansu-io/529-perf-cli
perf cli wip
2 parents 44d2679 + c1f9cdf commit 15ed000

File tree

17 files changed

+1132
-26
lines changed

17 files changed

+1132
-26
lines changed

Cargo.lock

Lines changed: 41 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ members = [
88
"tansu-client",
99
"tansu-generator",
1010
"tansu-model",
11-
"tansu-otel",
11+
"tansu-otel", "tansu-perf",
1212
"tansu-proxy",
1313
"tansu-sans-io",
1414
"tansu-schema",
@@ -111,6 +111,7 @@ object_store = { version = "0.12.3", features = ["aws"] }
111111
opentelemetry = "0.30.0"
112112
opentelemetry-otlp = "0.30.0"
113113
opentelemetry-semantic-conventions = "0.30.0"
114+
opentelemetry-stdout = "0.30.0"
114115
opentelemetry_sdk = { version = "0.30.0", features = ["testing"] }
115116
ordered-float = "4.6"
116117
parquet = "55"
@@ -138,6 +139,7 @@ tansu-client = { version = "0.5.10", path = "tansu-client", default-features = f
138139
tansu-generator = { version = "0.5.10", path = "tansu-generator", default-features = false }
139140
tansu-model = { version = "0.5.10", path = "tansu-model", default-features = false }
140141
tansu-otel = { version = "0.5.10", path = "tansu-otel", default-features = false }
142+
tansu-perf = { version = "0.5.10", path = "tansu-perf", default-features = false }
141143
tansu-proxy = { version = "0.5.10", path = "tansu-proxy", default-features = false }
142144
tansu-sans-io = { version = "0.5.10", path = "tansu-sans-io", default-features = false }
143145
tansu-schema = { version = "0.5.10", path = "tansu-schema", default-features = false }

justfile

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,10 @@ customer-topic-generator *args: (generator "customer" args)
387387

388388
customer-duckdb-delta: (duckdb "\"select * from delta_scan('s3://lake/tansu.customer');\"")
389389

390+
broker-memory profile="profiling":
391+
cargo build --profile {{ profile }} --bin tansu
392+
./target/{{ replace(profile, "dev", "debug") }}/tansu broker --storage-engine=memory:// 2>&1 | tee broker.log
393+
390394
broker-null profile="profiling":
391395
cargo build --profile {{ profile }} --bin tansu
392396
./target/{{ replace(profile, "dev", "debug") }}/tansu --storage-engine=null://sink 2>&1 | tee broker.log
@@ -449,11 +453,13 @@ producer-perf throughput="1000" record_size="1024" num_records="100000":
449453

450454
producer-perf-10: (producer-perf "10")
451455

452-
producer-perf-1000: (producer-perf "1000")
456+
producer-perf-1000: (producer-perf "1000" "1024" "25000")
457+
458+
producer-perf-2000: (producer-perf "2000" "1024" "50000")
453459

454-
producer-perf-2000: (producer-perf "2000")
460+
producer-perf-3000: (producer-perf "3000" "1024" "75000")
455461

456-
producer-perf-3000: (producer-perf "3000")
462+
producer-perf-4000: (producer-perf "4000" "1024" "100000")
457463

458464
producer-perf-5000: (producer-perf "5000" "1024" "125000")
459465

@@ -471,16 +477,35 @@ producer-perf-15000: (producer-perf "15000" "1024" "375000")
471477

472478
producer-perf-20000: (producer-perf "20000" "1024" "500000")
473479

480+
producer-perf-30000: (producer-perf "30000" "1024" "750000")
481+
474482
producer-perf-40000: (producer-perf "40000" "1024" "1000000")
475483

476484
producer-perf-45000: (producer-perf "45000" "1024" "1100000")
477485

478486
producer-perf-50000: (producer-perf "50000" "1024" "1250000")
479487

488+
producer-perf-60000: (producer-perf "60000" "1024" "1500000")
489+
490+
producer-perf-70000: (producer-perf "70000" "1024" "1750000")
491+
492+
producer-perf-80000: (producer-perf "80000" "1024" "2000000")
493+
494+
producer-perf-90000: (producer-perf "90000" "1024" "2250000")
495+
480496
producer-perf-100000: (producer-perf "100000" "1024" "2500000")
481497

482498
producer-perf-200000: (producer-perf "200000" "1024" "5000000")
483499

500+
producer-perf-300000: (producer-perf "300000" "1024" "7500000")
501+
502+
producer-perf-400000: (producer-perf "400000" "1024" "10000000")
503+
484504
producer-perf-500000: (producer-perf "500000" "1024" "12500000")
485505

506+
producer-perf-600000: (producer-perf "600000" "1024" "15000000")
507+
486508
producer-perf-1000000: (producer-perf "1000000" "1024" "25000000")
509+
510+
ps-tansu-rss:
511+
ps -p $(pgrep tansu) -o rss= | awk '{print $1/1024 " MB"}'

tansu-broker/tests/metadata.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn topics_none(
9393
.iter()
9494
.map(|partition| partition.leader_epoch)
9595
.inspect(|leader_epoch| debug!(leader_epoch))
96-
.all(|leader_epoch| leader_epoch == Some(-1))
96+
.all(|leader_epoch| leader_epoch == Some(0))
9797
);
9898

9999
assert!(
@@ -202,7 +202,7 @@ pub async fn topics_some_empty(
202202
.iter()
203203
.map(|partition| partition.leader_epoch)
204204
.inspect(|leader_epoch| debug!(leader_epoch))
205-
.all(|leader_epoch| leader_epoch == Some(-1))
205+
.all(|leader_epoch| leader_epoch == Some(0))
206206
);
207207

208208
assert!(
@@ -311,7 +311,7 @@ pub async fn topics_some_matching_by_name(
311311
.iter()
312312
.map(|partition| partition.leader_epoch)
313313
.inspect(|leader_epoch| debug!(leader_epoch))
314-
.all(|leader_epoch| leader_epoch == Some(-1))
314+
.all(|leader_epoch| leader_epoch == Some(0))
315315
);
316316

317317
assert!(
@@ -459,7 +459,7 @@ pub async fn topics_some_matching_by_id(
459459
.iter()
460460
.map(|partition| partition.leader_epoch)
461461
.inspect(|leader_epoch| debug!(leader_epoch))
462-
.all(|leader_epoch| leader_epoch == Some(-1))
462+
.all(|leader_epoch| leader_epoch == Some(0))
463463
);
464464

465465
assert!(

tansu-broker/tests/topic.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ pub async fn create_describe_topic_partitions_by_id(
115115
for partition in responses[0].partitions.as_deref().unwrap_or_default() {
116116
assert_eq!(ErrorCode::None, ErrorCode::try_from(partition.error_code)?);
117117
assert_eq!(broker_id, partition.leader_id);
118-
assert_eq!(-1, partition.leader_epoch);
118+
assert_eq!(0, partition.leader_epoch);
119119
assert_eq!(
120120
Some(vec![broker_id; replication_factor as usize]),
121121
partition.replica_nodes
@@ -186,7 +186,7 @@ pub async fn create_describe_topic_partitions_by_name(
186186
for partition in responses[0].partitions.as_deref().unwrap_or_default() {
187187
assert_eq!(ErrorCode::None, ErrorCode::try_from(partition.error_code)?);
188188
assert_eq!(broker_id, partition.leader_id);
189-
assert_eq!(-1, partition.leader_epoch);
189+
assert_eq!(0, partition.leader_epoch);
190190
assert_eq!(
191191
Some(vec![broker_id; replication_factor as usize]),
192192
partition.replica_nodes

tansu-cli/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ regex.workspace = true
3232
tansu-broker.workspace = true
3333
tansu-cat.workspace = true
3434
tansu-generator.workspace = true
35+
tansu-perf.workspace = true
3536
tansu-proxy.workspace = true
3637
tansu-sans-io.workspace = true
3738
tansu-schema.workspace = true

tansu-cli/src/cli.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use tracing::debug;
3131
mod broker;
3232
mod cat;
3333
mod generator;
34+
mod perf;
3435
mod proxy;
3536
mod topic;
3637

@@ -97,6 +98,9 @@ enum Command {
9798
/// Traffic Generator for schema backed topics
9899
Generator(Box<generator::Arg>),
99100

101+
/// Performance
102+
Perf(Box<perf::Arg>),
103+
100104
/// Apache Kafka compatible proxy
101105
Proxy(Box<proxy::Arg>),
102106

@@ -132,6 +136,12 @@ impl Cli {
132136
.inspect(|result| debug!(?result))
133137
.inspect_err(|err| debug!(?err)),
134138

139+
Command::Perf(arg) => arg
140+
.main()
141+
.await
142+
.inspect(|result| debug!(?result))
143+
.inspect_err(|err| debug!(?err)),
144+
135145
Command::Proxy(arg) => tansu_proxy::Proxy::main(
136146
arg.listener_url.into_inner(),
137147
arg.advertised_listener_url.into_inner(),

tansu-cli/src/cli/perf.rs

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright ⓒ 2024-2025 Peter Morgan <peter.james.morgan@gmail.com>
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use clap::{Args, Subcommand};
16+
use human_units::iec::iec_unit;
17+
use tansu_perf::Perf;
18+
use tansu_sans_io::ErrorCode;
19+
use url::Url;
20+
21+
use crate::{EnvVarExp, Result, cli::DEFAULT_BROKER};
22+
23+
#[derive(Clone, Debug, Subcommand)]
24+
pub(super) enum Command {
25+
/// Produce messages
26+
Produce {
27+
/// The partition to produce messages into
28+
#[arg(long, default_value = "0")]
29+
partition: i32,
30+
31+
/// Message batch size used by every producer
32+
#[arg(long, default_value = "1")]
33+
batch_size: u32,
34+
35+
/// Message batch size used by every producer
36+
#[arg(long, default_value = "1k", value_parser=clap::value_parser!(human_units::Size))]
37+
record_size: human_units::Size,
38+
39+
/// The maximum number of messages per second
40+
#[clap(long, group = "output")]
41+
per_second: Option<u32>,
42+
43+
/// Message throughput
44+
#[clap(long, group = "output")]
45+
throughput: Option<Throughput>,
46+
47+
/// The number of producers generating messages
48+
#[arg(long, default_value = "1")]
49+
producers: u32,
50+
51+
/// Stop sending messages after this time
52+
#[arg(long, default_value = "1m", value_parser=clap::value_parser!(human_units::Duration))]
53+
duration: Option<human_units::Duration>,
54+
},
55+
56+
Consume,
57+
}
58+
59+
#[iec_unit(symbol = "B/s")]
60+
#[derive(Copy, Clone, Debug, Default, Eq, Hash, Ord, PartialEq, PartialOrd)]
61+
pub(super) struct Throughput(pub u32);
62+
63+
#[derive(Args, Clone, Debug)]
64+
pub(super) struct Arg {
65+
#[command(subcommand)]
66+
command: Command,
67+
68+
/// The URL of the broker
69+
#[arg(long, default_value = DEFAULT_BROKER, env = "ADVERTISED_LISTENER_URL")]
70+
broker: EnvVarExp<Url>,
71+
72+
/// The topic to generate messages into
73+
#[clap(value_parser)]
74+
topic: String,
75+
}
76+
77+
impl Arg {
78+
pub(super) async fn main(self) -> Result<ErrorCode> {
79+
match self.command {
80+
Command::Produce {
81+
partition,
82+
batch_size,
83+
record_size,
84+
per_second,
85+
throughput,
86+
producers,
87+
duration,
88+
} => Perf::builder()
89+
.broker(self.broker.into_inner())
90+
.topic(self.topic)
91+
.partition(partition)
92+
.batch_size(batch_size)
93+
.record_size(record_size.0 as usize)
94+
.per_second(per_second)
95+
.throughput(throughput.map(|throughput| throughput.0))
96+
.producers(producers)
97+
.duration(duration.map(|duration| duration.0))
98+
.build()
99+
.main()
100+
.await
101+
.map_err(Into::into),
102+
103+
Command::Consume => todo!(),
104+
}
105+
}
106+
}

tansu-cli/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub enum Error {
2525
Cat(Box<tansu_cat::Error>),
2626
DotEnv(#[from] dotenv::Error),
2727
Generate(#[from] tansu_generator::Error),
28+
Perf(#[from] tansu_perf::Error),
2829
Proxy(#[from] tansu_proxy::Error),
2930
Regex(#[from] regex::Error),
3031
Schema(Box<tansu_schema::Error>),

tansu-perf/Cargo.toml

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
[package]
2+
name = "tansu-perf"
3+
description = "Performance producer/consumer"
4+
authors.workspace = true
5+
edition.workspace = true
6+
homepage.workspace = true
7+
include.workspace = true
8+
keywords.workspace = true
9+
license.workspace = true
10+
repository.workspace = true
11+
version.workspace = true
12+
13+
[dependencies]
14+
bytes.workspace = true
15+
futures-core.workspace = true
16+
futures-util.workspace = true
17+
futures.workspace = true
18+
getrandom.workspace = true
19+
governor.workspace = true
20+
human-units.workspace = true
21+
nonzero_ext.workspace = true
22+
opentelemetry-otlp.workspace = true
23+
opentelemetry-semantic-conventions.workspace = true
24+
opentelemetry-stdout.workspace = true
25+
opentelemetry.workspace = true
26+
opentelemetry_sdk.workspace = true
27+
rama.workspace = true
28+
serde_json.workspace = true
29+
tansu-client.workspace = true
30+
tansu-sans-io.workspace = true
31+
thiserror.workspace = true
32+
tokio-util.workspace = true
33+
tokio.workspace = true
34+
tracing-subscriber.workspace = true
35+
tracing.workspace = true
36+
url.workspace = true
37+
38+
[lints]
39+
workspace = true

0 commit comments

Comments
 (0)