|
6 | 6 | //! to the database and uses these to enable it to incrementally work
|
7 | 7 | //! on space reductions
|
8 | 8 |
|
| 9 | +use anyhow::Result; |
| 10 | +use log::{error, LevelFilter}; |
| 11 | +use pyo3::{ |
| 12 | + exceptions::PyRuntimeError, prelude::pymodule, types::PyModule, PyErr, PyResult, Python, |
| 13 | +}; |
9 | 14 | use std::str::FromStr;
|
10 | 15 |
|
11 |
| -use anyhow::Result; |
12 | 16 | use synapse_compress_state::Level;
|
13 | 17 |
|
14 | 18 | pub mod manager;
|
@@ -50,3 +54,74 @@ impl FromStr for LevelInfo {
|
50 | 54 | Ok(LevelInfo(level_info))
|
51 | 55 | }
|
52 | 56 | }
|
| 57 | + |
| 58 | +// PyO3 INTERFACE STARTS HERE |
| 59 | +#[pymodule] |
| 60 | +fn auto_compressor(_py: Python, m: &PyModule) -> PyResult<()> { |
| 61 | + let _ = pyo3_log::Logger::default() |
| 62 | + // don't send out anything lower than a warning from other crates |
| 63 | + .filter(LevelFilter::Warn) |
| 64 | + // don't log warnings from synapse_compress_state, the auto_compressor handles these |
| 65 | + // situations and provides better log messages |
| 66 | + .filter_target("synapse_compress_state".to_owned(), LevelFilter::Error) |
| 67 | + // log info and above for the auto_compressor |
| 68 | + .filter_target("auto_compressor".to_owned(), LevelFilter::Debug) |
| 69 | + .install(); |
| 70 | + // ensure any panics produce error messages in the log |
| 71 | + log_panics::init(); |
| 72 | + |
| 73 | + #[pyfn(m, compress_largest_rooms)] |
| 74 | + fn compress_state_events_table( |
| 75 | + py: Python, |
| 76 | + db_url: String, |
| 77 | + chunk_size: i64, |
| 78 | + default_levels: String, |
| 79 | + number_of_chunks: i64, |
| 80 | + ) -> PyResult<()> { |
| 81 | + // Stops the compressor from holding the GIL while running |
| 82 | + py.allow_threads(|| { |
| 83 | + _compress_state_events_table_body(db_url, chunk_size, default_levels, number_of_chunks) |
| 84 | + }) |
| 85 | + } |
| 86 | + |
| 87 | + // Not accessbile through Py03. It is a "private" function. |
| 88 | + fn _compress_state_events_table_body( |
| 89 | + db_url: String, |
| 90 | + chunk_size: i64, |
| 91 | + default_levels: String, |
| 92 | + number_of_chunks: i64, |
| 93 | + ) -> PyResult<()> { |
| 94 | + // Announce the start of the program to the logs |
| 95 | + log::info!("auto_compressor started"); |
| 96 | + |
| 97 | + // Parse the default_level string into a LevelInfo struct |
| 98 | + let default_levels: LevelInfo = match default_levels.parse() { |
| 99 | + Ok(l_sizes) => l_sizes, |
| 100 | + Err(e) => { |
| 101 | + return Err(PyErr::new::<PyRuntimeError, _>(format!( |
| 102 | + "Unable to parse level_sizes: {}", |
| 103 | + e |
| 104 | + ))) |
| 105 | + } |
| 106 | + }; |
| 107 | + |
| 108 | + // call compress_largest_rooms with the arguments supplied |
| 109 | + let run_result = manager::compress_chunks_of_database( |
| 110 | + &db_url, |
| 111 | + chunk_size, |
| 112 | + &default_levels.0, |
| 113 | + number_of_chunks, |
| 114 | + ); |
| 115 | + |
| 116 | + // (Note, need to do `{:?}` formatting to show error context) |
| 117 | + // Don't log the context of errors but do use it in the PyRuntimeError |
| 118 | + if let Err(e) = run_result { |
| 119 | + error!("{}", e); |
| 120 | + return Err(PyErr::new::<PyRuntimeError, _>(format!("{:?}", e))); |
| 121 | + } |
| 122 | + |
| 123 | + log::info!("auto_compressor finished"); |
| 124 | + Ok(()) |
| 125 | + } |
| 126 | + Ok(()) |
| 127 | +} |
0 commit comments