Skip to content

Commit c443f82

Browse files
committed
fix: immediately retry sync if a getchunk/putchunk fails due to stale inventory data
1 parent 7c37f89 commit c443f82

File tree

3 files changed

+59
-17
lines changed

3 files changed

+59
-17
lines changed

stackslib/src/net/stackerdb/db.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,8 @@ impl<'a> StackerDBTx<'a> {
323323
}
324324
}
325325

326+
debug!("Reset slot {} of {}", slot_id, smart_contract);
327+
326328
// new slot, or existing slot with a different signer
327329
let qry = "INSERT OR REPLACE INTO chunks (stackerdb_id,signer,slot_id,version,write_time,data,data_hash,signature) VALUES (?1,?2,?3,?4,?5,?6,?7,?8)";
328330
let mut stmt = self.sql_tx.prepare(&qry)?;

stackslib/src/net/stackerdb/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,8 @@ pub struct StackerDBSync<NC: NeighborComms> {
388388
/// whether or not we should immediately re-fetch chunks because we learned about new chunks
389389
/// from our peers when they replied to our chunk-pushes with new inventory state
390390
need_resync: bool,
391+
/// whether or not the fetched inventory was determined to be stale
392+
stale_inv: bool,
391393
/// Track stale neighbors
392394
pub(crate) stale_neighbors: HashSet<NeighborAddress>,
393395
/// How many attempted connections have been made in the last pass (gets reset)
@@ -466,7 +468,9 @@ impl PeerNetwork {
466468
Err(e) => {
467469
debug!(
468470
"{:?}: failed to get chunk versions for {}: {:?}",
469-
self.local_peer, contract_id, &e
471+
self.get_local_peer(),
472+
contract_id,
473+
&e
470474
);
471475

472476
// most likely indicates that this DB doesn't exist
@@ -475,6 +479,14 @@ impl PeerNetwork {
475479
};
476480

477481
let num_outbound_replicas = self.count_outbound_stackerdb_replicas(contract_id) as u32;
482+
483+
debug!(
484+
"{:?}: inventory for {} has {} outbound replicas; versions are {:?}",
485+
self.get_local_peer(),
486+
contract_id,
487+
num_outbound_replicas,
488+
&slot_versions
489+
);
478490
StacksMessageType::StackerDBChunkInv(StackerDBChunkInvData {
479491
slot_versions,
480492
num_outbound_replicas,

stackslib/src/net/stackerdb/sync.rs

Lines changed: 44 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
7171
total_pushed: 0,
7272
last_run_ts: 0,
7373
need_resync: false,
74+
stale_inv: false,
7475
stale_neighbors: HashSet::new(),
7576
num_connections: 0,
7677
num_attempted_connections: 0,
@@ -210,6 +211,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
210211
self.write_freq = config.write_freq;
211212

212213
self.need_resync = false;
214+
self.stale_inv = false;
213215
self.last_run_ts = get_epoch_time_secs();
214216

215217
self.state = StackerDBSyncState::ConnectBegin;
@@ -253,7 +255,7 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
253255
.get_slot_write_timestamps(&self.smart_contract_id)?;
254256

255257
if local_slot_versions.len() != local_write_timestamps.len() {
256-
let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}); abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len());
258+
let msg = format!("Local slot versions ({}) out of sync with DB slot versions ({}) for {}; abandoning sync and trying again", local_slot_versions.len(), local_write_timestamps.len(), &self.smart_contract_id);
257259
warn!("{}", &msg);
258260
return Err(net_error::Transient(msg));
259261
}
@@ -267,12 +269,13 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
267269
let write_ts = local_write_timestamps[i];
268270
if write_ts + self.write_freq > now {
269271
debug!(
270-
"{:?}: Chunk {} was written too frequently ({} + {} >= {}), so will not fetch chunk",
272+
"{:?}: Chunk {} was written too frequently ({} + {} >= {}) in {}, so will not fetch chunk",
271273
network.get_local_peer(),
272274
i,
273275
write_ts,
274276
self.write_freq,
275-
now
277+
now,
278+
&self.smart_contract_id,
276279
);
277280
continue;
278281
}
@@ -340,10 +343,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
340343
schedule.reverse();
341344

342345
debug!(
343-
"{:?}: Will request up to {} chunks for {}",
346+
"{:?}: Will request up to {} chunks for {}. Schedule: {:?}",
344347
network.get_local_peer(),
345348
&schedule.len(),
346349
&self.smart_contract_id,
350+
&schedule
347351
);
348352
Ok(schedule)
349353
}
@@ -507,12 +511,13 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
507511
if *old_version < new_inv.slot_versions[old_slot_id] {
508512
// remote peer indicated that it has a newer version of this chunk.
509513
debug!(
510-
"{:?}: peer {:?} has a newer version of slot {} ({} < {})",
514+
"{:?}: peer {:?} has a newer version of slot {} ({} < {}) in {}",
511515
_network.get_local_peer(),
512516
&naddr,
513517
old_slot_id,
514518
old_version,
515-
new_inv.slot_versions[old_slot_id]
519+
new_inv.slot_versions[old_slot_id],
520+
&self.smart_contract_id,
516521
);
517522
resync = true;
518523
break;
@@ -621,9 +626,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
621626
self.replicas = replicas;
622627
}
623628
debug!(
624-
"{:?}: connect_begin: establish StackerDB sessions to {} neighbors",
629+
"{:?}: connect_begin: establish StackerDB sessions to {} neighbors (out of {} p2p peers)",
625630
network.get_local_peer(),
626-
self.replicas.len()
631+
self.replicas.len(),
632+
network.get_num_p2p_convos()
627633
);
628634
if self.replicas.len() == 0 {
629635
// nothing to do
@@ -820,9 +826,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
820826
}
821827
StacksMessageType::Nack(data) => {
822828
debug!(
823-
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us with code {}",
829+
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunksInv us (on {}) with code {}",
824830
&network.get_local_peer(),
825831
&naddr,
832+
&self.smart_contract_id,
826833
data.error_code
827834
);
828835
self.connected_replicas.remove(&naddr);
@@ -838,9 +845,10 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
838845
}
839846
};
840847
debug!(
841-
"{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}",
848+
"{:?}: getchunksinv_try_finish: Received StackerDBChunkInv from {:?}: {:?}",
842849
network.get_local_peer(),
843-
&naddr
850+
&naddr,
851+
&chunk_inv_opt
844852
);
845853

846854
if let Some(chunk_inv) = chunk_inv_opt {
@@ -956,14 +964,17 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
956964
StacksMessageType::StackerDBChunk(data) => data,
957965
StacksMessageType::Nack(data) => {
958966
debug!(
959-
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk with code {}",
967+
"{:?}: remote peer {:?} NACK'ed our StackerDBGetChunk (on {}) with code {}",
960968
network.get_local_peer(),
961969
&naddr,
970+
&self.smart_contract_id,
962971
data.error_code
963972
);
964-
self.connected_replicas.remove(&naddr);
965973
if data.error_code == NackErrorCodes::StaleView {
966974
self.stale_neighbors.insert(naddr);
975+
} else if data.error_code == NackErrorCodes::StaleVersion {
976+
// try again immediately, without throttling
977+
self.stale_inv = true;
967978
}
968979
continue;
969980
}
@@ -1068,7 +1079,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
10681079
&selected_neighbor,
10691080
&e
10701081
);
1071-
self.connected_replicas.remove(&selected_neighbor);
10721082
continue;
10731083
}
10741084

@@ -1107,7 +1117,6 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
11071117
&naddr,
11081118
data.error_code
11091119
);
1110-
self.connected_replicas.remove(&naddr);
11111120
if data.error_code == NackErrorCodes::StaleView {
11121121
self.stale_neighbors.insert(naddr);
11131122
}
@@ -1199,8 +1208,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
11991208
let done = self.connect_begin(network)?;
12001209
if done {
12011210
self.state = StackerDBSyncState::ConnectFinish;
1202-
blocked = false;
1211+
} else {
1212+
// no replicas; try again
1213+
self.state = StackerDBSyncState::Finished;
12031214
}
1215+
blocked = false;
12041216
}
12051217
StackerDBSyncState::ConnectFinish => {
12061218
let done = self.connect_try_finish(network)?;
@@ -1248,6 +1260,11 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
12481260
{
12491261
// someone pushed newer chunk data to us, and getting chunks is
12501262
// enabled, so immediately go request them
1263+
debug!(
1264+
"{:?}: immediately retry StackerDB GetChunks on {} due to PushChunk NACK",
1265+
network.get_local_peer(),
1266+
&self.smart_contract_id
1267+
);
12511268
self.recalculate_chunk_request_schedule(network)?;
12521269
self.state = StackerDBSyncState::GetChunks;
12531270
} else {
@@ -1259,8 +1276,19 @@ impl<NC: NeighborComms> StackerDBSync<NC> {
12591276
}
12601277
}
12611278
StackerDBSyncState::Finished => {
1279+
let stale_inv = self.stale_inv;
1280+
12621281
let result = self.reset(Some(network), config);
12631282
self.state = StackerDBSyncState::ConnectBegin;
1283+
1284+
if stale_inv {
1285+
debug!(
1286+
"{:?}: immediately retry StackerDB sync on {} due to stale inventory",
1287+
network.get_local_peer(),
1288+
&self.smart_contract_id
1289+
);
1290+
self.wakeup();
1291+
}
12641292
return Ok(Some(result));
12651293
}
12661294
};

0 commit comments

Comments
 (0)