Skip to content

Commit bbb125d

Browse files
authored
Call flush during end of the FuelService (#1456)
Closes #1428 Call the `flush` function during the end of the `FuelService` to move the processing of WAL and SST fiels from the node's launching to the node's shutdown. Added additional logs to track the performance of the starting/shutdown process.
1 parent 2ecde6a commit bbb125d

File tree

9 files changed

+53
-7
lines changed

9 files changed

+53
-7
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ Description of the upcoming release here.
1010

1111
### Added
1212
- [#1457](https://github.com/FuelLabs/fuel-core/pull/1457): Fixing incorrect measurement for fast(µs) opcodes.
13-
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): fix coin pagination in e2e test client
13+
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added flushing of the RocksDB during a graceful shutdown.
14+
- [#1456](https://github.com/FuelLabs/fuel-core/pull/1456): Added more logs to track the service lifecycle.
15+
- [#1449](https://github.com/FuelLabs/fuel-core/pull/1449): Fix coin pagination in e2e test client.
1416
- [#1447](https://github.com/FuelLabs/fuel-core/pull/1447): Add timeout for continuous e2e tests
1517
- [#1444](https://github.com/FuelLabs/fuel-core/pull/1444): Add "sanity" benchmarks for memory opcodes.
1618
- [#1437](https://github.com/FuelLabs/fuel-core/pull/1437): Add some transaction throughput tests for basic transfers.

crates/fuel-core/src/database.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,10 @@ impl Database {
232232
pub fn transaction(&self) -> DatabaseTransaction {
233233
self.into()
234234
}
235+
236+
pub fn flush(self) -> DatabaseResult<()> {
237+
self.data.flush()
238+
}
235239
}
236240

237241
/// Mutable methods.

crates/fuel-core/src/service.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ impl FuelService {
6868
#[tracing::instrument(skip_all, fields(name = %config.name))]
6969
pub fn new(database: Database, config: Config) -> anyhow::Result<Self> {
7070
let config = config.make_config_consistent();
71-
database.init(&config.chain_conf)?;
7271
let task = Task::new(database, config)?;
7372
let runner = ServiceRunner::new(task);
7473
let shared = runner.shared.clone();
@@ -93,6 +92,11 @@ impl FuelService {
9392
);
9493
Database::default()
9594
} else {
95+
tracing::info!(
96+
"Opening database {:?} with cache size \"{}\"",
97+
config.database_path,
98+
config.max_database_cache_size
99+
);
96100
Database::open(&config.database_path, config.max_database_cache_size)?
97101
}
98102
}
@@ -182,9 +186,12 @@ impl Task {
182186
/// Private inner method for initializing the fuel service task
183187
pub fn new(database: Database, config: Config) -> anyhow::Result<Task> {
184188
// initialize state
189+
tracing::info!("Initializing database");
190+
database.init(&config.chain_conf)?;
185191
genesis::maybe_initialize_state(&config, &database)?;
186192

187193
// initialize sub services
194+
tracing::info!("Initializing sub services");
188195
let (services, shared) = sub_services::init_sub_services(&config, &database)?;
189196
Ok(Task { services, shared })
190197
}
@@ -251,6 +258,7 @@ impl RunnableTask for Task {
251258
);
252259
}
253260
}
261+
self.shared.database.flush()?;
254262
Ok(())
255263
}
256264
}

crates/fuel-core/src/state.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ pub enum WriteOperation {
8686
Remove,
8787
}
8888

89-
pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {}
89+
pub trait TransactableStorage: BatchOperations + Debug + Send + Sync {
90+
fn flush(&self) -> DatabaseResult<()>;
91+
}
9092

9193
pub mod in_memory;
9294
#[cfg(feature = "rocksdb")]

crates/fuel-core/src/state/in_memory/memory_store.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,14 @@ impl KeyValueStore for MemoryStore {
212212

213213
impl BatchOperations for MemoryStore {}
214214

215-
impl TransactableStorage for MemoryStore {}
215+
impl TransactableStorage for MemoryStore {
216+
fn flush(&self) -> DatabaseResult<()> {
217+
for lock in self.inner.iter() {
218+
lock.lock().expect("poisoned").clear();
219+
}
220+
Ok(())
221+
}
222+
}
216223

217224
#[cfg(test)]
218225
mod tests {

crates/fuel-core/src/state/in_memory/transaction.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,15 @@ impl KeyValueStore for MemoryTransactionView {
284284

285285
impl BatchOperations for MemoryTransactionView {}
286286

287-
impl TransactableStorage for MemoryTransactionView {}
287+
impl TransactableStorage for MemoryTransactionView {
288+
fn flush(&self) -> DatabaseResult<()> {
289+
for lock in self.changes.iter() {
290+
lock.lock().expect("poisoned lock").clear();
291+
}
292+
self.view_layer.flush()?;
293+
self.data_source.flush()
294+
}
295+
}
288296

289297
#[cfg(test)]
290298
mod tests {

crates/fuel-core/src/state/rocks_db.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ impl RocksDb {
7777
opts.set_row_cache(&cache);
7878
}
7979

80+
DB::repair(&opts, &path).map_err(|e| DatabaseError::Other(e.into()))?;
81+
8082
let db = match DB::open_cf_descriptors(&opts, &path, cf_descriptors) {
8183
Err(_) => {
8284
// setup cfs
@@ -390,7 +392,17 @@ impl BatchOperations for RocksDb {
390392
}
391393
}
392394

393-
impl TransactableStorage for RocksDb {}
395+
impl TransactableStorage for RocksDb {
396+
fn flush(&self) -> DatabaseResult<()> {
397+
self.db
398+
.flush_wal(true)
399+
.map_err(|e| anyhow::anyhow!("Unable to flush WAL file: {}", e))?;
400+
self.db
401+
.flush()
402+
.map_err(|e| anyhow::anyhow!("Unable to flush SST files: {}", e))?;
403+
Ok(())
404+
}
405+
}
394406

395407
#[cfg(test)]
396408
mod tests {

crates/services/src/service.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,8 @@ where
295295
}
296296
});
297297

298+
tracing::info!("The service {} is shut down", S::NAME);
299+
298300
if let State::StoppedWithError(err) = stopped_state {
299301
std::panic::resume_unwind(Box::new(err));
300302
}
@@ -325,6 +327,7 @@ async fn run<S>(
325327
}
326328

327329
// We can panic here, because it is inside of the task.
330+
tracing::info!("Starting {} service", S::NAME);
328331
let mut task = service
329332
.into_task(&state, params)
330333
.await
@@ -375,6 +378,7 @@ async fn run<S>(
375378
}
376379
}
377380

381+
tracing::info!("Shutting down {} service", S::NAME);
378382
let shutdown = std::panic::AssertUnwindSafe(task.shutdown());
379383
match shutdown.catch_unwind().await {
380384
Ok(Ok(_)) => {}

crates/services/sync/src/service.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ where
124124
}
125125

126126
async fn shutdown(self) -> anyhow::Result<()> {
127-
tracing::info!("Sync task shutting down");
128127
self.import_task_handle.stop_and_await().await?;
129128
Ok(())
130129
}

0 commit comments

Comments
 (0)