Skip to content

Commit f822da9

Browse files
authored
Add a binary crate to the autocompressor (#68)
1 parent b8e323c commit f822da9

File tree

3 files changed

+197
-1
lines changed

3 files changed

+197
-1
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,5 @@
33
*.data
44
*.old
55
**.sql
6-
*.csv
6+
*.csv
7+
**.log

auto_compressor/src/lib.rs

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,47 @@
66
//! to the database and uses these to enable it to incrementally work
77
//! on space reductions
88
9+
use std::str::FromStr;
10+
11+
use anyhow::Result;
12+
use synapse_compress_state::Level;
13+
914
pub mod manager;
1015
pub mod state_saving;
16+
/// Helper struct for parsing the `default_levels` argument.
///
/// The compressor keeps track of a number of Levels, each of which has a maximum length,
/// current length, and an optional current head (None if the level is empty, Some if a
/// head exists).
///
/// This newtype wrapper is needed since FromStr cannot be implemented for structs
/// that aren't defined in this scope (`Level` lives in the synapse_compress_state
/// crate), so we wrap `Vec<Level>` and implement FromStr on the wrapper instead.
#[derive(PartialEq, Debug)]
pub struct LevelInfo(pub Vec<Level>);
27+
28+
// Implement FromStr so that an argument of the form "100,50,25"
29+
// can be used to create a vector of levels with max sizes 100, 50 and 25
30+
// For more info see the LevelState documentation in lib.rs
31+
impl FromStr for LevelInfo {
32+
type Err = &'static str;
33+
34+
fn from_str(s: &str) -> Result<Self, Self::Err> {
35+
// Stores the max sizes of each level
36+
let mut level_info: Vec<Level> = Vec::new();
37+
38+
// Split the string up at each comma
39+
for size_str in s.split(',') {
40+
// try and convert each section into a number
41+
// panic if that fails
42+
let size: usize = size_str
43+
.parse()
44+
.map_err(|_| "Not a comma separated list of numbers")?;
45+
// add this parsed number to the sizes struct
46+
level_info.push(Level::new(size));
47+
}
48+
49+
// Return the built up vector inside a LevelInfo struct
50+
Ok(LevelInfo(level_info))
51+
}
52+
}

auto_compressor/src/main.rs

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
//! This is a tool that uses the synapse_compress_state library to
2+
//! reduce the size of the synapse state_groups_state table in a postgres
3+
//! database.
4+
//!
5+
//! It adds the tables state_compressor_state and state_compressor_progress
6+
//! to the database and uses these to enable it to incrementally work
7+
//! on space reductions.
8+
//!
9+
//! This binary calls manager::compress_chunks_of_database() with the arguments
10+
//! provided. That is, it compresses (in batches) the top N rooms ranked by
11+
//! amount of "uncompressed" state. This is measured by the number of rows in
12+
//! the state_groups_state table.
13+
//!
14+
//! After each batch, the rows processed are marked as "compressed" (using
15+
//! the state_compressor_progress table), and the program state is saved into
16+
//! the state_compressor_state table so that the compressor can seamlessly
17+
//! continue from where it left off.
18+
19+
#[global_allocator]
20+
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;
21+
22+
use auto_compressor::{manager, state_saving, LevelInfo};
23+
use clap::{crate_authors, crate_description, crate_name, crate_version, value_t, App, Arg};
24+
use log::LevelFilter;
25+
use std::{env, fs::OpenOptions};
26+
27+
/// Execution starts here
28+
fn main() {
29+
// setup the logger for the auto_compressor
30+
// The default can be overwritten with RUST_LOG
31+
// see the README for more information
32+
let log_file = OpenOptions::new()
33+
.append(true)
34+
.create(true)
35+
.open("auto_compressor.log")
36+
.unwrap_or_else(|e| panic!("Error occured while opening the log file: {}", e));
37+
38+
if env::var("RUST_LOG").is_err() {
39+
let mut log_builder = env_logger::builder();
40+
log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
41+
// Ensure panics still come through
42+
log_builder.filter_module("panic", LevelFilter::Error);
43+
// Only output errors from the synapse_compress state library
44+
log_builder.filter_module("synapse_compress_state", LevelFilter::Error);
45+
// Output log levels info and above from auto_compressor
46+
log_builder.filter_module("auto_compressor", LevelFilter::Info);
47+
log_builder.init();
48+
} else {
49+
// If RUST_LOG was set then use that
50+
let mut log_builder = env_logger::Builder::from_env("RUST_LOG");
51+
log_builder.target(env_logger::Target::Pipe(Box::new(log_file)));
52+
// Ensure panics still come through
53+
log_builder.filter_module("panic", LevelFilter::Error);
54+
log_builder.init();
55+
}
56+
log_panics::init();
57+
// Announce the start of the program to the logs
58+
log::info!("auto_compressor started");
59+
60+
// parse the command line arguments using the clap crate
61+
let arguments = App::new(crate_name!())
62+
.version(crate_version!())
63+
.author(crate_authors!("\n"))
64+
.about(crate_description!())
65+
.arg(
66+
Arg::with_name("postgres-url")
67+
.short("p")
68+
.value_name("URL")
69+
.help("The url for connecting to the postgres database.")
70+
.long_help(concat!(
71+
"The url for connecting to the postgres database.This should be of",
72+
" the form \"postgresql://username:[email protected]/database\""))
73+
.takes_value(true)
74+
.required(true),
75+
).arg(
76+
Arg::with_name("chunk_size")
77+
.short("c")
78+
.value_name("COUNT")
79+
.help("The maximum number of state groups to load into memroy at once")
80+
.long_help(concat!(
81+
"The number of state_groups to work on at once. All of the entries",
82+
" from state_groups_state are requested from the database",
83+
" for state groups that are worked on. Therefore small",
84+
" chunk sizes may be needed on machines with low memory.",
85+
" (Note: if the compressor fails to find space savings on the",
86+
" chunk as a whole (which may well happen in rooms with lots",
87+
" of backfill in) then the entire chunk is skipped.)",
88+
))
89+
.takes_value(true)
90+
.required(true),
91+
).arg(
92+
Arg::with_name("default_levels")
93+
.short("l")
94+
.value_name("LEVELS")
95+
.help("Sizes of each new level in the compression algorithm, as a comma separated list.")
96+
.long_help(concat!(
97+
"Sizes of each new level in the compression algorithm, as a comma separated list.",
98+
" The first entry in the list is for the lowest, most granular level,",
99+
" with each subsequent entry being for the next highest level.",
100+
" The number of entries in the list determines the number of levels",
101+
" that will be used.",
102+
"\nThe sum of the sizes of the levels effect the performance of fetching the state",
103+
" from the database, as the sum of the sizes is the upper bound on number of",
104+
" iterations needed to fetch a given set of state.",
105+
))
106+
.default_value("100,50,25")
107+
.takes_value(true)
108+
.required(false),
109+
).arg(
110+
Arg::with_name("number_of_chunks")
111+
.short("n")
112+
.value_name("CHUNKS_TO_COMPRESS")
113+
.help("The number of chunks to compress")
114+
.long_help("This many chunks of the database will be compressed ")
115+
.takes_value(true)
116+
.required(true),
117+
).get_matches();
118+
119+
// The URL of the database
120+
let db_url = arguments
121+
.value_of("postgres-url")
122+
.expect("A database url is required");
123+
124+
// The number of state groups to work on at once
125+
let chunk_size = arguments
126+
.value_of("chunk_size")
127+
.map(|s| s.parse().expect("chunk_size must be an integer"))
128+
.expect("A chunk size is required");
129+
130+
// The default structure to use when compressing
131+
let default_levels = value_t!(arguments, "default_levels", LevelInfo)
132+
.unwrap_or_else(|e| panic!("Unable to parse default levels: {}", e));
133+
134+
// The number of rooms to compress with this tool
135+
let number_of_chunks = arguments
136+
.value_of("number_of_chunks")
137+
.map(|s| s.parse().expect("number_of_chunks must be an integer"))
138+
.expect("number_of_chunks is required");
139+
140+
// Connect to the database and create the 2 tables this tool needs
141+
// (Note: if they already exist then this does nothing)
142+
let mut client = state_saving::connect_to_database(db_url)
143+
.unwrap_or_else(|e| panic!("Error occured while connecting to {}: {}", db_url, e));
144+
state_saving::create_tables_if_needed(&mut client)
145+
.unwrap_or_else(|e| panic!("Error occured while creating tables in database: {}", e));
146+
147+
// call compress_largest_rooms with the arguments supplied
148+
// panic if an error is produced
149+
manager::compress_chunks_of_database(db_url, chunk_size, &default_levels.0, number_of_chunks)
150+
.unwrap();
151+
152+
log::info!("auto_compressor finished");
153+
}

0 commit comments

Comments
 (0)