Skip to content

Commit e3eb005

Browse files
committed
fix
1 parent 07c09f6 commit e3eb005

File tree

4 files changed

+47
-22
lines changed

4 files changed

+47
-22
lines changed

scripts/tests/calibnet_other_check.sh

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ if [ "$cid" != "bafy2bzacecgqgzh3gxpariy3mzqb37y2vvxoaw5nwbrlzkhso6owus3zqckwe"
2424
exit 1
2525
fi
2626

27-
echo "Test snapshot GC"
2827
forest_check_db_stats
28+
echo "Run snapshot GC"
2929
forest_run_snap_gc
3030
sleep 5
31+
echo "Wait the node to sync"
3132
forest_wait_for_sync
3233
forest_check_db_stats
3334

src/daemon/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -609,6 +609,10 @@ pub(super) async fn start(
609609
.set(snap_gc.clone())
610610
.ok()
611611
.context("failed to set GLOBAL_SNAPSHOT_GC")?;
612+
tokio::task::spawn({
613+
let snap_gc = snap_gc.clone();
614+
async move { snap_gc.event_toop().await }
615+
});
612616
loop {
613617
tokio::select! {
614618
_ = snap_gc_reboot_rx.recv_async() => {

src/db/gc/snapshot.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,13 @@ pub struct SnapshotGarbageCollector<DB> {
3131
recent_state_roots: i64,
3232
db_config: DbConfig,
3333
running: AtomicBool,
34-
reboot_tx: flume::Sender<()>,
3534
exported_chain_head: RwLock<Option<Tipset>>,
3635
blessed_lite_snapshot: RwLock<Option<PathBuf>>,
3736
db: RwLock<Option<Arc<DB>>>,
37+
reboot_tx: flume::Sender<()>,
38+
trigger_tx: flume::Sender<()>,
39+
trigger_rx: flume::Receiver<()>,
40+
progress_tx: RwLock<Option<flume::Sender<()>>>,
3841
}
3942

4043
impl<DB> SnapshotGarbageCollector<DB>
@@ -47,46 +50,58 @@ where
4750
let car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME);
4851
let recent_state_roots = config.sync.recent_state_roots;
4952
let (reboot_tx, reboot_rx) = flume::bounded(1);
53+
let (trigger_tx, trigger_rx) = flume::bounded(1);
5054
Ok((
5155
Self {
5256
db_root_dir,
5357
car_db_dir,
5458
recent_state_roots,
5559
db_config: config.db_config().clone(),
5660
running: AtomicBool::new(false),
57-
reboot_tx,
5861
exported_chain_head: RwLock::new(None),
5962
blessed_lite_snapshot: RwLock::new(None),
6063
db: RwLock::new(None),
64+
reboot_tx,
65+
trigger_tx,
66+
trigger_rx,
67+
progress_tx: RwLock::new(None),
6168
},
6269
reboot_rx,
6370
))
6471
}
6572

66-
pub fn running(&self) -> bool {
67-
self.running.load(Ordering::Relaxed)
68-
}
69-
7073
pub fn set_db(&self, db: Arc<DB>) {
7174
*self.db.write() = Some(db);
7275
}
7376

74-
pub async fn run(&self) -> anyhow::Result<()> {
75-
if self.running() {
76-
anyhow::bail!("snap gc has already been running");
77+
pub async fn event_toop(&self) {
78+
while self.trigger_rx.recv_async().await.is_ok() {
79+
if self.running.load(Ordering::Relaxed) {
80+
tracing::warn!("snap gc has already been running");
81+
} else {
82+
self.running.store(true, Ordering::Relaxed);
83+
if let Err(e) = self.export_snapshot().await {
84+
tracing::warn!("{e}");
85+
}
86+
}
7787
}
78-
if let Err(e) = self.export_snapshot().await {
79-
self.running.store(false, Ordering::Relaxed);
80-
Err(e)
81-
} else {
82-
Ok(())
88+
}
89+
90+
pub fn trigger(&self) -> flume::Receiver<()> {
91+
let (progress_tx, progress_rx) = flume::unbounded();
92+
*self.progress_tx.write() = Some(progress_tx);
93+
if self.trigger_tx.try_send(()).is_err() {
94+
tracing::warn!("snap gc has already been triggered");
8395
}
96+
progress_rx
8497
}
8598

8699
async fn export_snapshot(&self) -> anyhow::Result<()> {
87100
let db = self.db.read().clone().context("db not yet initialzied")?;
88-
self.running.store(true, Ordering::Relaxed);
89-
tracing::info!("exporting lite snapshot");
101+
tracing::info!(
102+
"exporting lite snapshot with {} recent state roots",
103+
self.recent_state_roots
104+
);
90105
let temp_path = tempfile::NamedTempFile::new_in(&self.car_db_dir)?.into_temp_path();
91106
let file = tokio::fs::File::create(&temp_path).await?;
92107
let (head_ts, _) = crate::chain::export_from_head::<Sha256>(
@@ -97,9 +112,11 @@ where
97112
true,
98113
)
99114
.await?;
100-
let target_path = self
101-
.car_db_dir
102-
.join(format!("lite_{}.forest.car.zst", head_ts.epoch()));
115+
let target_path = self.car_db_dir.join(format!(
116+
"lite_{}_{}.forest.car.zst",
117+
self.recent_state_roots,
118+
head_ts.epoch()
119+
));
103120
temp_path.persist(&target_path)?;
104121
tracing::info!("exported lite snapshot at {}", target_path.display());
105122
*self.exported_chain_head.write() = Some(head_ts);
@@ -112,6 +129,7 @@ where
112129
}
113130

114131
pub async fn cleanup_before_reboot(&self) {
132+
drop(self.progress_tx.write().take());
115133
if let Err(e) = self.cleanup_before_reboot_inner().await {
116134
tracing::warn!("{e}");
117135
}

src/rpc/methods/chain.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ impl RpcMethod<0> for ChainPruneSnapshot {
200200
const NAME: &'static str = "Forest.SnapshotGC";
201201
const PARAM_NAMES: [&'static str; 0] = [];
202202
const API_PATHS: BitFlags<ApiPaths> = ApiPaths::all();
203-
const PERMISSION: Permission = Permission::Read;
203+
const PERMISSION: Permission = Permission::Admin;
204204

205205
type Params = ();
206206
type Ok = ();
@@ -210,7 +210,9 @@ impl RpcMethod<0> for ChainPruneSnapshot {
210210
(): Self::Params,
211211
) -> Result<Self::Ok, ServerError> {
212212
if let Some(gc) = crate::daemon::GLOBAL_SNAPSHOT_GC.get() {
213-
Ok(gc.run().await?)
213+
let progress_rx = gc.trigger();
214+
while progress_rx.recv_async().await.is_ok() {}
215+
Ok(())
214216
} else {
215217
Err(anyhow::anyhow!("snapshot gc is not enabled").into())
216218
}

0 commit comments

Comments
 (0)