@@ -43,6 +43,22 @@ using node::GetTransaction;
4343namespace llmq {
4444static const std::string_view INPUTLOCK_REQUESTID_PREFIX = " inlock" ;
4545
46+ namespace {
47+ template <typename T>
48+ std::unordered_set<uint256, StaticSaltedHasher> GetIdsFromLockable (const std::vector<T>& vec)
49+ {
50+ static_assert (std::is_same_v<T, CTxIn> || std::is_same_v<T, COutPoint>, " Unexpected type" );
51+
52+ std::unordered_set<uint256, StaticSaltedHasher> ret{};
53+ if (vec.empty ()) return ret;
54+ ret.reserve (vec.size ());
55+ for (const auto & in : vec) {
56+ ret.emplace (::SerializeHash (std::make_pair (INPUTLOCK_REQUESTID_PREFIX, in)));
57+ }
58+ return ret;
59+ }
60+ } // anonymous namespace
61+
4662CInstantSendManager::CInstantSendManager (CChainLocksHandler& _clhandler, CChainState& chainstate, CQuorumManager& _qman,
4763 CSigningManager& _sigman, CSigSharesManager& _shareman, CSporkManager& sporkman,
4864 CTxMemPool& _mempool, const CMasternodeSync& mn_sync, bool is_masternode,
@@ -76,14 +92,14 @@ void CInstantSendManager::Start(PeerManager& peerman)
7692 workThread = std::thread (&util::TraceThread, " isman" , [this , &peerman] { WorkThreadMain (peerman); });
7793
7894 if (m_activeman) {
79- sigman. RegisterRecoveredSigsListener ( m_activeman. get () );
95+ m_activeman-> Start ( );
8096 }
8197}
8298
8399void CInstantSendManager::Stop ()
84100{
85101 if (m_activeman) {
86- sigman. UnregisterRecoveredSigsListener ( m_activeman. get () );
102+ m_activeman-> Stop ( );
87103 }
88104
89105 // make sure to call InterruptWorkerThread() first
@@ -346,9 +362,7 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, PeerManager& peerm
346362 LogPrint (BCLog::INSTANTSEND, " CInstantSendManager::%s -- txid=%s, islock=%s: processing islock, peer=%d\n " , __func__,
347363 islock->txid .ToString (), hash.ToString (), from);
348364 if (m_activeman) {
349- LOCK (m_activeman->cs_creating );
350- m_activeman->creatingInstantSendLocks .erase (islock->GetRequestId ());
351- m_activeman->txToCreatingInstantSendLocks .erase (islock->txid );
365+ m_activeman->ClearLockFromQueue (islock);
352366 }
353367 if (db.KnownInstantSendLock (hash)) {
354368 return ;
@@ -594,23 +608,28 @@ void CInstantSendManager::RemoveNonLockedTx(const uint256& txid, bool retryChild
594608void CInstantSendManager::RemoveConflictedTx (const CTransaction& tx)
595609{
596610 RemoveNonLockedTx (tx.GetHash (), false );
597- if (!m_activeman) return ;
598-
599- LOCK (m_activeman->cs_inputReqests );
600- for (const auto & in : tx.vin ) {
601- auto inputRequestId = ::SerializeHash (std::make_pair (INPUTLOCK_REQUESTID_PREFIX, in));
602- m_activeman->inputRequestIds .erase (inputRequestId);
611+ if (m_activeman) {
612+ m_activeman->ClearInputsFromQueue (GetIdsFromLockable (tx.vin ));
603613 }
604614}
605615
606616void CInstantSendManager::TruncateRecoveredSigsForInputs (const llmq::CInstantSendLock& islock)
607617{
608- if (!m_activeman) return ;
618+ auto ids = GetIdsFromLockable (islock.inputs );
619+ if (m_activeman) {
620+ m_activeman->ClearInputsFromQueue (ids);
621+ }
622+ for (const auto & id : ids) {
623+ sigman.TruncateRecoveredSig (Params ().GetConsensus ().llmqTypeDIP0024InstantSend , id);
624+ }
625+ }
609626
610- for (const auto & in : islock.inputs ) {
611- auto inputRequestId = ::SerializeHash (std::make_pair (INPUTLOCK_REQUESTID_PREFIX, in));
612- WITH_LOCK (m_activeman->cs_inputReqests , m_activeman->inputRequestIds .erase (inputRequestId));
613- sigman.TruncateRecoveredSig (Params ().GetConsensus ().llmqTypeDIP0024InstantSend , inputRequestId);
627+ void CInstantSendManager::TryEmplacePendingLock (const uint256& hash, const NodeId id, const CInstantSendLockPtr& islock)
628+ {
629+ if (db.KnownInstantSendLock (hash)) return ;
630+ LOCK (cs_pendingLocks);
631+ if (!pendingInstantSendLocks.count (hash)) {
632+ pendingInstantSendLocks.emplace (hash, std::make_pair (id, islock));
614633 }
615634}
616635
@@ -912,10 +931,30 @@ size_t CInstantSendManager::GetInstantSendLockCount() const
912931void CInstantSendManager::WorkThreadMain (PeerManager& peerman)
913932{
914933 while (!workInterrupt) {
915- bool fMoreWork = ProcessPendingInstantSendLocks (peerman);
916- if (m_activeman) {
917- m_activeman->ProcessPendingRetryLockTxs ();
918- }
934+ bool fMoreWork {false };
935+ do {
936+ if (!IsInstantSendEnabled ()) break ;
937+ fMoreWork = ProcessPendingInstantSendLocks (peerman);
938+ if (!m_activeman) break ;
939+ // Construct set of non-locked transactions that are pending to retry
940+ std::vector<CTransactionRef> txns{};
941+ {
942+ LOCK2 (cs_nonLocked, cs_pendingRetry);
943+ if (pendingRetryTxs.empty ()) break ;
944+ txns.reserve (pendingRetryTxs.size ());
945+ for (const auto & txid : pendingRetryTxs) {
946+ if (auto it = nonLockedTxs.find (txid); it != nonLockedTxs.end ()) {
947+ const auto & [_, tx_info] = *it;
948+ if (tx_info.tx ) {
949+ txns.push_back (tx_info.tx );
950+ }
951+ }
952+ }
953+ txns.shrink_to_fit ();
954+ }
955+ // Retry processing them
956+ m_activeman->ProcessPendingRetryLockTxs (txns);
957+ } while (0 );
919958
920959 if (!fMoreWork && !workInterrupt.sleep_for (std::chrono::milliseconds (100 ))) {
921960 return ;
0 commit comments