Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pthread ")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Werror -pthread -fvisibility=hidden")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -ggdb3")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -march=native")

if (EMSCRIPTEN)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -O3 -s USE_PTHREADS=1 \
Expand Down
2 changes: 1 addition & 1 deletion core/proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -480,7 +480,7 @@ namespace proto {
static const uint8_t Viewer = 'V';
};

static const uint32_t g_HdrPackMaxSize = 2048; // about 400K
static const uint32_t g_HdrPackMaxSize = 8192; // 8x original for faster sync

struct Event
{
Expand Down
29 changes: 27 additions & 2 deletions node/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,13 @@ void NodeDB::Open(const char* szPath)
sqlite3_busy_timeout(m_pDb, 5000);

ExecTextOut("PRAGMA locking_mode = EXCLUSIVE");
// page_size must be issued BEFORE the database enters WAL mode: SQLite freezes the
// page size once a db is in WAL journal mode, and on an existing non-empty db this
// pragma is a no-op anyway unless followed by a VACUUM. Issued first it is at least
// effective for freshly-created databases.
ExecTextOut("PRAGMA page_size = 4096"); // 4KB pages (typical fs block size)
// NOTE(review): with locking_mode=EXCLUSIVE, WAL operates without the shared-memory
// wal-index, so no other process can read the db while we hold it — same exclusivity
// as before this change, just be aware the WAL/SHM files appear on first write.
ExecTextOut("PRAGMA journal_mode = WAL"); // Write-Ahead Logging for better concurrent read/write performance
ExecTextOut("PRAGMA synchronous = NORMAL"); // Safe with WAL, avoids fsync on every commit
ExecTextOut("PRAGMA cache_size = -131072"); // 128MB page cache (vs default ~2MB)
ExecTextOut("PRAGMA mmap_size = 268435456"); // 256MB memory-mapped I/O
ExecTextOut("PRAGMA temp_store = MEMORY"); // In-memory temp tables
ExecTextOut("PRAGMA wal_autocheckpoint = 2000"); // Moderate checkpoint frequency (default 1000, ~8MB WAL threshold)
ExecTextOut("PRAGMA journal_size_limit=1048576"); // limit journal file, otherwise it may remain huge even after tx commit, until the app is closed

bool bCreate;
Expand Down Expand Up @@ -1675,6 +1682,23 @@ void NodeDB::DelStateBlockPP(uint64_t rowid)
TestChanged1Row();
}

// Raise the Active flag on the given state row, preserving all other flag bits.
// Per db.h: sets the flag only, without any cursor update.
void NodeDB::ActivateState(uint64_t rowid)
{
	Recordset updater(*this, Query::StateActivateOnly, "UPDATE " TblStates " SET " TblStates_Flags "=" TblStates_Flags " | ? WHERE rowid=?");

	updater.put(0, StateFlags::Active);
	updater.put(1, rowid);
	updater.Step();

	// Exactly one row must have been touched; anything else is a logic error.
	TestChanged1Row();
}

// Batch variant of DelStateBlockPP: strips the perishable body and peer columns
// from every state whose number lies in [hMin, hMax]. Rows whose body is already
// NULL are excluded by the WHERE clause, hence no changed-row-count check here —
// an arbitrary number of rows (including zero) may be affected.
void NodeDB::DelStateBlockPPRange(uint64_t hMin, uint64_t hMax)
{
	Recordset wiper(*this, Query::StateDelBlockPPRange, "UPDATE " TblStates " SET " TblStates_BodyP "=NULL," TblStates_Peer "=NULL WHERE " TblStates_Number ">=? AND " TblStates_Number "<=? AND " TblStates_BodyP " IS NOT NULL");

	wiper.put(0, hMin);
	wiper.put(1, hMax);
	wiper.Step();
}

void NodeDB::DelStateBlockPPR(uint64_t rowid)
{
Recordset rs(*this, Query::StateDelBlockPPR, "UPDATE " TblStates " SET " TblStates_BodyP "=NULL," TblStates_Rollback "=NULL," TblStates_Peer "=NULL WHERE rowid=?");
Expand Down Expand Up @@ -2404,15 +2428,16 @@ void NodeDB::TxoDelFrom(TxoID id)
rs.Step();
}

void NodeDB::TxoSetSpent(TxoID id, Height h)
// Record the height at which a txo was spent.
// h == MaxHeight leaves the spend-height parameter unbound (SQLite binds it as NULL),
// i.e. marks the txo as unspent again.
// bMustExist: when true (the default), assert that exactly one row was updated;
// when false, silently tolerate a missing row.
void NodeDB::TxoSetSpent(TxoID id, Height h, bool bMustExist)
{
	Recordset upd(*this, Query::TxoSetSpent, "UPDATE " TblTxo " SET " TblTxo_SpendHeight "=? WHERE " TblTxo_ID "=?");

	if (h != MaxHeight)
		upd.put(0, h); // bind only a real height; unbound => NULL column

	upd.put(1, id);
	upd.Step();

	if (!bMustExist)
		return;

	TestChanged1Row();
}

void NodeDB::EnumTxos(WalkerTxo& wlk, TxoID id0)
Expand Down
7 changes: 6 additions & 1 deletion node/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,9 @@ class NodeDB
Dbg3,
Dbg4,

StateActivateOnly,
StateDelBlockPPRange,

count
};
};
Expand Down Expand Up @@ -380,6 +383,8 @@ class NodeDB
void DelStateBlockPP(uint64_t rowid); // delete perishable, peer. Keep eternal, extra, txos, rollback
void DelStateBlockPPR(uint64_t rowid); // delete perishable, rollback, peer. Keep eternal, extra, txos
void DelStateBlockAll(uint64_t rowid); // delete perishable, peer, eternal, extra, txos, rollback
void DelStateBlockPPRange(uint64_t hMin, uint64_t hMax); // batch delete perishable+peer for height range
void ActivateState(uint64_t rowid); // set Active flag without cursor update

TxoID FindStateByTxoID(StateID&, TxoID); // returns the Txos at state end

Expand Down Expand Up @@ -583,7 +588,7 @@ class NodeDB
void TxoAdd(TxoID, const Blob&);
void TxoDel(TxoID);
void TxoDelFrom(TxoID);
void TxoSetSpent(TxoID, Height);
void TxoSetSpent(TxoID, Height, bool bMustExist = true);

struct WalkerTxo
{
Expand Down
123 changes: 99 additions & 24 deletions node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,23 @@ void Node::Wanted::OnTimer()

void Node::TryAssignTask(Task& t)
{
// Prioritize w.r.t. rating!
// For body tasks, first try peers that don't already have body tasks (distribute across peers)
// Body tasks (m_Key.second != 0) are first offered to peers that currently carry no
// body task of their own, spreading block downloads across the live peer set before
// falling back to the rating-ordered pass below.
if (t.m_Key.second)
{
for (PeerMan::LiveSet::iterator it = m_PeerMan.m_LiveSet.begin(); m_PeerMan.m_LiveSet.end() != it; ++it)
{
Peer& p = *it->m_p;
// Linear scan of this peer's task list for an already-assigned body task.
// NOTE(review): this makes every body-task assignment O(live peers * tasks-per-peer);
// fine for small task lists, worth a per-peer counter if lists grow.
bool bHasBody = false;
for (TaskList::iterator jt = p.m_lstTasks.begin(); p.m_lstTasks.end() != jt; ++jt)
{
if (jt->m_Key.second) { bHasBody = true; break; }
}
// Two-argument overload performs the actual assignment attempt.
// NOTE(review): confirm a *failed* TryAssignTask(t, p) cannot mutate m_LiveSet —
// we return on success, but on failure the loop keeps using iterator 'it'.
if (!bHasBody && TryAssignTask(t, p))
return;
}
}

// Fall through: try any peer by rating (original behavior)
for (PeerMan::LiveSet::iterator it = m_PeerMan.m_LiveSet.begin(); m_PeerMan.m_LiveSet.end() != it; ++it)
{
Peer& p = *it->m_p;
Expand Down Expand Up @@ -414,12 +430,16 @@ bool Node::TryAssignTask(Task& t, Peer& p)
if (t.m_Key.first.m_Number.v <= m_Processor.m_SyncData.m_Target.m_Number.v)
{
// fast-sync mode, diluted blocks request.
// m_Top must always be the sync target for protocol compatibility.
// The peer sends blocks starting from (m_Top - m_CountExtra).
msg.m_Top.m_Number = m_Processor.m_SyncData.m_Target.m_Number;
if (m_Processor.IsFastSync())
m_Processor.get_DB().get_StateHash(m_Processor.m_SyncData.m_Target.m_Row, msg.m_Top.m_Hash);
else
msg.m_Top.m_Hash = Zero; // treasury

// CountExtra = distance from task start to sync target.
// This tells the peer where to begin sending blocks.
msg.m_CountExtra.v = m_Processor.m_SyncData.m_Target.m_Number.v - t.m_Key.first.m_Number.v;
msg.m_Block0 = m_Processor.m_SyncData.m_n0;
msg.m_HorizonLo1 = m_Processor.m_SyncData.m_TxoLo;
Expand All @@ -443,18 +463,15 @@ bool Node::TryAssignTask(Task& t, Peer& p)
}
else
{
const uint32_t nMaxHdrRequests = proto::g_HdrPackMaxSize * 2;
const uint32_t nMaxHdrRequests = proto::g_HdrPackMaxSize * 4;
if (m_nTasksPackHdr >= nMaxHdrRequests)
{
BEAM_LOG_VERBOSE() << "too many hdrs requested";
return false; // too many hdrs requested
}

if (nBlocks)
{
BEAM_LOG_VERBOSE() << "peer busy";
return false; // don't requests headers from the peer that transfers a block
}
// Removed "peer busy" check: allow header + block downloads simultaneously
// during fast sync, headers and blocks are independent data

uint32_t nPackSize = std::min(proto::g_HdrPackMaxSize, nMaxHdrRequests - m_nTasksPackHdr);

Expand Down Expand Up @@ -500,6 +517,34 @@ bool Node::TryAssignTask(Task& t, Peer& p)
return true;
}

// Fold a newly-observed request round-trip time into m_AvgResponseTime_ms.
// For the first 100 samples this is a plain cumulative mean; once the sample counter
// saturates at 100 it becomes an exponential moving average with alpha = 1/100, so
// stale history decays instead of dominating forever. (The previous comment called
// the whole thing an EMA, which was only true after saturation.)
// elapsed_ms: time from task assignment to response, in milliseconds.
void Node::Peer::UpdateResponseTime(uint32_t elapsed_ms)
{
	if (m_nResponseSamples < 100)
		m_nResponseSamples++;

	// Accumulate in 64 bits: avg * (n - 1) can overflow uint32_t when a stalled
	// request reports a very large elapsed time. The final quotient is bounded by
	// max(avg, elapsed_ms), so the narrowing cast back is safe.
	uint64_t total = static_cast<uint64_t>(m_AvgResponseTime_ms) * (m_nResponseSamples - 1) + elapsed_ms;
	m_AvgResponseTime_ms = static_cast<uint32_t>(total / m_nResponseSamples);
}

// Compute the request timeout for this peer's next task.
// bBlock selects between the configured block and state timeouts. With fewer than 3
// recorded response samples the configured value is used as-is; afterwards the
// timeout tracks 3x the peer's average response time, clamped into [5s, configured].
// Returns the timeout in milliseconds.
uint32_t Node::Peer::GetAdaptiveTimeout(bool bBlock) const
{
	const auto& cfgTimeout = m_This.m_Cfg.m_Timeout;
	const uint32_t base_ms = bBlock ? cfgTimeout.m_GetBlock_ms : cfgTimeout.m_GetState_ms;

	// Too little history to trust the average yet.
	if (m_nResponseSamples < 3)
		return base_ms;

	const uint32_t floor_ms = 5000; // never time out faster than 5 seconds
	uint32_t candidate_ms = m_AvgResponseTime_ms * 3;

	if (candidate_ms < floor_ms)
		candidate_ms = floor_ms;

	// The configured timeout is an upper bound: adaptivity may only tighten it.
	return (candidate_ms > base_ms) ? base_ms : candidate_ms;
}

void Node::Peer::SetTimerWrtFirstTask()
{
if (m_lstTasks.empty())
Expand All @@ -509,11 +554,8 @@ void Node::Peer::SetTimerWrtFirstTask()
}
else
{
// TODO - timer w.r.t. rating, i.e. should not exceed much the best avail peer rating

uint32_t timeout_ms = m_lstTasks.front().m_Key.second ?
m_This.m_Cfg.m_Timeout.m_GetBlock_ms :
m_This.m_Cfg.m_Timeout.m_GetState_ms;
bool bBlock = m_lstTasks.front().m_Key.second;
uint32_t timeout_ms = GetAdaptiveTimeout(bBlock);

if (!m_pTimerRequest)
m_pTimerRequest = io::Timer::create(io::Reactor::get_Current());
Expand Down Expand Up @@ -923,7 +965,11 @@ void Node::Processor::OnGoUpTimer()
{
m_bGoUpPending = false;
TryGoUp();
get_ParentObj().RefreshCongestions();
if (m_bCongestionCacheDirty)
{
// Only refresh if TryGoUp made progress (changed cursor/reachability)
get_ParentObj().RefreshCongestions();
}
get_ParentObj().UpdateSyncStatus();
}

Expand Down Expand Up @@ -2097,6 +2143,9 @@ void Node::Peer::ModifyRatingWrtData(size_t nSize)
PeerManager::TimePoint tp;
uint32_t dt_ms = tp.get() - get_FirstTask().m_TimeAssigned_ms;

// Update adaptive timeout tracking
UpdateResponseTime(dt_ms);

// Calculate the weighted average of the effective bandwidth.
// We assume the "previous" bandwidth bw0 was calculated within "previous" window t0, and the total download amount was v0 = t0 * bw0.
// Hence, after accounting for newly-downloaded data, the average bandwidth becomes:
Expand Down Expand Up @@ -2246,24 +2295,50 @@ void Node::Peer::OnMsg(proto::HdrPack&& msg)
if (!(idLast == t.m_Key.first))
ThrowUnexpected();

for (size_t i = 0; i < v.size(); i++)
// Process headers HIGH-to-LOW (reverse order) during fast sync.
// This eliminates tip table churn: each header finds its child (already inserted)
// rather than its prev, avoiding ~16K TipAdd/TipDel SQL ops per pack.
// v.back() = highest height, v[0] = lowest height.
if (m_This.m_Processor.IsFastSync())
{
NodeProcessor::DataStatus::Enum eStatus = m_This.m_Processor.OnStateSilent(v[i], m_pInfo->m_ID.m_Key, idLast, true);
switch (eStatus)
for (size_t i = v.size(); i > 0; i--)
{
case NodeProcessor::DataStatus::Invalid:
// though PoW was already tested, header can still be invalid. For instance, due to improper Timestamp
ThrowUnexpected();
break;
NodeProcessor::DataStatus::Enum eStatus = m_This.m_Processor.OnStateSilent(v[i-1], m_pInfo->m_ID.m_Key, idLast, true);
if (NodeProcessor::DataStatus::Invalid == eStatus)
ThrowUnexpected();
}
}
else
{
for (size_t i = 0; i < v.size(); i++)
{
NodeProcessor::DataStatus::Enum eStatus = m_This.m_Processor.OnStateSilent(v[i], m_pInfo->m_ID.m_Key, idLast, true);
switch (eStatus)
{
case NodeProcessor::DataStatus::Invalid:
ThrowUnexpected();
break;

case NodeProcessor::DataStatus::Accepted:
// no break;
case NodeProcessor::DataStatus::Accepted:
// no break;

default:
break; // suppress warning
default:
break; // suppress warning
}
}
}

// Proactive next-header request: create task for next batch BEFORE releasing current task.
// This overlaps the network round-trip with RefreshCongestions processing.
// The task key will match what EnumCongestions creates, so it survives RefreshCongestions.
if (m_This.m_Processor.IsFastSync() && !v.empty() && v[0].m_Number.v > 1)
{
Block::SystemState::ID nextId;
nextId.m_Number.v = v[0].m_Number.v - 1;
nextId.m_Hash = v[0].m_Prev;
m_This.m_Processor.RequestData(nextId, false, t.m_sidTrg);
}

BEAM_LOG_INFO() << "Hdr pack received " << msg.m_Prefix.m_Number.v << "-" << idLast;

ModifyRatingWrtData(sizeof(msg.m_Prefix) + msg.m_vElements.size() * sizeof(msg.m_vElements.front()));
Expand Down
14 changes: 10 additions & 4 deletions node/node.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct Node
uint32_t m_PeersDbFlush_ms = 1000 * 60; // 1 minute
} m_Timeout;

uint32_t m_MaxConcurrentBlocksRequest = 18;
uint32_t m_MaxConcurrentBlocksRequest = 200000; // high limit to allow parallel body downloads during fast sync
uint32_t m_MaxPoolTransactions = 100 * 1000;
uint32_t m_MaxDeferredTransactions = 100 * 1000;
uint32_t m_MiningThreads = 0; // by default disabled
Expand All @@ -87,7 +87,7 @@ struct Node
// Number of verification threads for CPU-hungry cryptography. Currently used for block validation only.
// 0: single threaded
// negative: number of cores minus number of mining threads.
int m_VerificationThreads = 0;
int m_VerificationThreads = -1; // use all available cores for verification

struct RollbackLimit
{
Expand Down Expand Up @@ -123,8 +123,8 @@ struct Node
size_t m_Chocking = 1024 * 1024;
size_t m_Drown = 1024*1024 * 20;

size_t m_MaxBodyPackSize = 1024 * 1024 * 5;
uint32_t m_MaxBodyPackCount = 3000;
size_t m_MaxBodyPackSize = 1024 * 1024 * 50; // 50MB for faster sync
uint32_t m_MaxBodyPackCount = 16000; // increased pack count

} m_BandwidthCtl;

Expand Down Expand Up @@ -578,6 +578,12 @@ struct Node
io::Timer::Ptr m_pTimerRequest;
io::Timer::Ptr m_pTimerPeers;

// Adaptive timeout tracking
uint32_t m_AvgResponseTime_ms = 0;
uint32_t m_nResponseSamples = 0;
void UpdateResponseTime(uint32_t elapsed_ms);
uint32_t GetAdaptiveTimeout(bool bBlock) const;

Peer(Node& n) :m_This(n) {}

void TakeTasks();
Expand Down
Loading
Loading