Skip to content

Commit f3051f4

Browse files
iovoidedg-l
andauthored
fix(l1): close background threads before dropping db (#5749)
**Motivation** The error `pthread lock: Invalid argument` has been happening rarely with rocksdb, but #5497 manages to trigger it consistently so it should be fixed soon. This PR is inspired by @PivasDesant's solution (see #5661) **Description** Stops background DB threads before dropping the inner database object. The store background threads do not need to be stopped since these hold a reference to the db (preventing it from being dropped early). #5658 contains a script to test the fix by running the rocksdb test a thousand times <!-- A clear and concise general description of the changes this PR introduces --> <!-- Link to issues: Resolves #111, Resolves #222 --> **Checklist** - [X] Updated `STORE_SCHEMA_VERSION` (crates/storage/lib.rs) if the PR includes breaking changes to the `Store` requiring a re-sync. Closes #5658 --------- Co-authored-by: Edgar <[email protected]>
1 parent c0dd265 commit f3051f4

File tree

2 files changed

+35
-4
lines changed

2 files changed

+35
-4
lines changed

crates/storage/backend/rocksdb.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,16 @@ impl RocksDBBackend {
203203
}
204204
}
205205

206+
impl Drop for RocksDBBackend {
207+
fn drop(&mut self) {
208+
// When the last reference to the db is dropped, stop background threads
209+
// See https://github.com/facebook/rocksdb/issues/11349
210+
if let Some(db) = Arc::get_mut(&mut self.db) {
211+
db.cancel_all_background_work(true);
212+
}
213+
}
214+
}
215+
206216
impl StorageBackend for RocksDBBackend {
207217
fn clear_table(&self, table: &'static str) -> Result<(), StoreError> {
208218
let cf = self

crates/storage/store.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use std::{
5050
atomic::AtomicU64,
5151
mpsc::{SyncSender, TryRecvError, sync_channel},
5252
},
53+
thread::JoinHandle,
5354
};
5455
use tracing::{debug, error, info};
5556
/// Number of state trie segments to fetch concurrently during state sync
@@ -145,6 +146,21 @@ pub struct Store {
145146
/// those changes already affect the code hash stored in the account, and only
146147
/// may result in this cache having useless data.
147148
account_code_cache: Arc<CodeCache>,
149+
150+
background_threads: Arc<ThreadList>,
151+
}
152+
153+
#[derive(Debug, Default)]
154+
struct ThreadList {
155+
list: Vec<JoinHandle<()>>,
156+
}
157+
158+
impl Drop for ThreadList {
159+
fn drop(&mut self) {
160+
for handle in self.list.drain(..) {
161+
let _ = handle.join();
162+
}
163+
}
148164
}
149165

150166
pub type StorageTrieNodes = Vec<(H256, Vec<(Nibbles, Vec<u8>)>)>;
@@ -1273,7 +1289,8 @@ impl Store {
12731289
last_written
12741290
}
12751291
};
1276-
let store = Self {
1292+
let mut background_threads = Vec::new();
1293+
let mut store = Self {
12771294
db_path,
12781295
backend,
12791296
chain_config: Default::default(),
@@ -1283,10 +1300,11 @@ impl Store {
12831300
trie_update_worker_tx: trie_upd_tx,
12841301
last_computed_flatkeyvalue: Arc::new(Mutex::new(last_written)),
12851302
account_code_cache: Arc::new(CodeCache::default()),
1303+
background_threads: Default::default(),
12861304
};
12871305
let backend_clone = store.backend.clone();
12881306
let last_computed_fkv = store.last_computed_flatkeyvalue.clone();
1289-
std::thread::spawn(move || {
1307+
background_threads.push(std::thread::spawn(move || {
12901308
let rx = fkv_rx;
12911309
// Wait for the first Continue to start generation
12921310
loop {
@@ -1302,7 +1320,7 @@ impl Store {
13021320

13031321
let _ = flatkeyvalue_generator(&backend_clone, &last_computed_fkv, &rx)
13041322
.inspect_err(|err| error!("Error while generating FlatKeyValue: {err}"));
1305-
});
1323+
}));
13061324
let backend = store.backend.clone();
13071325
let flatkeyvalue_control_tx = store.flatkeyvalue_control_tx.clone();
13081326
let trie_cache = store.trie_cache.clone();
@@ -1326,7 +1344,7 @@ impl Store {
13261344
13271345
- Third, it removes the (no longer needed) bottom-most diff layer from the trie layers in the same way as the first step.
13281346
*/
1329-
std::thread::spawn(move || {
1347+
background_threads.push(std::thread::spawn(move || {
13301348
let rx = trie_upd_rx;
13311349
loop {
13321350
match rx.recv() {
@@ -1346,6 +1364,9 @@ impl Store {
13461364
}
13471365
}
13481366
}
1367+
}));
1368+
store.background_threads = Arc::new(ThreadList {
1369+
list: background_threads,
13491370
});
13501371
Ok(store)
13511372
}

0 commit comments

Comments
 (0)