Skip to content

Commit d908d13

Browse files
authored
Add option to commit changes to the database automatically (#53)
1 parent 65861de commit d908d13

File tree

6 files changed

+601
-57
lines changed

6 files changed

+601
-57
lines changed

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@ If this flag is set then output the node and edge information for the state_grou
106106
directed graph built up from the predecessor state_group links. These can be looked
107107
at in something like Gephi (https://gephi.org)
108108

109+
- -c
110+
If this flag is set then the changes the compressor makes will be committed to the
111+
database. This should be safe to use while synapse is running as it assumes by default
112+
that the transactions flag is set
113+
109114
## Using as python library
110115

111116
The compressor can also be built into a python library as it uses PyO3. It can be

compressor_integration_tests/src/lib.rs

Lines changed: 230 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
use openssl::ssl::{SslConnector, SslMethod, SslVerifyMode};
2-
use postgres::Client;
2+
use postgres::{fallible_iterator::FallibleIterator, Client};
33
use postgres_openssl::MakeTlsConnector;
44
use rand::{distributions::Alphanumeric, thread_rng, Rng};
5+
use state_map::StateMap;
56
use std::{borrow::Cow, collections::BTreeMap, fmt};
7+
use string_cache::DefaultAtom as Atom;
68

79
use synapse_compress_state::StateGroupEntry;
810

@@ -105,3 +107,230 @@ impl<'a> fmt::Display for PGEscape<'a> {
105107
write!(f, "{}{}{}", delim, self.0, delim)
106108
}
107109
}
110+
111+
/// Checks whether the state at each state group is the same as what the map thinks it should be
112+
///
113+
/// i.e. when synapse tries to work out the state for a given state group by looking at
114+
/// the database. Will the state it gets be the same as what the map thinks it should be
115+
pub fn database_collapsed_states_match_map(
116+
state_group_map: &BTreeMap<i64, StateGroupEntry>,
117+
) -> bool {
118+
for sg in state_group_map.keys() {
119+
let map_state = collapse_state_with_map(state_group_map, *sg);
120+
let database_state = collapse_state_with_database(*sg);
121+
if map_state != database_state {
122+
println!("database state {} doesn't match", sg);
123+
println!("expected {:?}", map_state);
124+
println!("but found {:?}", database_state);
125+
return false;
126+
}
127+
}
128+
true
129+
}
130+
131+
/// Gets the full state for a given group from the map (of deltas)
132+
fn collapse_state_with_map(
133+
map: &BTreeMap<i64, StateGroupEntry>,
134+
state_group: i64,
135+
) -> StateMap<Atom> {
136+
let mut entry = &map[&state_group];
137+
let mut state_map = StateMap::new();
138+
139+
let mut stack = vec![state_group];
140+
141+
while let Some(prev_state_group) = entry.prev_state_group {
142+
stack.push(prev_state_group);
143+
if !map.contains_key(&prev_state_group) {
144+
panic!("Missing {}", prev_state_group);
145+
}
146+
entry = &map[&prev_state_group];
147+
}
148+
149+
for sg in stack.iter().rev() {
150+
state_map.extend(
151+
map[sg]
152+
.state_map
153+
.iter()
154+
.map(|((t, s), e)| ((t, s), e.clone())),
155+
);
156+
}
157+
158+
state_map
159+
}
160+
161+
fn collapse_state_with_database(state_group: i64) -> StateMap<Atom> {
162+
// connect to the database
163+
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
164+
builder.set_verify(SslVerifyMode::NONE);
165+
let connector = MakeTlsConnector::new(builder.build());
166+
167+
let mut client = Client::connect(DB_URL, connector).unwrap();
168+
169+
// Gets the delta for a specific state group
170+
let query_deltas = r#"
171+
SELECT m.id, type, state_key, s.event_id
172+
FROM state_groups AS m
173+
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
174+
WHERE m.id = $1
175+
"#;
176+
177+
// If there is no delta for that specific state group, then we still want to find
178+
// the predecessor (so have split this into a different query)
179+
let query_pred = r#"
180+
SELECT prev_state_group
181+
FROM state_group_edges
182+
WHERE state_group = $1
183+
"#;
184+
185+
let mut state_map: StateMap<Atom> = StateMap::new();
186+
187+
let mut next_group = Some(state_group);
188+
189+
while let Some(sg) = next_group {
190+
// get predecessor from state_group_edges
191+
let mut pred = client.query_raw(query_pred, &[sg]).unwrap();
192+
193+
// set next_group to predecessor
194+
next_group = match pred.next().unwrap() {
195+
Some(p) => p.get(0),
196+
None => None,
197+
};
198+
199+
// if there was a predecessor then assert that it is unique
200+
if next_group.is_some() {
201+
assert!(pred.next().unwrap().is_none());
202+
}
203+
drop(pred);
204+
205+
let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
206+
207+
while let Some(row) = rows.next().unwrap() {
208+
// Copy the single delta from the predecessor stored in this row
209+
if let Some(etype) = row.get::<_, Option<String>>(1) {
210+
let key = &row.get::<_, String>(2);
211+
212+
// only insert if not overriding existing entry
213+
// this is because the newer delta is found FIRST
214+
if state_map.get(&etype, key).is_none() {
215+
state_map.insert(&etype, key, row.get::<_, String>(3).into());
216+
}
217+
}
218+
}
219+
}
220+
221+
state_map
222+
}
223+
224+
/// Check whether predecessors and deltas stored in the database are the same as in the map
225+
pub fn database_structure_matches_map(state_group_map: &BTreeMap<i64, StateGroupEntry>) -> bool {
226+
// connect to the database
227+
let mut builder = SslConnector::builder(SslMethod::tls()).unwrap();
228+
builder.set_verify(SslVerifyMode::NONE);
229+
let connector = MakeTlsConnector::new(builder.build());
230+
231+
let mut client = Client::connect(DB_URL, connector).unwrap();
232+
233+
// Gets the delta for a specific state group
234+
let query_deltas = r#"
235+
SELECT m.id, type, state_key, s.event_id
236+
FROM state_groups AS m
237+
LEFT JOIN state_groups_state AS s ON (m.id = s.state_group)
238+
WHERE m.id = $1
239+
"#;
240+
241+
// If there is no delta for that specific state group, then we still want to find
242+
// the predecessor (so have split this into a different query)
243+
let query_pred = r#"
244+
SELECT prev_state_group
245+
FROM state_group_edges
246+
WHERE state_group = $1
247+
"#;
248+
249+
for (sg, entry) in state_group_map {
250+
// get predecessor from state_group_edges
251+
let mut pred_iter = client.query_raw(query_pred, &[sg]).unwrap();
252+
253+
// read out the predecessor value from the database
254+
let database_pred = match pred_iter.next().unwrap() {
255+
Some(p) => p.get(0),
256+
None => None,
257+
};
258+
259+
// if there was a predecessor then assert that it is unique
260+
if database_pred.is_some() {
261+
assert!(pred_iter.next().unwrap().is_none());
262+
}
263+
264+
// check if it matches map
265+
if database_pred != entry.prev_state_group {
266+
println!(
267+
"ERROR: predecessor for {} was {:?} (expected {:?})",
268+
sg, database_pred, entry.prev_state_group
269+
);
270+
return false;
271+
}
272+
// needed so that can create another query
273+
drop(pred_iter);
274+
275+
// Now check that deltas are the same
276+
let mut state_map: StateMap<Atom> = StateMap::new();
277+
278+
// Get delta from state_groups_state
279+
let mut rows = client.query_raw(query_deltas, &[sg]).unwrap();
280+
281+
while let Some(row) = rows.next().unwrap() {
282+
// Copy the single delta from the predecessor stored in this row
283+
if let Some(etype) = row.get::<_, Option<String>>(1) {
284+
state_map.insert(
285+
&etype,
286+
&row.get::<_, String>(2),
287+
row.get::<_, String>(3).into(),
288+
);
289+
}
290+
}
291+
292+
// Check that the delta matches the map
293+
if state_map != entry.state_map {
294+
println!("ERROR: delta for {} didn't match", sg);
295+
println!("Expected: {:?}", entry.state_map);
296+
println!("Actual: {:?}", state_map);
297+
return false;
298+
}
299+
}
300+
true
301+
}
302+
303+
#[test]
304+
fn functions_are_self_consistent() {
305+
let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
306+
let mut prev = None;
307+
308+
// This starts with the following structure
309+
//
310+
// 0-1-2-3-4-5-6-7-8-9-10-11-12-13
311+
//
312+
// Each group i has state:
313+
// ('node','is', i)
314+
// ('group', j, 'seen') - for all j less than i
315+
for i in 0i64..=13i64 {
316+
let mut entry = StateGroupEntry {
317+
in_range: true,
318+
prev_state_group: prev,
319+
state_map: StateMap::new(),
320+
};
321+
entry
322+
.state_map
323+
.insert("group", &i.to_string(), "seen".into());
324+
entry.state_map.insert("node", "is", i.to_string().into());
325+
326+
initial.insert(i, entry);
327+
328+
prev = Some(i)
329+
}
330+
331+
empty_database();
332+
add_contents_to_database("room1", &initial);
333+
334+
assert!(database_collapsed_states_match_map(&initial));
335+
assert!(database_structure_matches_map(&initial));
336+
}

compressor_integration_tests/src/map_builder.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,129 @@ pub fn line_with_state(start: i64, end: i64) -> BTreeMap<i64, StateGroupEntry> {
3434

3535
initial
3636
}
37+
38+
/// Generates line segments in a chain of state groups each with state deltas
39+
///
40+
/// If called wiht start=0, end=13 this would build the following:
41+
///
42+
/// 0-1-2 3-4-5 6-7-8 9-10-11 12-13
43+
///
44+
/// Where each group i has state:
45+
/// ('node','is', i)
46+
/// ('group', j, 'seen') - for all j less than i
47+
pub fn line_segments_with_state(start: i64, end: i64) -> BTreeMap<i64, StateGroupEntry> {
48+
let mut initial: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
49+
let mut prev = None;
50+
51+
for i in start..=end {
52+
// if the state is a snapshot then set its predecessor to NONE
53+
if (i - start) % 3 == 0 {
54+
prev = None;
55+
}
56+
57+
// create a blank entry for it
58+
let mut entry = StateGroupEntry {
59+
in_range: true,
60+
prev_state_group: prev,
61+
state_map: StateMap::new(),
62+
};
63+
64+
// if it's a snapshot then add in all previous state
65+
if prev.is_none() {
66+
for j in start..i {
67+
entry
68+
.state_map
69+
.insert("group", &j.to_string(), "seen".into());
70+
}
71+
}
72+
73+
// add in the new state for this state group
74+
entry
75+
.state_map
76+
.insert("group", &i.to_string(), "seen".into());
77+
entry.state_map.insert("node", "is", i.to_string().into());
78+
79+
// put it into the initial map
80+
initial.insert(i, entry);
81+
82+
// set this group as the predecessor for the next
83+
prev = Some(i)
84+
}
85+
initial
86+
}
87+
88+
/// This generates the correct compressed structure with 3,3 levels
89+
///
90+
/// Note: only correct structure when no impossible predecessors
91+
///
92+
/// Structure generated:
93+
///
94+
/// 0 3\ 12
95+
/// 1 4 6\ 13
96+
/// 2 5 7 9
97+
/// 8 10
98+
/// 11
99+
/// Where each group i has state:
100+
/// ('node','is', i)
101+
/// ('group', j, 'seen') - for all j less than i
102+
pub fn compressed_3_3_from_0_to_13_with_state() -> BTreeMap<i64, StateGroupEntry> {
103+
let expected_edges: BTreeMap<i64, i64> = vec![
104+
(1, 0),
105+
(2, 1),
106+
(4, 3),
107+
(5, 4),
108+
(6, 3),
109+
(7, 6),
110+
(8, 7),
111+
(9, 6),
112+
(10, 9),
113+
(11, 10),
114+
(13, 12),
115+
]
116+
.into_iter()
117+
.collect();
118+
119+
let mut expected: BTreeMap<i64, StateGroupEntry> = BTreeMap::new();
120+
121+
// Each group i has state:
122+
// ('node','is', i)
123+
// ('group', j, 'seen') - for all j less than i
124+
for i in 0i64..=13i64 {
125+
let prev = expected_edges.get(&i);
126+
127+
//change from Option<&i64> to Option<i64>
128+
let prev = prev.copied();
129+
130+
// create a blank entry for it
131+
let mut entry = StateGroupEntry {
132+
in_range: true,
133+
prev_state_group: prev,
134+
state_map: StateMap::new(),
135+
};
136+
137+
// Add in all state between predecessor and now (non inclusive)
138+
if let Some(p) = prev {
139+
for j in (p + 1)..i {
140+
entry
141+
.state_map
142+
.insert("group", &j.to_string(), "seen".into());
143+
}
144+
} else {
145+
for j in 0i64..i {
146+
entry
147+
.state_map
148+
.insert("group", &j.to_string(), "seen".into());
149+
}
150+
}
151+
152+
// add in the new state for this state group
153+
entry
154+
.state_map
155+
.insert("group", &i.to_string(), "seen".into());
156+
entry.state_map.insert("node", "is", i.to_string().into());
157+
158+
// put it into the expected map
159+
expected.insert(i, entry);
160+
}
161+
expected
162+
}

0 commit comments

Comments
 (0)