Skip to content

Commit 55ee83c

Browse files
authored
Don't panic in continue_run if no groups found within range (#62)
1 parent a409cdb commit 55ee83c

File tree

3 files changed

+83
-76
lines changed

3 files changed

+83
-76
lines changed

compressor_integration_tests/tests/compressor_continue_run_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,15 +30,15 @@ fn continue_run_called_twice_same_as_run() {
3030
let room_id = "room1".to_string();
3131

3232
// will run the compression in two batches
33-
let start = -1;
33+
let start = None;
3434
let chunk_size = 7;
3535

3636
// compress in 3,3 level sizes
3737
// since the compressor hasn't been run before they are empty
3838
let level_info = vec![Level::restore(3, 0, None), Level::restore(3, 0, None)];
3939

4040
// Run the compressor with those settings
41-
let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);
41+
let chunk_stats_1 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
4242

4343
// Assert that it stopped at 6 (i.e. after the 7 groups 0...6)
4444
assert_eq!(chunk_stats_1.last_compressed_group, 6);
@@ -53,12 +53,12 @@ fn continue_run_called_twice_same_as_run() {
5353
vec![Level::restore(3, 1, Some(6)), Level::restore(3, 2, Some(6))]
5454
);
5555

56-
let start = 6;
56+
let start = Some(6);
5757
let chunk_size = 7;
5858
let level_info = chunk_stats_1.new_level_info.clone();
5959

6060
// Run the compressor with those settings
61-
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info);
61+
let chunk_stats_2 = continue_run(start, chunk_size, &db_url, &room_id, &level_info).unwrap();
6262

6363
// Assert that it stopped at 7
6464
assert_eq!(chunk_stats_2.last_compressed_group, 13);

src/database.rs

Lines changed: 66 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use super::StateGroupEntry;
2727
/// specific room.
2828
///
2929
/// Returns with the state_group map and the id of the last group that was used
30+
/// Or None if there are no state groups within the range given
3031
///
3132
/// # Arguments
3233
///
@@ -36,6 +37,8 @@ use super::StateGroupEntry;
3637
/// * `min_state_group` - If specified, then only fetch the entries for state
3738
/// groups greater than (but not equal) to this number. It
3839
/// also requires groups_to_compress to be specified
40+
/// * `max_state_group` - If specified, then only fetch the entries for state
41+
/// groups lower than or equal to this number.
3942
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
4043
4144
pub fn get_data_from_db(
@@ -44,7 +47,7 @@ pub fn get_data_from_db(
4447
min_state_group: Option<i64>,
4548
groups_to_compress: Option<i64>,
4649
max_state_group: Option<i64>,
47-
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
50+
) -> Option<(BTreeMap<i64, StateGroupEntry>, i64)> {
4851
// connect to the database
4952
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
5053
builder.set_verify(SslVerifyMode::NONE);
@@ -53,16 +56,27 @@ pub fn get_data_from_db(
5356
let mut client = Client::connect(db_url, connector)
5457
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
5558

56-
let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
59+
// Search for the group id of the groups_to_compress'th group after min_state_group
60+
// If this is saved, then the compressor can continue by having min_state_group being
61+
// set to this maximum. If no such group can be found then return None.
5762

58-
load_map_from_db(
63+
let max_group_found = find_max_group(
5964
&mut client,
6065
room_id,
6166
min_state_group,
6267
groups_to_compress,
6368
max_state_group,
69+
)?;
70+
71+
let state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
72+
73+
Some(load_map_from_db(
74+
&mut client,
75+
room_id,
76+
min_state_group,
77+
max_group_found,
6478
state_group_map,
65-
)
79+
))
6680
}
6781

6882
/// Fetch the entries in state_groups_state (and their prev groups) for a
@@ -71,6 +85,7 @@ pub fn get_data_from_db(
7185
/// of each of the levels (as they were at the end of the last run of the compressor)
7286
///
7387
/// Returns with the state_group map and the id of the last group that was used
88+
/// Or None if there are no state groups within the range given
7489
///
7590
/// # Arguments
7691
///
@@ -81,8 +96,6 @@ pub fn get_data_from_db(
8196
/// groups greater than (but not equal) to this number. It
8297
/// also requires groups_to_compress to be specified
8398
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
84-
/// * `max_state_group` - If specified, then only fetch the entries for state
85-
/// groups lower than or equal to this number.
8699
/// * 'level_info' - The maximum size, current length and current head for each
87100
/// level (as it was when the compressor last finished for this
88101
/// room)
@@ -92,7 +105,7 @@ pub fn reload_data_from_db(
92105
min_state_group: Option<i64>,
93106
groups_to_compress: Option<i64>,
94107
level_info: &[Level],
95-
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
108+
) -> Option<(BTreeMap<i64, StateGroupEntry>, i64)> {
96109
// connect to the database
97110
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
98111
builder.set_verify(SslVerifyMode::NONE);
@@ -101,20 +114,30 @@ pub fn reload_data_from_db(
101114
let mut client = Client::connect(db_url, connector)
102115
.unwrap_or_else(|e| panic!("Error connecting to the database: {}", e));
103116

117+
// Search for the group id of the groups_to_compress'th group after min_state_group
118+
// If this is saved, then the compressor can continue by having min_state_group being
119+
// set to this maximum. If no such group can be found then return None.
120+
let max_group_found = find_max_group(
121+
&mut client,
122+
room_id,
123+
min_state_group,
124+
groups_to_compress,
125+
// max state group not used when saving and loading
126+
None,
127+
)?;
128+
104129
// load just the state_groups at the head of each level
105130
// this doesn't load their predecessors as that will be done at the end of
106131
// load_map_from_db()
107132
let state_group_map: BTreeMap<i64, StateGroupEntry> = load_level_heads(&mut client, level_info);
108133

109-
load_map_from_db(
134+
Some(load_map_from_db(
110135
&mut client,
111136
room_id,
112137
min_state_group,
113-
groups_to_compress,
114-
// max state group not used when saving and loading
115-
None,
138+
max_group_found,
116139
state_group_map,
117-
)
140+
))
118141
}
119142

120143
/// Finds the state_groups that are at the head of each compressor level
@@ -181,28 +204,16 @@ fn load_level_heads(client: &mut Client, level_info: &[Level]) -> BTreeMap<i64,
181204
/// * `min_state_group` - If specified, then only fetch the entries for state
182205
/// groups greater than (but not equal) to this number. It
183206
/// also requires groups_to_compress to be specified
184-
/// * 'groups_to_compress' - The number of groups to get from the database before stopping
207+
/// * 'max_group_found' - The last group to get from the database before stopping
185208
/// * 'state_group_map' - The map to populate with the entries from the database
186209
187210
fn load_map_from_db(
188211
client: &mut Client,
189212
room_id: &str,
190213
min_state_group: Option<i64>,
191-
groups_to_compress: Option<i64>,
192-
max_state_group: Option<i64>,
214+
max_group_found: i64,
193215
mut state_group_map: BTreeMap<i64, StateGroupEntry>,
194216
) -> (BTreeMap<i64, StateGroupEntry>, i64) {
195-
// Search for the group id of the groups_to_compress'th group after min_state_group
196-
// If this is saved, then the compressor can continue by having min_state_group being
197-
// set to this maximum
198-
let max_group_found = find_max_group(
199-
client,
200-
room_id,
201-
min_state_group,
202-
groups_to_compress,
203-
max_state_group,
204-
);
205-
206217
state_group_map.append(&mut get_initial_data_from_db(
207218
client,
208219
room_id,
@@ -261,7 +272,8 @@ fn load_map_from_db(
261272
/// Returns the group ID of the last group to be compressed
262273
///
263274
/// This can be saved so that future runs of the compressor only
264-
/// continue from after this point
275+
/// continue from after this point. If no groups can be found in
276+
/// the range specified it returns None.
265277
///
266278
/// # Arguments
267279
///
@@ -276,7 +288,7 @@ fn find_max_group(
276288
min_state_group: Option<i64>,
277289
groups_to_compress: Option<i64>,
278290
max_state_group: Option<i64>,
279-
) -> i64 {
291+
) -> Option<i64> {
280292
// Get list of state_id's in a certain room
281293
let mut query_chunk_of_ids = "SELECT id FROM state_groups WHERE room_id = $1".to_string();
282294
let params: Vec<&(dyn ToSql + Sync)>;
@@ -285,22 +297,33 @@ fn find_max_group(
285297
query_chunk_of_ids = format!("{} AND id <= {}", query_chunk_of_ids, max)
286298
}
287299

288-
// Adds additional constraint if a groups_to_compress has been specified
300+
// Adds an additional constraint if groups_to_compress or min_state_group has been specified
301+
// Note that min_state_group is only used if groups_to_compress is also specified
289302
if min_state_group.is_some() && groups_to_compress.is_some() {
290303
params = vec![&room_id, &min_state_group, &groups_to_compress];
291304
query_chunk_of_ids = format!(r"{} AND id > $2 LIMIT $3", query_chunk_of_ids);
305+
} else if groups_to_compress.is_some() {
306+
params = vec![&room_id, &groups_to_compress];
307+
query_chunk_of_ids = format!(r"{} LIMIT $2", query_chunk_of_ids);
292308
} else {
293309
params = vec![&room_id];
294-
query_chunk_of_ids = format!(r"{} ORDER BY id DESC LIMIT 1", query_chunk_of_ids);
295310
}
296311

297312
let sql_query = format!(
298313
"SELECT id FROM ({}) AS ids ORDER BY ids.id DESC LIMIT 1",
299314
query_chunk_of_ids
300315
);
301-
let final_row = client.query(sql_query.as_str(), &params).unwrap();
302316

303-
final_row.last().unwrap().get(0)
317+
// This vector should have length 0 or 1
318+
let rows = client
319+
.query(sql_query.as_str(), &params)
320+
.expect("Something went wrong while querying the database");
321+
322+
// If no row can be found then return None
323+
let final_row = rows.last()?;
324+
325+
// Else return the id of the group found
326+
Some(final_row.get::<_, i64>(0))
304327
}
305328

306329
/// Fetch the entries in state_groups_state and immediate predecessors for
@@ -330,22 +353,18 @@ fn get_initial_data_from_db(
330353
FROM state_groups AS m
331354
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
332355
LEFT JOIN state_group_edges AS e ON (m.id = e.state_group)
333-
WHERE m.room_id = $1
356+
WHERE m.room_id = $1 AND m.id <= $2
334357
"#;
335358

336359
// Adds additional constraint if minimum state_group has been specified.
337-
// note that the maximum group only affects queries if there is also a minimum
338-
// otherwise it is assumed that ALL groups should be fetched
339360
let mut rows = if let Some(min) = min_state_group {
340-
let params: Vec<&dyn ToSql> = vec![&room_id, &min, &max_group_found];
341-
client.query_raw(
342-
format!(r"{} AND m.id > $2 AND m.id <= $3", sql).as_str(),
343-
params,
344-
)
361+
let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found, &min];
362+
client.query_raw(format!(r"{} AND m.id > $3", sql).as_str(), params)
345363
} else {
346-
client.query_raw(sql, &[room_id])
364+
let params: Vec<&dyn ToSql> = vec![&room_id, &max_group_found];
365+
client.query_raw(sql, params)
347366
}
348-
.unwrap();
367+
.expect("Something went wrong while querying the database");
349368

350369
// Copy the data from the database into a map
351370
let mut state_group_map: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
@@ -481,13 +500,16 @@ fn test_pg_escape() {
481500
assert_eq!(&s[start_pos - 1..start_pos], "$");
482501
}
483502

503+
/// Send changes to the database
504+
///
484505
/// Note that currently ignores config.transactions and wraps every state
485506
/// group in it's own transaction (i.e. as if config.transactions was true)
486507
///
487508
/// # Arguments
488509
///
489-
/// * `config` - A Config struct that contains information
490-
/// about the run (e.g. room_id and database url)
510+
/// * `db_url` - The URL of a Postgres database. This should be of the
511+
/// form: "postgresql://user:pass@domain:port/database"
512+
/// * `room_id` - The ID of the room in the database
491513
/// * `old_map` - The state group data originally in the database
492514
/// * `new_map` - The state group data generated by the compressor to
493515
/// replace the old contents

src/lib.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,9 @@ pub fn run(mut config: Config) {
298298
config.min_state_group,
299299
config.groups_to_compress,
300300
config.max_state_group,
301-
);
301+
)
302+
.unwrap_or_else(|| panic!("No state groups found within this range"));
303+
302304
println!("Fetched state groups up to {}", max_group_found);
303305

304306
println!("Number of state groups: {}", state_group_map.len());
@@ -510,6 +512,7 @@ fn output_sql(
510512
}
511513

512514
/// Information about what the compressor did to the chunk that it was run on
515+
#[derive(Debug)]
513516
pub struct ChunkStats {
514517
// The state of each of the levels of the compressor when it stopped
515518
pub new_level_info: Vec<Level>,
@@ -524,16 +527,18 @@ pub struct ChunkStats {
524527
pub commited: bool,
525528
}
526529

530+
/// Loads a compressor state, runs it on a room, and then returns info on how the run went
527531
pub fn continue_run(
528-
start: i64,
532+
start: Option<i64>,
529533
chunk_size: i64,
530534
db_url: &str,
531535
room_id: &str,
532536
level_info: &[Level],
533-
) -> ChunkStats {
537+
) -> Option<ChunkStats> {
534538
// First we need to get the current state groups
539+
// If nothing was found then return None
535540
let (state_group_map, max_group_found) =
536-
database::reload_data_from_db(db_url, room_id, Some(start), Some(chunk_size), level_info);
541+
database::reload_data_from_db(db_url, room_id, start, Some(chunk_size), level_info)?;
537542

538543
let original_num_rows = state_group_map.iter().map(|(_, v)| v.state_map.len()).sum();
539544

@@ -548,48 +553,28 @@ pub fn continue_run(
548553

549554
let ratio = (new_num_rows as f64) / (original_num_rows as f64);
550555

551-
println!(
552-
"Number of rows after compression: {} ({:.2}%)",
553-
new_num_rows,
554-
ratio * 100.
555-
);
556-
557-
println!("Compression Statistics:");
558-
println!(
559-
" Number of forced resets due to lacking prev: {}",
560-
compressor.stats.resets_no_suitable_prev
561-
);
562-
println!(
563-
" Number of compressed rows caused by the above: {}",
564-
compressor.stats.resets_no_suitable_prev_size
565-
);
566-
println!(
567-
" Number of state groups changed: {}",
568-
compressor.stats.state_groups_changed
569-
);
570-
571556
if ratio > 1.0 {
572557
println!("This compression would not remove any rows. Aborting.");
573-
return ChunkStats {
558+
return Some(ChunkStats {
574559
new_level_info: compressor.get_level_info(),
575560
last_compressed_group: max_group_found,
576561
original_num_rows,
577562
new_num_rows,
578563
commited: false,
579-
};
564+
});
580565
}
581566

582567
check_that_maps_match(&state_group_map, new_state_group_map);
583568

584569
database::send_changes_to_db(db_url, room_id, &state_group_map, new_state_group_map);
585570

586-
ChunkStats {
571+
Some(ChunkStats {
587572
new_level_info: compressor.get_level_info(),
588573
last_compressed_group: max_group_found,
589574
original_num_rows,
590575
new_num_rows,
591576
commited: true,
592-
}
577+
})
593578
}
594579

595580
/// Compares two sets of state groups

0 commit comments

Comments
 (0)