Backport Bitcoin PR#8969: Decouple peer-processing-logic from block-connection-logic (#2) (#1558)

* Move MarkBlockAsReceived out of ProcessNewMessage

* Remove network state wipe from UnloadBlockIndex.

UnloadBlockIndex is only used during init if we end up reindexing
to clear our block state so that we can start over. However, at
that time no connections have been brought up as CConnman hasn't
been started yet, so all of the network processing state logic is
empty when its called.

Additionally, the initialization of the recentRejects set is moved
to InitPeerLogic.

* Move all calls to CheckBlockIndex out of net-processing logic

This will result in many more calls to CheckBlockIndex when
connecting a list of headers (eg in ::HEADERS messages processing)
but its only enabled in debug mode, and that should mostly just be
during IBD, so it should be OK.

* Move FlushStateToDisk call out of ProcessMessages::TX into ATMP

* Move nTimeBestReceived updating into net processing code
This commit is contained in:
Oleg Girko 2017-08-01 16:11:32 +01:00 committed by UdjinM6
parent a3c8cb20df
commit e7e106e228
3 changed files with 43 additions and 25 deletions

View File

@ -1314,6 +1314,10 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler)
} // (!fDisableWallet) } // (!fDisableWallet)
#endif // ENABLE_WALLET #endif // ENABLE_WALLET
// ********************************************************* Step 6: network initialization // ********************************************************* Step 6: network initialization
// Note that we absolutely cannot open any actual connections
// until the very end ("start node") as the UTXO/block state
// is not yet setup and may end up being set up twice if we
// need to reindex later.
assert(!g_connman); assert(!g_connman);
g_connman = std::unique_ptr<CConnman>(new CConnman()); g_connman = std::unique_ptr<CConnman>(new CConnman());

View File

@ -71,7 +71,7 @@ CCriticalSection cs_main;
BlockMap mapBlockIndex; BlockMap mapBlockIndex;
CChain chainActive; CChain chainActive;
CBlockIndex *pindexBestHeader = NULL; CBlockIndex *pindexBestHeader = NULL;
int64_t nTimeBestReceived = 0; int64_t nTimeBestReceived = 0; // Used only to inform the wallet of when we last received a block
CWaitableCriticalSection csBestBlock; CWaitableCriticalSection csBestBlock;
CConditionVariable cvBlockChange; CConditionVariable cvBlockChange;
int nScriptCheckThreads = 0; int nScriptCheckThreads = 0;
@ -630,6 +630,16 @@ CBlockIndex* FindForkInGlobalIndex(const CChain& chain, const CBlockLocator& loc
CCoinsViewCache *pcoinsTip = NULL; CCoinsViewCache *pcoinsTip = NULL;
CBlockTreeDB *pblocktree = NULL; CBlockTreeDB *pblocktree = NULL;
enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};
// See definition for documentation
bool static FlushStateToDisk(CValidationState &state, FlushStateMode mode);
////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////
// //
// mapOrphanTransactions // mapOrphanTransactions
@ -1556,6 +1566,9 @@ bool AcceptToMemoryPool(CTxMemPool& pool, CValidationState &state, const CTransa
BOOST_FOREACH(const uint256& hashTx, vHashTxToUncache) BOOST_FOREACH(const uint256& hashTx, vHashTxToUncache)
pcoinsTip->Uncache(hashTx); pcoinsTip->Uncache(hashTx);
} }
// After we've (potentially) uncached entries, ensure our coins cache is still within its size limits
CValidationState stateDummy;
FlushStateToDisk(stateDummy, FLUSH_STATE_PERIODIC);
return res; return res;
} }
@ -2814,13 +2827,6 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
return true; return true;
} }
enum FlushStateMode {
FLUSH_STATE_NONE,
FLUSH_STATE_IF_NEEDED,
FLUSH_STATE_PERIODIC,
FLUSH_STATE_ALWAYS
};
/** /**
* Update the on-disk chain state. * Update the on-disk chain state.
* The caches and indexes are flushed depending on the mode we're called with * The caches and indexes are flushed depending on the mode we're called with
@ -2941,7 +2947,6 @@ void static UpdateTip(CBlockIndex *pindexNew) {
chainActive.SetTip(pindexNew); chainActive.SetTip(pindexNew);
// New best block // New best block
nTimeBestReceived = GetTime();
mempool.AddTransactionsUpdated(1); mempool.AddTransactionsUpdated(1);
LogPrintf("%s: new best=%s height=%d log2_work=%.8g tx=%lu date=%s progress=%f cache=%.1fMiB(%utx)\n", __func__, LogPrintf("%s: new best=%s height=%d log2_work=%.8g tx=%lu date=%s progress=%f cache=%.1fMiB(%utx)\n", __func__,
@ -3876,6 +3881,8 @@ static bool AcceptBlockHeader(const CBlockHeader& block, CValidationState& state
if (ppindex) if (ppindex)
*ppindex = pindex; *ppindex = pindex;
CheckBlockIndex(chainparams.GetConsensus());
return true; return true;
} }
@ -3903,6 +3910,11 @@ static bool AcceptBlock(const CBlock& block, CValidationState& state, const CCha
// not process unrequested blocks. // not process unrequested blocks.
bool fTooFarAhead = (pindex->nHeight > int(chainActive.Height() + MIN_BLOCKS_TO_KEEP)); bool fTooFarAhead = (pindex->nHeight > int(chainActive.Height() + MIN_BLOCKS_TO_KEEP));
// TODO: Decouple this function from the block download logic by removing fRequested
// This requires some new chain datastructure to efficiently look up if a
// block is in a chain leading to a candidate for best tip, despite not
// being such a candidate itself.
// TODO: deal better with return value and error conditions for duplicate // TODO: deal better with return value and error conditions for duplicate
// and unrequested blocks. // and unrequested blocks.
if (fAlreadyHave) return true; if (fAlreadyHave) return true;
@ -3963,13 +3975,11 @@ bool ProcessNewBlock(CValidationState& state, const CChainParams& chainparams, C
{ {
{ {
LOCK(cs_main); LOCK(cs_main);
bool fRequested = MarkBlockAsReceived(pblock->GetHash());
fRequested |= fForceProcessing;
// Store to disk // Store to disk
CBlockIndex *pindex = NULL; CBlockIndex *pindex = NULL;
bool fNewBlock = false; bool fNewBlock = false;
bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fRequested, dbp, &fNewBlock); bool ret = AcceptBlock(*pblock, state, chainparams, &pindex, fForceProcessing, dbp, &fNewBlock);
if (pindex && pfrom) { if (pindex && pfrom) {
mapBlockSource[pindex->GetBlockHash()] = pfrom->GetId(); mapBlockSource[pindex->GetBlockHash()] = pfrom->GetId();
if (fNewBlock) pfrom->nLastBlockTime = GetTime(); if (fNewBlock) pfrom->nLastBlockTime = GetTime();
@ -4398,6 +4408,9 @@ bool CVerifyDB::VerifyDB(const CChainParams& chainparams, CCoinsView *coinsview,
return true; return true;
} }
// May NOT be used after any connections are up as much
// of the peer-processing logic assumes a consistent
// block index state
void UnloadBlockIndex() void UnloadBlockIndex()
{ {
LOCK(cs_main); LOCK(cs_main);
@ -4408,18 +4421,12 @@ void UnloadBlockIndex()
mempool.clear(); mempool.clear();
mapOrphanTransactions.clear(); mapOrphanTransactions.clear();
mapOrphanTransactionsByPrev.clear(); mapOrphanTransactionsByPrev.clear();
nSyncStarted = 0;
mapBlocksUnlinked.clear(); mapBlocksUnlinked.clear();
vinfoBlockFile.clear(); vinfoBlockFile.clear();
nLastBlockFile = 0; nLastBlockFile = 0;
nBlockSequenceId = 1; nBlockSequenceId = 1;
mapBlockSource.clear();
mapBlocksInFlight.clear();
nPreferredDownload = 0;
setDirtyBlockIndex.clear(); setDirtyBlockIndex.clear();
setDirtyFileInfo.clear(); setDirtyFileInfo.clear();
mapNodeState.clear();
recentRejects.reset(NULL);
versionbitscache.Clear(); versionbitscache.Clear();
for (int b = 0; b < VERSIONBITS_NUM_BITS; b++) { for (int b = 0; b < VERSIONBITS_NUM_BITS; b++) {
warningcache[b].clear(); warningcache[b].clear();
@ -4444,9 +4451,6 @@ bool InitBlockIndex(const CChainParams& chainparams)
{ {
LOCK(cs_main); LOCK(cs_main);
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
// Check whether we're already initialized // Check whether we're already initialized
if (chainActive.Genesis() != NULL) if (chainActive.Genesis() != NULL)
return true; return true;
@ -4870,6 +4874,11 @@ std::string GetWarnings(const std::string& strFor)
// blockchain -> download logic notification // blockchain -> download logic notification
// //
PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
}
void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {
const int nNewHeight = pindexNew->nHeight; const int nNewHeight = pindexNew->nHeight;
connman->SetBestHeight(nNewHeight); connman->SetBestHeight(nNewHeight);
@ -4896,6 +4905,8 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
} }
}); });
} }
nTimeBestReceived = GetTime();
} }
void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationState& state) { void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationState& state) {
@ -5989,7 +6000,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
if (nDoS > 0) if (nDoS > 0)
Misbehaving(pfrom->GetId(), nDoS); Misbehaving(pfrom->GetId(), nDoS);
} }
FlushStateToDisk(state, FLUSH_STATE_PERIODIC);
} }
@ -6093,8 +6103,6 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
} }
} }
} }
CheckBlockIndex(chainparams.GetConsensus());
} }
NotifyHeaderTip(); NotifyHeaderTip();
@ -6116,6 +6124,12 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv,
// Such an unrequested block may still be processed, subject to the // Such an unrequested block may still be processed, subject to the
// conditions in AcceptBlock(). // conditions in AcceptBlock().
bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload(); bool forceProcessing = pfrom->fWhitelisted && !IsInitialBlockDownload();
{
LOCK(cs_main);
// Also always process if we requested the block explicitly, as we may
// need it even though it is not a candidate for a new best tip.
forceProcessing |= MarkBlockAsReceived(block.GetHash());
}
ProcessNewBlock(state, chainparams, pfrom, &block, forceProcessing, NULL); ProcessNewBlock(state, chainparams, pfrom, &block, forceProcessing, NULL);
int nDoS; int nDoS;
if (state.IsInvalid(nDoS)) { if (state.IsInvalid(nDoS)) {

View File

@ -851,7 +851,7 @@ private:
CConnman* connman; CConnman* connman;
public: public:
PeerLogicValidation(CConnman* connmanIn) : connman(connmanIn) {} PeerLogicValidation(CConnman* connmanIn);
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload);
virtual void BlockChecked(const CBlock& block, const CValidationState& state); virtual void BlockChecked(const CBlock& block, const CValidationState& state);