Merge #9605: Use CScheduler for wallet flushing, remove ThreadFlushWalletDB
0235be1 Rename FlushWalletDB -> CompactWalletDB, add function description (Matt Corallo) 735d9b5 Use CScheduler for wallet flushing, remove ThreadFlushWalletDB (Matt Corallo) 73296f5 CScheduler boost->std::function, use millisecs for times, not secs (Matt Corallo) Tree-SHA512: c04f97beab65706c444c126be229d02887df9b0972d8fb15ca1f779ef0e628cf7ecef2bf533c650d9b44645b63e01de22f17266a05907e778938d64cc6e19de6
This commit is contained in:
parent
9c8c12ed47
commit
8d249d4dff
@ -2087,7 +2087,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler)
|
||||
|
||||
#ifdef ENABLE_WALLET
|
||||
if (pwalletMain)
|
||||
pwalletMain->postInitProcess(threadGroup);
|
||||
pwalletMain->postInitProcess(scheduler);
|
||||
#endif
|
||||
|
||||
threadGroup.create_thread(boost::bind(&ThreadSendAlert, boost::ref(connman)));
|
||||
|
@ -2409,7 +2409,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
|
||||
threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
|
||||
|
||||
// Dump network addresses
|
||||
scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL);
|
||||
scheduler.scheduleEvery(std::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL * 1000);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -104,20 +104,20 @@ void CScheduler::schedule(CScheduler::Function f, boost::chrono::system_clock::t
|
||||
newTaskScheduled.notify_one();
|
||||
}
|
||||
|
||||
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaSeconds)
|
||||
void CScheduler::scheduleFromNow(CScheduler::Function f, int64_t deltaMilliSeconds)
|
||||
{
|
||||
schedule(f, boost::chrono::system_clock::now() + boost::chrono::seconds(deltaSeconds));
|
||||
schedule(f, boost::chrono::system_clock::now() + boost::chrono::milliseconds(deltaMilliSeconds));
|
||||
}
|
||||
|
||||
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaSeconds)
|
||||
static void Repeat(CScheduler* s, CScheduler::Function f, int64_t deltaMilliSeconds)
|
||||
{
|
||||
f();
|
||||
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaSeconds), deltaSeconds);
|
||||
s->scheduleFromNow(boost::bind(&Repeat, s, f, deltaMilliSeconds), deltaMilliSeconds);
|
||||
}
|
||||
|
||||
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaSeconds)
|
||||
void CScheduler::scheduleEvery(CScheduler::Function f, int64_t deltaMilliSeconds)
|
||||
{
|
||||
scheduleFromNow(boost::bind(&Repeat, this, f, deltaSeconds), deltaSeconds);
|
||||
scheduleFromNow(boost::bind(&Repeat, this, f, deltaMilliSeconds), deltaMilliSeconds);
|
||||
}
|
||||
|
||||
size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
|
||||
|
@ -10,7 +10,6 @@
|
||||
// boost::thread / boost::function / boost::chrono should be ported to
|
||||
// std::thread / std::function / std::chrono when we support C++11.
|
||||
//
|
||||
#include <boost/function.hpp>
|
||||
#include <boost/chrono/chrono.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
#include <map>
|
||||
@ -23,7 +22,7 @@
|
||||
//
|
||||
// CScheduler* s = new CScheduler();
|
||||
// s->scheduleFromNow(doSomething, 11); // Assuming a: void doSomething() { }
|
||||
// s->scheduleFromNow(boost::bind(Class::func, this, argument), 3);
|
||||
// s->scheduleFromNow(std::bind(Class::func, this, argument), 3);
|
||||
// boost::thread* t = new boost::thread(boost::bind(CScheduler::serviceQueue, s));
|
||||
//
|
||||
// ... then at program shutdown, clean up the thread running serviceQueue:
|
||||
@ -39,20 +38,20 @@ public:
|
||||
CScheduler();
|
||||
~CScheduler();
|
||||
|
||||
typedef boost::function<void(void)> Function;
|
||||
typedef std::function<void(void)> Function;
|
||||
|
||||
// Call func at/after time t
|
||||
void schedule(Function f, boost::chrono::system_clock::time_point t);
|
||||
|
||||
// Convenience method: call f once deltaSeconds from now
|
||||
void scheduleFromNow(Function f, int64_t deltaSeconds);
|
||||
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
|
||||
|
||||
// Another convenience method: call f approximately
|
||||
// every deltaSeconds forever, starting deltaSeconds from now.
|
||||
// To be more precise: every time f is finished, it
|
||||
// is rescheduled to run deltaSeconds later. If you
|
||||
// need more accurate scheduling, don't use this method.
|
||||
void scheduleEvery(Function f, int64_t deltaSeconds);
|
||||
void scheduleEvery(Function f, int64_t deltaMilliSeconds);
|
||||
|
||||
// To keep things as simple as possible, there is no unschedule.
|
||||
|
||||
|
@ -21,6 +21,7 @@
|
||||
#include "primitives/transaction.h"
|
||||
#include "script/script.h"
|
||||
#include "script/sign.h"
|
||||
#include "scheduler.h"
|
||||
#include "timedata.h"
|
||||
#include "txmempool.h"
|
||||
#include "util.h"
|
||||
@ -5047,17 +5048,17 @@ bool CWallet::InitLoadWallet()
|
||||
return true;
|
||||
}
|
||||
|
||||
std::atomic<bool> CWallet::fFlushThreadRunning(false);
|
||||
std::atomic<bool> CWallet::fFlushScheduled(false);
|
||||
|
||||
void CWallet::postInitProcess(boost::thread_group& threadGroup)
|
||||
void CWallet::postInitProcess(CScheduler& scheduler)
|
||||
{
|
||||
// Add wallet transactions that aren't already in a block to mempool
|
||||
// Do this here as mempool requires genesis block to be loaded
|
||||
ReacceptWalletTransactions();
|
||||
|
||||
// Run a thread to flush wallet periodically
|
||||
if (!CWallet::fFlushThreadRunning.exchange(true)) {
|
||||
threadGroup.create_thread(ThreadFlushWalletDB);
|
||||
if (!CWallet::fFlushScheduled.exchange(true)) {
|
||||
scheduler.scheduleEvery(MaybeCompactWalletDB, 500);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -33,7 +33,6 @@
|
||||
#include <vector>
|
||||
|
||||
#include <boost/shared_ptr.hpp>
|
||||
#include <boost/thread.hpp>
|
||||
|
||||
extern CWallet* pwalletMain;
|
||||
|
||||
@ -84,6 +83,7 @@ class CCoinControl;
|
||||
class COutput;
|
||||
class CReserveKey;
|
||||
class CScript;
|
||||
class CScheduler;
|
||||
class CTxMemPool;
|
||||
class CWalletTx;
|
||||
|
||||
@ -654,7 +654,7 @@ private:
|
||||
class CWallet : public CCryptoKeyStore, public CValidationInterface
|
||||
{
|
||||
private:
|
||||
static std::atomic<bool> fFlushThreadRunning;
|
||||
static std::atomic<bool> fFlushScheduled;
|
||||
|
||||
/**
|
||||
* Select a set of coins such that nValueRet >= nTargetValue and at least
|
||||
@ -1124,7 +1124,7 @@ public:
|
||||
* Wallet post-init setup
|
||||
* Gives the wallet a chance to register repetitive tasks and complete post-init tasks
|
||||
*/
|
||||
void postInitProcess(boost::thread_group& threadGroup);
|
||||
void postInitProcess(CScheduler& scheduler);
|
||||
|
||||
/* Wallets parameter interaction */
|
||||
static bool ParameterInteraction();
|
||||
|
@ -808,38 +808,33 @@ DBErrors CWalletDB::ZapWalletTx(std::vector<CWalletTx>& vWtx)
|
||||
return DB_LOAD_OK;
|
||||
}
|
||||
|
||||
void ThreadFlushWalletDB()
|
||||
void MaybeCompactWalletDB()
|
||||
{
|
||||
// Make this thread recognisable as the wallet flushing thread
|
||||
RenameThread("dash-wallet");
|
||||
|
||||
static bool fOneThread;
|
||||
if (fOneThread)
|
||||
static std::atomic<bool> fOneThread;
|
||||
if (fOneThread.exchange(true)) {
|
||||
return;
|
||||
fOneThread = true;
|
||||
if (!GetBoolArg("-flushwallet", DEFAULT_FLUSHWALLET))
|
||||
return;
|
||||
|
||||
unsigned int nLastSeen = CWalletDB::GetUpdateCounter();
|
||||
unsigned int nLastFlushed = CWalletDB::GetUpdateCounter();
|
||||
int64_t nLastWalletUpdate = GetTime();
|
||||
while (true)
|
||||
{
|
||||
MilliSleep(500);
|
||||
|
||||
if (nLastSeen != CWalletDB::GetUpdateCounter())
|
||||
{
|
||||
nLastSeen = CWalletDB::GetUpdateCounter();
|
||||
nLastWalletUpdate = GetTime();
|
||||
}
|
||||
|
||||
if (nLastFlushed != CWalletDB::GetUpdateCounter() && GetTime() - nLastWalletUpdate >= 2)
|
||||
{
|
||||
const std::string& strFile = pwalletMain->strWalletFile;
|
||||
if (CDB::PeriodicFlush(strFile))
|
||||
nLastFlushed = CWalletDB::GetUpdateCounter();
|
||||
}
|
||||
}
|
||||
if (!GetBoolArg("-flushwallet", DEFAULT_FLUSHWALLET)) {
|
||||
return;
|
||||
}
|
||||
|
||||
static unsigned int nLastSeen = CWalletDB::GetUpdateCounter();
|
||||
static unsigned int nLastFlushed = CWalletDB::GetUpdateCounter();
|
||||
static int64_t nLastWalletUpdate = GetTime();
|
||||
|
||||
if (nLastSeen != CWalletDB::GetUpdateCounter())
|
||||
{
|
||||
nLastSeen = CWalletDB::GetUpdateCounter();
|
||||
nLastWalletUpdate = GetTime();
|
||||
}
|
||||
|
||||
if (nLastFlushed != CWalletDB::GetUpdateCounter() && GetTime() - nLastWalletUpdate >= 2)
|
||||
{
|
||||
const std::string& strFile = pwalletMain->strWalletFile;
|
||||
if (CDB::PeriodicFlush(strFile))
|
||||
nLastFlushed = CWalletDB::GetUpdateCounter();
|
||||
}
|
||||
fOneThread = false;
|
||||
}
|
||||
|
||||
//
|
||||
|
@ -156,6 +156,7 @@ private:
|
||||
void operator=(const CWalletDB&);
|
||||
};
|
||||
|
||||
void ThreadFlushWalletDB();
|
||||
//! Compacts BDB state so that wallet.dat is self-contained (if there are changes)
|
||||
void MaybeCompactWalletDB();
|
||||
|
||||
#endif // BITCOIN_WALLET_WALLETDB_H
|
||||
|
Loading…
Reference in New Issue
Block a user