Skip to content

Commit 2cd0d32

Browse files
authored
Merge pull request #7 from rex-rs/membench
- tools/memcached_benchmark: refactor into multiple modules Signed-off-by: Jinghao Jia <jinghao7@illinois.edu>
2 parents 9fdeab8 + e423313 commit 2cd0d32

File tree

6 files changed

+580
-524
lines changed

6 files changed

+580
-524
lines changed
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
use clap::{Parser, Subcommand};
2+
3+
use crate::Protocol;
4+
5+
#[derive(Parser)]
6+
#[command(author, version, about, long_about = None)]
7+
pub(crate) struct Cli {
8+
#[command(subcommand)]
9+
pub(crate) command: Commands,
10+
}
11+
12+
#[derive(Debug, Subcommand)]
13+
pub(crate) enum Commands {
14+
#[command(arg_required_else_help = true)]
15+
Bench {
16+
/// memcached server addr
17+
#[arg(short, long, required = true)]
18+
server_address: String,
19+
20+
#[arg(short, long, default_value = "11211")]
21+
port: String,
22+
23+
/// key size to generate random memcached key
24+
#[arg(short, long, default_value = "16")]
25+
key_size: usize,
26+
27+
/// value size to generate random memcached value
28+
#[arg(short, long, default_value = "32")]
29+
value_size: usize,
30+
31+
/// verify the value after get command
32+
#[arg(long, default_value = "false")]
33+
validate: bool,
34+
35+
/// number of test entries to generate
36+
#[arg(short, long, default_value = "100000")]
37+
nums: usize,
38+
39+
/// number of threads to run
40+
#[arg(short, long, default_value = "4")]
41+
threads: usize,
42+
43+
/// udp or tcp protocol for memcached
44+
#[arg(short = 'l', long, default_value_t = Protocol::Udp , value_enum)]
45+
protocol: Protocol,
46+
47+
/// number of dict entries to generate
48+
#[arg(short, long, default_value = "1000000")]
49+
dict_entries: usize,
50+
51+
/// load the prepared test_entries from disk
52+
#[arg(long, default_value = "false")]
53+
load_bench_entries: bool,
54+
55+
/// skip set memcached value if the data is already imported
56+
#[arg(long, default_value = "false")]
57+
skip_set: bool,
58+
59+
/// bounded mpsc channel for communicating between asynchronous tasks
60+
/// with backpressure
61+
#[arg(long, default_value = "200")]
62+
pipeline: usize,
63+
64+
/// dict path to load
65+
#[arg(
66+
short = 'f',
67+
long,
68+
default_value = "test_dict.yml.zst",
69+
conflicts_with = "key_size",
70+
conflicts_with = "dict_entries"
71+
)]
72+
dict_path: String,
73+
},
74+
GenTestdict {
75+
/// key size to generate random memcached key
76+
#[arg(short, long, default_value = "16")]
77+
key_size: usize,
78+
79+
/// value size to generate random memcached value
80+
#[arg(short, long, default_value = "32")]
81+
value_size: usize,
82+
83+
/// number of dict entries to generate
84+
#[arg(short, long, default_value = "1000000")]
85+
dict_entries: usize,
86+
87+
/// dict path to store
88+
#[arg(short = 'f', long, default_value = "test_dict.yml.zst")]
89+
dict_path: String,
90+
},
91+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
use std::{collections::HashMap, mem::size_of_val, sync::Arc};
2+
3+
use log::{debug, info};
4+
use rand::{
5+
Rng,
6+
distr::{Alphanumeric, SampleString},
7+
};
8+
use rand_chacha::{ChaCha8Rng, rand_core::SeedableRng};
9+
use rand_distr::Zipf;
10+
use rayon::prelude::*;
11+
12+
use crate::{Protocol, fs::write_hashmap_to_file};
13+
14+
/// Fixed seed for the `ChaCha8Rng` used by `generate_test_entries`, so the
/// sequence of Zipf samples drawn there is the same on every run.
const SEED: u64 = 12312;
15+
16+
/// Generate random string of specified length
17+
pub(crate) fn generate_random_str(len: usize) -> String {
18+
Alphanumeric.sample_string(&mut rand::rng(), len)
19+
}
20+
21+
/// Generate test dictionary for memcached with random keys and values
22+
pub(crate) fn generate_memcached_test_dict(
23+
key_size: usize,
24+
value_size: usize,
25+
nums: usize,
26+
) -> HashMap<String, String> {
27+
// random generate dict for memcached test
28+
(0..nums)
29+
.into_par_iter()
30+
.map(|_| {
31+
(
32+
generate_random_str(key_size),
33+
generate_random_str(value_size),
34+
)
35+
})
36+
.collect()
37+
}
38+
39+
/// Generate test dict and write to disk
40+
/// # Arguments
41+
/// * `key_size` - key size
42+
/// * `value_size` - value size
43+
/// * `nums` - number of entries
44+
/// * `dict_path` - dict path to store
45+
/// # Returns
46+
/// * `Result` - Result<HashMap<String, String>, anyhow::Error>
47+
/// # Example
48+
/// ```rust
49+
/// let test_dict = generate_test_dict_write_to_disk(16, 32, 100000, "test_dict.yml.zst");
50+
/// ```
51+
pub(crate) fn generate_test_dict_write_to_disk(
52+
key_size: usize,
53+
value_size: usize,
54+
nums: usize,
55+
dict_path: &str,
56+
) -> anyhow::Result<HashMap<String, String>> {
57+
let test_dict = generate_memcached_test_dict(key_size, value_size, nums);
58+
debug!("test dict len: {}", test_dict.len());
59+
if let Some((key, value)) = test_dict.iter().next() {
60+
debug!("test dict key size: {}", size_of_val(key.as_str()));
61+
debug!("test dict value size: {}", size_of_val(value.as_str()));
62+
} else {
63+
return Err(anyhow::anyhow!("test dict is empty"));
64+
}
65+
write_hashmap_to_file(&test_dict, dict_path)?;
66+
info!("write test dict to path {}", dict_path);
67+
Ok(test_dict)
68+
}
69+
70+
/// Generate test entries with Zipf distribution for benchmarking
71+
pub(crate) fn generate_test_entries(
72+
test_dict: Arc<HashMap<Arc<String>, Arc<String>>>,
73+
nums: usize,
74+
) -> Vec<(Arc<String>, Arc<String>, Protocol)> {
75+
let mut rng = ChaCha8Rng::seed_from_u64(SEED);
76+
let zipf = Zipf::new((test_dict.len() - 1) as f64, 0.99).unwrap();
77+
78+
let keys: Vec<Arc<String>> = test_dict.keys().cloned().collect();
79+
(0..nums)
80+
.map(|idx| {
81+
let key = &keys[rng.sample(zipf) as usize];
82+
let value = test_dict.get(key).unwrap();
83+
// every 31 element is tcp. udp:tcp = 30:1
84+
let protocol = if idx % 31 == 30 {
85+
Protocol::Tcp
86+
} else {
87+
Protocol::Udp
88+
};
89+
(key.clone(), value.clone(), protocol)
90+
})
91+
.collect()
92+
}
93+
94+
/// Analyze the statistics of test entries
95+
#[allow(dead_code)]
96+
pub(crate) fn test_entries_statistics(
97+
test_entries: Arc<Vec<(&String, &String, Protocol)>>,
98+
) {
99+
let mut udp_count: usize = 0;
100+
let mut tcp_count: usize = 0;
101+
102+
// analyze the key distribution base on the frequency
103+
let mut key_frequency = HashMap::new();
104+
105+
// only get the first element in the tuple
106+
test_entries.iter().for_each(|(key, _, proto)| {
107+
*key_frequency.entry(key.to_string()).or_insert(0) += 1;
108+
if *proto == Protocol::Udp {
109+
udp_count += 1;
110+
} else {
111+
tcp_count += 1;
112+
}
113+
});
114+
115+
// sort by frequency
116+
let mut key_frequency: Vec<_> = key_frequency.into_iter().collect();
117+
key_frequency.sort_by(|a, b| a.1.cmp(&b.1));
118+
119+
// Display the frequency of each item
120+
for (key, count) in &key_frequency {
121+
if *count < key_frequency.len() / 1000 {
122+
continue;
123+
}
124+
info!("{}: {}", key, count);
125+
}
126+
127+
info!("tcp count: {}, udp count: {}", tcp_count, udp_count);
128+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
use std::{
2+
collections::HashMap,
3+
fs::File,
4+
io::{BufRead, BufReader, Write},
5+
path::Path,
6+
};
7+
8+
use anyhow::Result;
9+
use log::{debug, info};
10+
use serde::Serialize;
11+
12+
use crate::Protocol;
13+
14+
/// Write hashmap to a compressed file using zstd
15+
/// # Arguments
16+
/// * `hashmap` - hashmap to write to file
17+
/// * `file_path` - file path to write
18+
/// # Returns
19+
/// * `Result` - Result<(), anyhow::Error>
20+
pub(crate) fn write_hashmap_to_file<T: Serialize>(
21+
hashmap: &T,
22+
file_path: &str,
23+
) -> Result<()> {
24+
// Serialize the hashmap to a YAML string
25+
let serialized =
26+
serde_yaml::to_string(hashmap).expect("Failed to serialize");
27+
28+
// Create or open the file
29+
let file = File::create(file_path)?;
30+
31+
// Create a zstd encoder with compression level 7
32+
let mut encoder = zstd::stream::write::Encoder::new(file, 7)?;
33+
34+
// Write the YAML string to the file
35+
encoder.write_all(serialized.as_bytes())?;
36+
encoder.finish()?;
37+
38+
Ok(())
39+
}
40+
41+
/// Loads benchmark entries from disk from a zstd-compressed YAML file.
42+
///
43+
/// Although the entries are typically selected randomly from a test dictionary,
44+
/// there are cases where a fixed sequence of entries is needed to ensure
45+
/// consistent performance comparisons, and this function is utilized to
46+
/// retrieve the stored benchmark entries
47+
pub(crate) fn load_bench_entries_from_disk(
48+
path: &Path,
49+
) -> Vec<(String, String, Protocol)> {
50+
let file = std::fs::File::open(path).unwrap();
51+
let decoder = zstd::stream::read::Decoder::new(file).unwrap();
52+
let reader = std::io::BufReader::new(decoder);
53+
let test_entries: Vec<(String, String, Protocol)> =
54+
serde_yaml::from_reader(reader).unwrap();
55+
test_entries
56+
}
57+
58+
/// Load test dictionary from disk
59+
///
60+
/// This function opens a file located at `test_dict_path`, which is expected to
61+
/// be a zstd-compressed and valid YAML document key-value pair
62+
/// (`HashMap<String, String>`).
63+
pub(crate) fn load_test_dict(
64+
test_dict_path: &std::path::Path,
65+
) -> anyhow::Result<HashMap<String, String>> {
66+
// load dict from file if dict_path is not empty
67+
info!("loading dict from path {:?}", test_dict_path);
68+
let file = std::fs::File::open(test_dict_path)?;
69+
let decoder = zstd::stream::read::Decoder::new(file)?;
70+
let reader = BufReader::new(decoder);
71+
72+
// Deserialize the string into a HashMap
73+
let mut test_dict = HashMap::new();
74+
75+
reader.lines().for_each(|line| {
76+
let line = line.unwrap();
77+
// Assuming each line in your file is a valid YAML representing a
78+
// key-value pair
79+
let deserialized_map: HashMap<String, String> =
80+
serde_yaml::from_str(&line).unwrap();
81+
test_dict.extend(deserialized_map);
82+
});
83+
84+
debug!("test dict len: {}", test_dict.len());
85+
if let Some(key) = test_dict.keys().next() {
86+
debug!("test dict key size: {}", key.len());
87+
}
88+
if let Some(value) = test_dict.values().next() {
89+
debug!("test dict value size: {}", value.len());
90+
}
91+
Ok(test_dict)
92+
}

0 commit comments

Comments
 (0)