Implement 2-stage commit for CEvoDB to avoid inconsistencies after crashes (#2744)
* Let Commit() return void
The boolean return value will loose its meaning in the next commit
* Implement 2-stage commits for CDBTransaction and CScopedDBTransaction
CDBTransaction is changed to allow CDBBatch, CDBWrapper and other
CDBTransactions as parent instead of just CDBWrapper. This in turn allows
to implement multi-staged commits in CEvoDB.
We now have the "current transaction" which is started and ended (commit
or rollback) for each call to Connect-/DisconnectBlock. When the current
transaction is committed, it moves its contents into the "root transaction"
instead of directly writing to CDBWrapper.
CommitRootTransaction() then handles the final commitment to CDBWrapper. It
is called at the same time when the chainstate is flushed to disk, which
guarantees consistency between chainstate and CEvoDB.
* Allow to efficiently move values into parent transactions to avoid copies
When CDBTransaction<CDBTransaction<...>>::Commit() is called, we can avoid
copying values from this transaction to the parent transaction and instead
pass values by rvalue and let the contents be moved.
* Revert "Force FlushStateToDisk on ConnectTip/DisconnectTip while not in IBD (#2560)"
This reverts commit 6dfceaba5a
.
This commit is contained in:
parent
e63cdadc97
commit
521d4ae08f
@ -356,14 +356,16 @@ public:
|
||||
|
||||
};
|
||||
|
||||
template<typename Parent, typename CommitTarget>
|
||||
class CDBTransaction {
|
||||
private:
|
||||
CDBWrapper &db;
|
||||
protected:
|
||||
Parent &parent;
|
||||
CommitTarget &commitTarget;
|
||||
|
||||
struct KeyHolder {
|
||||
virtual ~KeyHolder() = default;
|
||||
virtual bool Less(const KeyHolder &b) const = 0;
|
||||
virtual void Erase(CDBBatch &batch) = 0;
|
||||
virtual void Erase(CommitTarget &commitTarget) = 0;
|
||||
};
|
||||
typedef std::unique_ptr<KeyHolder> KeyHolderPtr;
|
||||
|
||||
@ -376,15 +378,15 @@ private:
|
||||
auto *b2 = dynamic_cast<const KeyHolderImpl<K>*>(&b);
|
||||
return key < b2->key;
|
||||
}
|
||||
virtual void Erase(CDBBatch &batch) {
|
||||
batch.Erase(key);
|
||||
virtual void Erase(CommitTarget &commitTarget) {
|
||||
commitTarget.Erase(key);
|
||||
}
|
||||
K key;
|
||||
};
|
||||
|
||||
struct KeyValueHolder {
|
||||
virtual ~KeyValueHolder() = default;
|
||||
virtual void Write(CDBBatch &batch) = 0;
|
||||
virtual void Write(CommitTarget &parent) = 0;
|
||||
};
|
||||
typedef std::unique_ptr<KeyValueHolder> KeyValueHolderPtr;
|
||||
|
||||
@ -393,8 +395,13 @@ private:
|
||||
KeyValueHolderImpl(const KeyHolderImpl<K> &_key, const V &_value)
|
||||
: key(_key),
|
||||
value(_value) { }
|
||||
virtual void Write(CDBBatch &batch) {
|
||||
batch.Write(key.key, value);
|
||||
KeyValueHolderImpl(const KeyHolderImpl<K> &_key, V &&_value)
|
||||
: key(_key),
|
||||
value(std::forward<V>(_value)) { }
|
||||
virtual void Write(CommitTarget &commitTarget) {
|
||||
// we're moving the value instead of copying it. This means that Write() can only be called once per
|
||||
// KeyValueHolderImpl instance. Commit() clears the write maps, so this ok.
|
||||
commitTarget.Write(key.key, std::move(value));
|
||||
}
|
||||
const KeyHolderImpl<K> &key;
|
||||
V value;
|
||||
@ -434,22 +441,34 @@ private:
|
||||
return getMapForType<K>(deletes, create);
|
||||
}
|
||||
|
||||
public:
|
||||
CDBTransaction(CDBWrapper &_db) : db(_db) {}
|
||||
|
||||
template <typename K, typename V>
|
||||
void Write(const K& key, const V& value) {
|
||||
KeyHolderPtr k(new KeyHolderImpl<K>(key));
|
||||
KeyHolderImpl<K>* k2 = dynamic_cast<KeyHolderImpl<K>*>(k.get());
|
||||
KeyValueHolderPtr kv(new KeyValueHolderImpl<K,V>(*k2, value));
|
||||
template <typename K, typename KV>
|
||||
void writeImpl(KeyHolderImpl<K>* k, KV&& kv) {
|
||||
auto k2 = KeyHolderPtr(k);
|
||||
|
||||
KeyValueMap *ds = getDeletesMap<K>(false);
|
||||
if (ds)
|
||||
ds->erase(k);
|
||||
ds->erase(k2);
|
||||
|
||||
KeyValueMap *ws = getWritesMap<K>(true);
|
||||
ws->erase(k);
|
||||
ws->emplace(std::make_pair(std::move(k), std::move(kv)));
|
||||
ws->erase(k2);
|
||||
ws->emplace(std::make_pair(std::move(k2), std::forward<KV>(kv)));
|
||||
}
|
||||
|
||||
public:
|
||||
CDBTransaction(Parent &_parent, CommitTarget &_commitTarget) : parent(_parent), commitTarget(_commitTarget) {}
|
||||
|
||||
template <typename K, typename V>
|
||||
void Write(const K& key, const V& v) {
|
||||
auto k = new KeyHolderImpl<K>(key);
|
||||
auto kv = std::make_unique<KeyValueHolderImpl<K, V>>(*k, v);
|
||||
writeImpl(k, std::move(kv));
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
void Write(const K& key, V&& v) {
|
||||
auto k = new KeyHolderImpl<K>(key);
|
||||
auto kv = std::make_unique<KeyValueHolderImpl<K, typename std::remove_reference<V>::type>>(*k, std::forward<V>(v));
|
||||
writeImpl(k, std::move(kv));
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
@ -462,7 +481,7 @@ public:
|
||||
|
||||
KeyValueMap *ws = getWritesMap<K>(false);
|
||||
if (ws) {
|
||||
KeyValueMap::iterator it = ws->find(k);
|
||||
auto it = ws->find(k);
|
||||
if (it != ws->end()) {
|
||||
auto *impl = dynamic_cast<KeyValueHolderImpl<K, V> *>(it->second.get());
|
||||
if (!impl)
|
||||
@ -472,7 +491,7 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
return db.Read(key, value);
|
||||
return parent.Read(key, value);
|
||||
}
|
||||
|
||||
template <typename K>
|
||||
@ -487,7 +506,7 @@ public:
|
||||
if (ws && ws->count(k))
|
||||
return true;
|
||||
|
||||
return db.Exists(key);
|
||||
return parent.Exists(key);
|
||||
}
|
||||
|
||||
template <typename K>
|
||||
@ -506,21 +525,18 @@ public:
|
||||
deletes.clear();
|
||||
}
|
||||
|
||||
bool Commit() {
|
||||
CDBBatch batch(db);
|
||||
void Commit() {
|
||||
for (auto &p : deletes) {
|
||||
for (auto &p2 : p.second) {
|
||||
p2.first->Erase(batch);
|
||||
p2.first->Erase(commitTarget);
|
||||
}
|
||||
}
|
||||
for (auto &p : writes) {
|
||||
for (auto &p2 : p.second) {
|
||||
p2.second->Write(batch);
|
||||
p2.second->Write(commitTarget);
|
||||
}
|
||||
}
|
||||
bool ret = db.WriteBatch(batch, true);
|
||||
Clear();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool IsClean() {
|
||||
@ -528,26 +544,29 @@ public:
|
||||
}
|
||||
};
|
||||
|
||||
template<typename Parent, typename CommitTarget>
|
||||
class CScopedDBTransaction {
|
||||
public:
|
||||
typedef CDBTransaction<Parent, CommitTarget> Transaction;
|
||||
|
||||
private:
|
||||
CDBTransaction &dbTransaction;
|
||||
Transaction &dbTransaction;
|
||||
std::function<void ()> commitHandler;
|
||||
std::function<void ()> rollbackHandler;
|
||||
bool didCommitOrRollback{};
|
||||
|
||||
public:
|
||||
CScopedDBTransaction(CDBTransaction &dbTx) : dbTransaction(dbTx) {}
|
||||
CScopedDBTransaction(Transaction &dbTx) : dbTransaction(dbTx) {}
|
||||
~CScopedDBTransaction() {
|
||||
if (!didCommitOrRollback)
|
||||
Rollback();
|
||||
}
|
||||
bool Commit() {
|
||||
void Commit() {
|
||||
assert(!didCommitOrRollback);
|
||||
didCommitOrRollback = true;
|
||||
bool result = dbTransaction.Commit();
|
||||
dbTransaction.Commit();
|
||||
if (commitHandler)
|
||||
commitHandler();
|
||||
return result;
|
||||
}
|
||||
void Rollback() {
|
||||
assert(!didCommitOrRollback);
|
||||
@ -557,9 +576,9 @@ public:
|
||||
rollbackHandler();
|
||||
}
|
||||
|
||||
static std::unique_ptr<CScopedDBTransaction> Begin(CDBTransaction &dbTx) {
|
||||
static std::unique_ptr<CScopedDBTransaction<Parent, CommitTarget>> Begin(Transaction &dbTx) {
|
||||
assert(dbTx.IsClean());
|
||||
return std::unique_ptr<CScopedDBTransaction>(new CScopedDBTransaction(dbTx));
|
||||
return std::make_unique<CScopedDBTransaction<Parent, CommitTarget>>(dbTx);
|
||||
}
|
||||
|
||||
void SetCommitHandler(const std::function<void ()> &h) {
|
||||
|
@ -8,10 +8,21 @@ CEvoDB* evoDb;
|
||||
|
||||
CEvoDB::CEvoDB(size_t nCacheSize, bool fMemory, bool fWipe) :
|
||||
db(fMemory ? "" : (GetDataDir() / "evodb"), nCacheSize, fMemory, fWipe),
|
||||
dbTransaction(db)
|
||||
rootBatch(db),
|
||||
rootDBTransaction(db, rootBatch),
|
||||
curDBTransaction(rootDBTransaction, rootDBTransaction)
|
||||
{
|
||||
}
|
||||
|
||||
bool CEvoDB::CommitRootTransaction()
|
||||
{
|
||||
assert(curDBTransaction.IsClean());
|
||||
rootDBTransaction.Commit();
|
||||
bool ret = db.WriteBatch(rootBatch);
|
||||
rootBatch.Clear();
|
||||
return ret;
|
||||
}
|
||||
|
||||
bool CEvoDB::VerifyBestBlock(const uint256& hash)
|
||||
{
|
||||
// Make sure evodb is consistent.
|
||||
|
@ -16,15 +16,22 @@ class CEvoDB
|
||||
private:
|
||||
CCriticalSection cs;
|
||||
CDBWrapper db;
|
||||
CDBTransaction dbTransaction;
|
||||
|
||||
typedef CDBTransaction<CDBWrapper, CDBBatch> RootTransaction;
|
||||
typedef CDBTransaction<RootTransaction, RootTransaction> CurTransaction;
|
||||
typedef CScopedDBTransaction<RootTransaction, RootTransaction> ScopedTransaction;
|
||||
|
||||
CDBBatch rootBatch;
|
||||
RootTransaction rootDBTransaction;
|
||||
CurTransaction curDBTransaction;
|
||||
|
||||
public:
|
||||
CEvoDB(size_t nCacheSize, bool fMemory = false, bool fWipe = false);
|
||||
|
||||
std::unique_ptr<CScopedDBTransaction> BeginTransaction()
|
||||
std::unique_ptr<ScopedTransaction> BeginTransaction()
|
||||
{
|
||||
LOCK(cs);
|
||||
auto t = CScopedDBTransaction::Begin(dbTransaction);
|
||||
auto t = ScopedTransaction::Begin(curDBTransaction);
|
||||
return t;
|
||||
}
|
||||
|
||||
@ -32,28 +39,28 @@ public:
|
||||
bool Read(const K& key, V& value)
|
||||
{
|
||||
LOCK(cs);
|
||||
return dbTransaction.Read(key, value);
|
||||
return curDBTransaction.Read(key, value);
|
||||
}
|
||||
|
||||
template <typename K, typename V>
|
||||
void Write(const K& key, const V& value)
|
||||
{
|
||||
LOCK(cs);
|
||||
dbTransaction.Write(key, value);
|
||||
curDBTransaction.Write(key, value);
|
||||
}
|
||||
|
||||
template <typename K>
|
||||
bool Exists(const K& key)
|
||||
{
|
||||
LOCK(cs);
|
||||
return dbTransaction.Exists(key);
|
||||
return curDBTransaction.Exists(key);
|
||||
}
|
||||
|
||||
template <typename K>
|
||||
void Erase(const K& key)
|
||||
{
|
||||
LOCK(cs);
|
||||
dbTransaction.Erase(key);
|
||||
curDBTransaction.Erase(key);
|
||||
}
|
||||
|
||||
CDBWrapper& GetRawDB()
|
||||
@ -61,6 +68,8 @@ public:
|
||||
return db;
|
||||
}
|
||||
|
||||
bool CommitRootTransaction();
|
||||
|
||||
bool VerifyBestBlock(const uint256& hash);
|
||||
void WriteBestBlock(const uint256& hash);
|
||||
};
|
||||
|
@ -2394,6 +2394,9 @@ bool static FlushStateToDisk(CValidationState &state, FlushStateMode mode, int n
|
||||
// Flush the chainstate (which may refer to block index entries).
|
||||
if (!pcoinsTip->Flush())
|
||||
return AbortNode(state, "Failed to write to coin database");
|
||||
if (!evoDb->CommitRootTransaction()) {
|
||||
return AbortNode(state, "Failed to commit EvoDB");
|
||||
}
|
||||
nLastFlush = nNow;
|
||||
}
|
||||
if (fDoFullFlush || ((mode == FLUSH_STATE_ALWAYS || mode == FLUSH_STATE_PERIODIC) && nNow > nLastSetChain + (int64_t)DATABASE_WRITE_INTERVAL * 1000000)) {
|
||||
@ -2499,12 +2502,11 @@ bool static DisconnectTip(CValidationState& state, const CChainParams& chainpara
|
||||
return error("DisconnectTip(): DisconnectBlock %s failed", pindexDelete->GetBlockHash().ToString());
|
||||
bool flushed = view.Flush();
|
||||
assert(flushed);
|
||||
bool committed = dbTx->Commit();
|
||||
assert(committed);
|
||||
dbTx->Commit();
|
||||
}
|
||||
LogPrint("bench", "- Disconnect block: %.2fms\n", (GetTimeMicros() - nStart) * 0.001);
|
||||
// Write the chain state to disk, if necessary.
|
||||
if (!FlushStateToDisk(state, IsInitialBlockDownload() ? FLUSH_STATE_IF_NEEDED : FLUSH_STATE_ALWAYS))
|
||||
if (!FlushStateToDisk(state, FLUSH_STATE_IF_NEEDED))
|
||||
return false;
|
||||
// Resurrect mempool transactions from the disconnected block.
|
||||
std::vector<uint256> vHashUpdate;
|
||||
@ -2589,13 +2591,12 @@ bool static ConnectTip(CValidationState& state, const CChainParams& chainparams,
|
||||
LogPrint("bench", " - Connect total: %.2fms [%.2fs]\n", (nTime3 - nTime2) * 0.001, nTimeConnectTotal * 0.000001);
|
||||
bool flushed = view.Flush();
|
||||
assert(flushed);
|
||||
bool committed = dbTx->Commit();
|
||||
assert(committed);
|
||||
dbTx->Commit();
|
||||
}
|
||||
int64_t nTime4 = GetTimeMicros(); nTimeFlush += nTime4 - nTime3;
|
||||
LogPrint("bench", " - Flush: %.2fms [%.2fs]\n", (nTime4 - nTime3) * 0.001, nTimeFlush * 0.000001);
|
||||
// Write the chain state to disk, if necessary.
|
||||
if (!FlushStateToDisk(state, IsInitialBlockDownload() ? FLUSH_STATE_IF_NEEDED : FLUSH_STATE_ALWAYS))
|
||||
if (!FlushStateToDisk(state, FLUSH_STATE_IF_NEEDED))
|
||||
return false;
|
||||
int64_t nTime5 = GetTimeMicros(); nTimeChainState += nTime5 - nTime4;
|
||||
LogPrint("bench", " - Writing chainstate: %.2fms [%.2fs]\n", (nTime5 - nTime4) * 0.001, nTimeChainState * 0.000001);
|
||||
|
Loading…
Reference in New Issue
Block a user