Skip to content

Commit 4c3d6bd

Browse files
authored
Add method to run the compressor starting from a particular compressor-state (#55)
1 parent d32f493 commit 4c3d6bd

File tree

4 files changed

+369
-17
lines changed

4 files changed

+369
-17
lines changed
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
use compressor_integration_tests::{
2+
add_contents_to_database, database_collapsed_states_match_map, database_structure_matches_map,
3+
empty_database,
4+
map_builder::{compressed_3_3_from_0_to_13_with_state, line_segments_with_state},
5+
DB_URL,
6+
};
7+
use serial_test::serial;
8+
use synapse_compress_state::{continue_run, Level};
9+
10+
// Tests the saving and continuing functionality.
// The compressor should produce the same results when run in one go
// as when run in multiple stages.
#[test]
#[serial(db)]
fn continue_run_called_twice_same_as_run() {
    // This starts with the following structure
    //
    // 0-1-2 3-4-5 6-7-8 9-10-11 12-13
    //
    // Each group i has state:
    //     ('node','is',      i)
    //     ('group',  j, 'seen') - for all j less than i
    let initial = line_segments_with_state(0, 13);

    // Place this initial state into an empty database
    empty_database();
    add_contents_to_database("room1", &initial);

    let db_url = DB_URL.to_string();
    let room_id = "room1".to_string();

    // will run the compression in two batches of (up to) 7 groups each
    let start = -1;
    let chunk_size = 7;

    // compress in 3,3 level sizes
    // since the compressor hasn't been run before they are empty
    let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];

    // Run the compressor with those settings
    let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);

    // Assert that it stopped at 6 (i.e. after the 7 groups 0...6)
    assert_eq!(chunk_stats_1.last_compressed_group, 6);
    // structure should be the following at this point
    // (NOTE: only including compressed groups)
    //
    // 0  3\
    // 1  4 6
    // 2  5
    assert_eq!(
        chunk_stats_1.new_level_info,
        vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))]
    );

    // Resume from where the first batch stopped, restoring its level info
    let start = 6;
    let chunk_size = 7;
    let level_info = chunk_stats_1.new_level_info.clone();

    // Run the compressor with those settings
    let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);

    // Assert that it stopped at 13 (i.e. after the remaining 7 groups 7...13)
    assert_eq!(chunk_stats_2.last_compressed_group, 13);

    // This should have created the following structure in the database
    // i.e. groups 6 and 9 should have changed from before
    // N.B. this saves 11 rows
    //
    // 0  3\       12
    // 1  4 6\     13
    // 2  5 7 9
    //      8 10
    //        11
    let expected = compressed_3_3_from_0_to_13_with_state();

    // Check that the database still gives correct states for each group!
    assert!(database_collapsed_states_match_map(&initial));

    // Check that the structure of the database matches the expected structure
    assert!(database_structure_matches_map(&expected))
}

src/compressor.rs

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ use string_cache::DefaultAtom as Atom;
3636
use super::{collapse_state_maps, StateGroupEntry};
3737

3838
/// Holds information about a particular level.
39-
#[derive(Debug)]
40-
struct Level {
39+
#[derive(Debug, Clone, PartialEq, Eq)]
40+
pub struct Level {
4141
/// The maximum size this level is allowed to be
4242
max_length: usize,
4343
/// The (approximate) current chain length of this level. This is equivalent
@@ -57,11 +57,20 @@ impl Level {
5757
}
5858
}
5959

60+
/// Creates a new level from stored state
61+
pub fn restore(max_length: usize, current_chain_length: usize, current: Option<i64>) -> Level {
62+
Level {
63+
max_length,
64+
current_chain_length,
65+
current,
66+
}
67+
}
68+
6069
/// Update the current head of this level. If delta is true then it means
6170
/// that given state group will (probably) reference the previous head.
6271
///
6372
/// Panics if `delta` is true and the level is already full.
64-
pub fn update(&mut self, current: i64, delta: bool) {
73+
fn update(&mut self, current: i64, delta: bool) {
6574
self.current = Some(current);
6675

6776
if delta {
@@ -128,6 +137,35 @@ impl<'a> Compressor<'a> {
128137
compressor
129138
}
130139

140+
/// Creates a compressor and runs the compression algorithm.
141+
/// used when restoring compressor state from a previous run
142+
/// in which case the levels heads are also known
143+
pub fn compress_from_save(
144+
original_state_map: &'a BTreeMap<i64, StateGroupEntry>,
145+
// level_info: &[(usize, usize, Option<i64>)],
146+
level_info: &[Level],
147+
) -> Compressor<'a> {
148+
let levels = level_info
149+
.iter()
150+
.map(|l| Level::restore((*l).max_length, (*l).current_chain_length, (*l).current))
151+
.collect();
152+
153+
let mut compressor = Compressor {
154+
original_state_map,
155+
new_state_group_map: BTreeMap::new(),
156+
levels,
157+
stats: Stats::default(),
158+
};
159+
160+
compressor.create_new_tree();
161+
compressor
162+
}
163+
164+
/// Returns all the state required to save the compressor so it can be continued later
165+
pub fn get_level_info(&self) -> Vec<Level> {
166+
self.levels.clone()
167+
}
168+
131169
/// Actually runs the compression algorithm
132170
fn create_new_tree(&mut self) {
133171
if !self.new_state_group_map.is_empty() {

src/database.rs

Lines changed: 155 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,56 @@ use postgres_openssl::MakeTlsConnector;
1919
use rand::{distributions::Alphanumeric, thread_rng, Rng};
2020
use std::{borrow::Cow, collections::BTreeMap, fmt};
2121

22-
use crate::{generate_sql, Config};
22+
use crate::{compressor::Level, generate_sql};
2323

2424
use super::StateGroupEntry;
2525

2626
/// Fetch the entries in state_groups_state (and their prev groups) for a
/// specific room.
///
/// Returns with the state_group map and the id of the last group that was used
///
/// # Arguments
///
/// * `db_url` - The URL of a Postgres database. This should be of the
///              form: "postgresql://user:pass@domain:port/database"
/// * `room_id` - The ID of the room in the database
/// * `min_state_group` - If specified, then only fetch the entries for state
///                       groups greater than (but not equal) to this number. It
///                       also requires groups_to_compress to be specified
/// * `groups_to_compress` - The number of groups to get from the database before stopping
/// * `max_state_group` - If specified, then only fetch the entries for state
///                       groups lower than or equal to this number
pub fn get_data_from_db(
    db_url: &str,
    room_id: &str,
    min_state_group: Option<i64>,
    groups_to_compress: Option<i64>,
    max_state_group: Option<i64>,
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
    // connect to the database
    let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
    builder.set_verify(SslVerifyMode::NONE);
    let connector = MakeTlsConnector::new(builder.build());

    let mut client = Client::connect(db_url, connector)
        .unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));

    // start from an empty map: unlike reload_data_from_db there is no saved
    // compressor state to pre-populate it with
    let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();

    load_map_from_db(
        &mut client,
        room_id,
        min_state_group,
        groups_to_compress,
        max_state_group,
        state_group_map,
    )
}
67+
68+
/// Fetch the entries in state_groups_state (and their prev groups) for a
69+
/// specific room. This method should only be called if resuming the compressor from
70+
/// where it last finished - and as such also loads in the state groups from the heads
71+
/// of each of the levels (as they were at the end of the last run of the compressor)
3272
///
3373
/// Returns with the state_group map and the id of the last group that was used
3474
///
@@ -43,12 +83,15 @@ use super::StateGroupEntry;
4383
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
4484
/// * `max_state_group` - If specified, then only fetch the entries for state
4585
/// groups lower than or equal to this number.
46-
pub fn get_data_from_db(
86+
/// * 'level_info' - The maximum size, current length and current head for each
87+
/// level (as it was when the compressor last finished for this
88+
/// room)
89+
pub fn reload_data_from_db(
4790
db_url: &str,
4891
room_id: &str,
4992
min_state_group: Option<i64>,
5093
groups_to_compress: Option<i64>,
51-
max_state_group: Option<i64>,
94+
level_info: &[Level],
5295
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
5396
// connect to the database
5497
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
@@ -58,19 +101,117 @@ pub fn get_data_from_db(
58101
let mut client = Client::connect(db_url, connector)
59102
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
60103

104+
// load just the state_groups at the head of each level
105+
// this doesn't load their predecessors as that will be done at the end of
106+
// load_map_from_db()
107+
let state_group_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut client, level_info);
108+
109+
load_map_from_db(
110+
&mut client,
111+
room_id,
112+
min_state_group,
113+
groups_to_compress,
114+
// max state group not used when saving and loading
115+
None,
116+
state_group_map,
117+
)
118+
}
119+
120+
/// Finds the state_groups that are at the head of each compressor level
121+
/// NOTE this does not also retrieve their predecessors
122+
///
123+
/// # Arguments
124+
///
125+
/// * `client' - A Postgres client to make requests with
126+
/// * `levels' - The levels who's heads are being requested
127+
fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64, StateGroupEntry> {
128+
// obtain all of the heads that aren't None from level_info
129+
let level_heads: Vec<i64> = level_info
130+
.iter()
131+
.filter_map(|l| (*l).get_current())
132+
.collect();
133+
134+
// Query to get id, predecessor and deltas for each state group
135+
let sql = r#"
136+
SELECT m.id, prev_state_group, type, state_key, s.event_id
137+
FROM state_groups AS m
138+
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
139+
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
140+
WHERE m.id = ANY($1)
141+
"#;
142+
143+
// Actually do the query
144+
let mut rows = client.query_raw(sql, &[&level_heads]).unwrap();
145+
146+
// Copy the data from the database into a map
147+
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
148+
149+
while let Some(row) = rows.next().unwrap() {
150+
// The row in the map to copy the data to
151+
// NOTE: default StateGroupEntry has in_range as false
152+
// This is what we want since as a level head, it has already been compressed by the
153+
// previous run!
154+
let entry = state_group_map.entry(row.get(0)).or_default();
155+
156+
// Save the predecessor (this may already be there)
157+
entry.prev_state_group = row.get(1);
158+
159+
// Copy the single delta from the predecessor stored in this row
160+
if let Some(etype) = row.get::<_, Option<String>>(2) {
161+
entry.state_map.insert(
162+
&etype,
163+
&row.get::<_, String>(3),
164+
row.get::<_, String>(4).into(),
165+
);
166+
}
167+
}
168+
state_group_map
169+
}
170+
171+
/// Fetch the entries in state_groups_state (and their prev groups) for a
172+
/// specific room within a certain range. These are appended onto the provided
173+
/// map.
174+
///
175+
/// - Fetches the first [group] rows with group id after [min]
176+
/// - Recursively searches for missing predecessors and adds those
177+
///
178+
/// Returns with the state_group map and the id of the last group that was used
179+
///
180+
/// # Arguments
181+
///
182+
/// * `client` - A Postgres client to make requests with
183+
/// * `room_id` - The ID of the room in the database
184+
/// * `min_state_group` - If specified, then only fetch the entries for state
185+
/// groups greater than (but not equal) to this number. It
186+
/// also requires groups_to_compress to be specified
187+
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
188+
/// * 'state_group_map' - The map to populate with the entries from the database
189+
190+
fn load_map_from_db(
191+
client: &mut Client,
192+
room_id: &str,
193+
min_state_group: Option<i64>,
194+
groups_to_compress: Option<i64>,
195+
max_state_group: Option<i64>,
196+
mut state_group_map: BTreeMap<i64, StateGroupEntry>,
197+
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
61198
// Search for the group id of the groups_to_compress'th group after min_state_group
62199
// If this is saved, then the compressor can continue by having min_state_group being
63200
// set to this maximum
64201
let max_group_found = find_max_group(
65-
&mut client,
202+
client,
66203
room_id,
67204
min_state_group,
68205
groups_to_compress,
69206
max_state_group,
70207
);
71208

72-
let mut state_group_map =
73-
get_initial_data_from_db(&mut client, room_id, min_state_group, max_group_found);
209+
state_group_map.append(&mut get_initial_data_from_db(
210+
client,
211+
room_id,
212+
min_state_group,
213+
max_group_found,
214+
));
74215

75216
println!("Got initial state from database. Checking for any missing state groups...");
76217

@@ -111,7 +252,7 @@ pub fn get_data_from_db(
111252
// println!("Missing {} state groups", missing_sgs.len());
112253

113254
// find state groups not picked up already and add them to the map
114-
let map = get_missing_from_db(&mut client, &missing_sgs, min_state_group, max_group_found);
255+
let map = get_missing_from_db(client, &missing_sgs, min_state_group, max_group_found);
115256
for (k, v) in map {
116257
state_group_map.entry(k).or_insert(v);
117258
}
@@ -354,7 +495,8 @@ fn test_pg_escape() {
354495
/// * `new_map` - The state group data generated by the compressor to
355496
/// replace the old contents
356497
pub fn send_changes_to_db(
357-
config: &Config,
498+
db_url: &str,
499+
room_id: &str,
358500
old_map: &BTreeMap<i64, StateGroupEntry>,
359501
new_map: &BTreeMap<i64, StateGroupEntry>,
360502
) {
@@ -363,7 +505,7 @@ pub fn send_changes_to_db(
363505
builder.set_verify(SslVerifyMode::NONE);
364506
let connector = MakeTlsConnector::new(builder.build());
365507

366-
let mut client = Client::connect(&config.db_url, connector).unwrap();
508+
let mut client = Client::connect(db_url, connector).unwrap();
367509

368510
println!("Writing changes...");
369511

@@ -375,7 +517,7 @@ pub fn send_changes_to_db(
375517
pb.set_message("state groups");
376518
pb.enable_steady_tick(100);
377519

378-
for sql_transaction in generate_sql(old_map, new_map, &config.room_id) {
520+
for sql_transaction in generate_sql(old_map, new_map, room_id) {
379521
// commit this change to the database
380522
// N.B. this is a synchronous library so will wait until finished before continueing...
381523
// if want to speed up compressor then this might be a good place to start!

0 commit comments

Comments
 (0)