Make InstantSend locks persistent

This commit is contained in:
Alexander Block 2019-03-11 06:40:28 +01:00
parent 293c9ad6a1
commit ac00c66287
3 changed files with 227 additions and 168 deletions

View File

@ -35,7 +35,7 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests)
quorumSigSharesManager = new CSigSharesManager();
quorumSigningManager = new CSigningManager(*llmqDb, unitTests);
chainLocksHandler = new CChainLocksHandler(scheduler);
quorumInstantSendManager = new CInstantSendManager(scheduler);
quorumInstantSendManager = new CInstantSendManager(scheduler, *llmqDb);
}
void DestroyLLMQSystem()

View File

@ -41,8 +41,122 @@ uint256 CInstantSendLock::GetRequestId() const
return hw.GetHash();
}
CInstantSendManager::CInstantSendManager(CScheduler* _scheduler) :
scheduler(_scheduler)
////////////////
void CInstantSendDb::WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock)
{
CDBBatch batch(db);
batch.Write(std::make_tuple(std::string("is_i"), hash), islock);
batch.Write(std::make_tuple(std::string("is_tx"), islock.txid), hash);
for (auto& in : islock.inputs) {
batch.Write(std::make_tuple(std::string("is_in"), in), hash);
}
db.WriteBatch(batch);
auto p = std::make_shared<CInstantSendLock>(islock);
islockCache.insert(hash, p);
txidCache.insert(islock.txid, hash);
for (auto& in : islock.inputs) {
outpointCache.insert(in, hash);
}
}
void CInstantSendDb::RemoveInstantSendLock(const uint256& hash, CInstantSendLockPtr islock)
{
if (!islock) {
islock = GetInstantSendLockByHash(hash);
if (!islock) {
return;
}
}
CDBBatch batch(db);
batch.Erase(std::make_tuple(std::string("is_i"), hash));
batch.Erase(std::make_tuple(std::string("is_tx"), islock->txid));
for (auto& in : islock->inputs) {
batch.Erase(std::make_tuple(std::string("is_in"), in));
}
db.WriteBatch(batch);
islockCache.erase(hash);
txidCache.erase(islock->txid);
for (auto& in : islock->inputs) {
outpointCache.erase(in);
}
}
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByHash(const uint256& hash)
{
CInstantSendLockPtr ret;
if (islockCache.get(hash, ret)) {
return ret;
}
ret = std::make_shared<CInstantSendLock>();
bool exists = db.Read(std::make_tuple(std::string("is_i"), hash), *ret);
if (!exists) {
ret = nullptr;
}
islockCache.insert(hash, ret);
return ret;
}
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByTxid(const uint256& txid)
{
uint256 islockHash;
bool found = txidCache.get(txid, islockHash);
if (found && islockHash.IsNull()) {
return nullptr;
}
if (!found) {
found = db.Read(std::make_tuple(std::string("is_tx"), txid), islockHash);
txidCache.insert(txid, islockHash);
}
if (!found) {
return nullptr;
}
return GetInstantSendLockByHash(islockHash);
}
CInstantSendLockPtr CInstantSendDb::GetInstantSendLockByInput(const COutPoint& outpoint)
{
uint256 islockHash;
bool found = outpointCache.get(outpoint, islockHash);
if (found && islockHash.IsNull()) {
return nullptr;
}
if (!found) {
found = db.Read(std::make_tuple(std::string("is_in"), outpoint), islockHash);
outpointCache.insert(outpoint, islockHash);
}
if (!found) {
return nullptr;
}
return GetInstantSendLockByHash(islockHash);
}
void CInstantSendDb::WriteLastChainLockBlock(const uint256& hash)
{
db.Write(std::make_tuple(std::string("is_lcb")), hash);
}
uint256 CInstantSendDb::GetLastChainLockBlock()
{
uint256 hashBlock;
db.Read(std::make_tuple(std::string("is_lcb")), hashBlock);
return hashBlock;
}
////////////////
CInstantSendManager::CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb) :
scheduler(_scheduler),
db(_llmqDb)
{
}
@ -287,14 +401,13 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx)
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: got all recovered sigs, creating CInstantSendLock\n", __func__,
tx.GetHash().ToString());
CInstantSendLockInfo islockInfo;
islockInfo.time = GetTimeMillis();
islockInfo.islock.txid = tx.GetHash();
CInstantSendLock islock;
islock.txid = tx.GetHash();
for (auto& in : tx.vin) {
islockInfo.islock.inputs.emplace_back(in.prevout);
islock.inputs.emplace_back(in.prevout);
}
auto id = islockInfo.islock.GetRequestId();
auto id = islock.GetRequestId();
if (quorumSigningManager->HasRecoveredSigForId(llmqType, id)) {
return;
@ -302,7 +415,7 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx)
{
LOCK(cs);
auto e = creatingInstantSendLocks.emplace(id, islockInfo);
auto e = creatingInstantSendLocks.emplace(id, std::move(islock));
if (!e.second) {
return;
}
@ -314,7 +427,7 @@ void CInstantSendManager::TrySignInstantSendLock(const CTransaction& tx)
void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CRecoveredSig& recoveredSig)
{
CInstantSendLockInfo islockInfo;
CInstantSendLock islock;
{
LOCK(cs);
@ -323,19 +436,19 @@ void CInstantSendManager::HandleNewInstantSendLockRecoveredSig(const llmq::CReco
return;
}
islockInfo = std::move(it->second);
islock = std::move(it->second);
creatingInstantSendLocks.erase(it);
txToCreatingInstantSendLocks.erase(islockInfo.islock.txid);
txToCreatingInstantSendLocks.erase(islock.txid);
}
if (islockInfo.islock.txid != recoveredSig.msgHash) {
if (islock.txid != recoveredSig.msgHash) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s: islock conflicts with %s, dropping own version", __func__,
islockInfo.islock.txid.ToString(), recoveredSig.msgHash.ToString());
islock.txid.ToString(), recoveredSig.msgHash.ToString());
return;
}
islockInfo.islock.sig = recoveredSig.sig;
ProcessInstantSendLock(-1, ::SerializeHash(islockInfo.islock), islockInfo.islock);
islock.sig = recoveredSig.sig;
ProcessInstantSendLock(-1, ::SerializeHash(islock), islock);
}
void CInstantSendManager::ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman)
@ -365,7 +478,10 @@ void CInstantSendManager::ProcessMessageInstantSendLock(CNode* pfrom, const llmq
auto hash = ::SerializeHash(islock);
LOCK(cs);
if (pendingInstantSendLocks.count(hash) || finalInstantSendLocks.count(hash)) {
if (db.GetInstantSendLockByHash(hash) != nullptr) {
return;
}
if (pendingInstantSendLocks.count(hash)) {
return;
}
@ -509,23 +625,20 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
g_connman->RemoveAskFor(hash);
}
CInstantSendLockInfo islockInfo;
islockInfo.time = GetTimeMillis();
islockInfo.islock = islock;
islockInfo.islockHash = hash;
CTransactionRef tx;
uint256 hashBlock;
// we ignore failure here as we must be able to propagate the lock even if we don't have the TX locally
if (GetTransaction(islock.txid, islockInfo.tx, Params().GetConsensus(), hashBlock)) {
if (GetTransaction(islock.txid, tx, Params().GetConsensus(), hashBlock)) {
if (!hashBlock.IsNull()) {
const CBlockIndex* pindexMined;
{
LOCK(cs_main);
islockInfo.pindexMined = mapBlockIndex.at(hashBlock);
pindexMined = mapBlockIndex.at(hashBlock);
}
// Let's see if the TX that was locked by this islock is already mined in a ChainLocked block. If yes,
// we can simply ignore the islock, as the ChainLock implies locking of all TXs in that chain
if (llmq::chainLocksHandler->HasChainLock(islockInfo.pindexMined->nHeight, islockInfo.pindexMined->GetBlockHash())) {
if (llmq::chainLocksHandler->HasChainLock(pindexMined->nHeight, pindexMined->GetBlockHash())) {
LogPrint("instantsend", "CInstantSendManager::%s -- txlock=%s, islock=%s: dropping islock as it already got a ChainLock in block %s, peer=%d\n", __func__,
islock.txid.ToString(), hash.ToString(), hashBlock.ToString(), from);
return;
@ -535,36 +648,29 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
{
LOCK(cs);
auto e = finalInstantSendLocks.emplace(hash, islockInfo);
if (!e.second) {
return;
}
auto islockInfoPtr = &e.first->second;
creatingInstantSendLocks.erase(islockInfoPtr->islock.GetRequestId());
txToCreatingInstantSendLocks.erase(islockInfoPtr->islock.txid);
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: processsing islock, peer=%d\n", __func__,
islock.txid.ToString(), hash.ToString(), from);
if (!txToInstantSendLock.emplace(islock.txid, islockInfoPtr).second) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: duplicate islock, other islock=%s, peer=%d\n", __func__,
islock.txid.ToString(), hash.ToString(), txToInstantSendLock[islock.txid]->islockHash.ToString(), from);
txToInstantSendLock.erase(hash);
creatingInstantSendLocks.erase(islock.GetRequestId());
txToCreatingInstantSendLocks.erase(islock.txid);
CInstantSendLockPtr otherIsLock;
if (db.GetInstantSendLockByHash(hash)) {
return;
}
for (size_t i = 0; i < islock.inputs.size(); i++) {
auto& in = islock.inputs[i];
if (!inputToInstantSendLock.emplace(in, islockInfoPtr).second) {
if (otherIsLock = db.GetInstantSendLockByTxid(islock.txid)) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: duplicate islock, other islock=%s, peer=%d\n", __func__,
islock.txid.ToString(), hash.ToString(), ::SerializeHash(*otherIsLock).ToString(), from);
}
for (auto& in : islock.inputs) {
if (otherIsLock = db.GetInstantSendLockByInput(in)) {
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: conflicting input in islock. input=%s, other islock=%s, peer=%d\n", __func__,
islock.txid.ToString(), hash.ToString(), in.ToStringShort(), inputToInstantSendLock[in]->islockHash.ToString(), from);
txToInstantSendLock.erase(hash);
for (size_t j = 0; j < i; j++) {
inputToInstantSendLock.erase(islock.inputs[j]);
}
return;
islock.txid.ToString(), hash.ToString(), in.ToStringShort(), ::SerializeHash(*otherIsLock).ToString(), from);
}
}
db.WriteNewInstantSendLock(hash, islock);
}
CInv inv(MSG_ISLOCK, hash);
@ -573,10 +679,10 @@ void CInstantSendManager::ProcessInstantSendLock(NodeId from, const uint256& has
RemoveMempoolConflictsForLock(hash, islock);
RetryLockMempoolTxs(islock.txid);
UpdateWalletTransaction(islock.txid);
UpdateWalletTransaction(islock.txid, tx);
}
void CInstantSendManager::UpdateWalletTransaction(const uint256& txid)
void CInstantSendManager::UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx)
{
#ifdef ENABLE_WALLET
if (!pwalletMain) {
@ -595,16 +701,9 @@ void CInstantSendManager::UpdateWalletTransaction(const uint256& txid)
}
#endif
LOCK(cs);
auto it = txToInstantSendLock.find(txid);
if (it == txToInstantSendLock.end()) {
return;
if (tx) {
GetMainSignals().NotifyTransactionLock(*tx);
}
if (it->second->tx == nullptr) {
return;
}
GetMainSignals().NotifyTransactionLock(*it->second->tx);
}
void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
@ -613,24 +712,6 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn
return;
}
{
LOCK(cs);
auto it = txToInstantSendLock.find(tx.GetHash());
if (it == txToInstantSendLock.end()) {
return;
}
auto islockInfo = it->second;
if (islockInfo->tx == nullptr) {
islockInfo->tx = MakeTransactionRef(tx);
}
if (posInBlock == CMainSignals::SYNC_TRANSACTION_NOT_IN_BLOCK) {
UpdateISLockMinedBlock(islockInfo, nullptr);
return;
}
UpdateISLockMinedBlock(islockInfo, pindex);
}
if (IsLocked(tx.GetHash())) {
RetryLockMempoolTxs(tx.GetHash());
}
@ -638,80 +719,58 @@ void CInstantSendManager::SyncTransaction(const CTransaction& tx, const CBlockIn
void CInstantSendManager::NotifyChainLock(const CBlockIndex* pindex)
{
uint256 lastChainLockBlock;
{
LOCK(cs);
db.GetLastChainLockBlock();
}
// Let's find all islocks that correspond to TXs which are part of the freshly ChainLocked chain and then delete
// the islocks. We do this because the ChainLocks imply locking and thus it's not needed to further track
// or propagate the islocks
std::unordered_set<uint256> toDelete;
while (pindex && pindex != pindexLastChainLock) {
auto its = blockToInstantSendLocks.equal_range(pindex->GetBlockHash());
while (its.first != its.second) {
auto islockInfo = its.first->second;
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: removing islock as it got ChainLocked in block %s\n", __func__,
islockInfo->islock.txid.ToString(), islockInfo->islockHash.ToString(), pindex->GetBlockHash().ToString());
toDelete.emplace(its.first->second->islockHash);
++its.first;
// Let's find all islocks that correspond to TXs which are part of the freshly ChainLocked chain and then delete
// the islocks. We do this because the ChainLocks imply locking and thus it's not needed to further track
// or propagate the islocks
std::unordered_set<uint256> toDelete;
while (pindex && pindex->GetBlockHash() != lastChainLockBlock) {
CBlock block;
{
if (!ReadBlockFromDisk(block, pindex, Params().GetConsensus())) {
pindex = pindex->pprev;
continue;
}
pindex = pindex->pprev;
}
pindexLastChainLock = pindex;
for (auto& islockHash : toDelete) {
RemoveFinalISLock(islockHash);
LOCK(cs);
for (const auto& tx : block.vtx) {
auto islock = db.GetInstantSendLockByTxid(tx->GetHash());
if (!islock) {
continue;
}
auto hash = ::SerializeHash(*islock);
LogPrint("instantsend", "CInstantSendManager::%s -- txid=%s, islock=%s: removing islock as it got ChainLocked in block %s\n", __func__,
islock->txid.ToString(), hash.ToString(), pindex->GetBlockHash().ToString());
RemoveFinalISLock(hash, islock);
}
pindex = pindex->pprev;
}
{
LOCK(cs);
db.WriteLastChainLockBlock(pindex ? pindex->GetBlockHash() : uint256());
}
RetryLockMempoolTxs(uint256());
}
void CInstantSendManager::UpdateISLockMinedBlock(llmq::CInstantSendLockInfo* islockInfo, const CBlockIndex* pindex)
void CInstantSendManager::RemoveFinalISLock(const uint256& hash, const CInstantSendLockPtr& islock)
{
AssertLockHeld(cs);
if (islockInfo->pindexMined == pindex) {
return;
}
if (islockInfo->pindexMined) {
auto its = blockToInstantSendLocks.equal_range(islockInfo->pindexMined->GetBlockHash());
while (its.first != its.second) {
if (its.first->second == islockInfo) {
its.first = blockToInstantSendLocks.erase(its.first);
} else {
++its.first;
}
}
}
if (pindex) {
blockToInstantSendLocks.emplace(pindex->GetBlockHash(), islockInfo);
}
islockInfo->pindexMined = pindex;
}
void CInstantSendManager::RemoveFinalISLock(const uint256& hash)
{
AssertLockHeld(cs);
db.RemoveInstantSendLock(hash, islock);
auto it = finalInstantSendLocks.find(hash);
if (it == finalInstantSendLocks.end()) {
return;
}
auto& islockInfo = it->second;
txToInstantSendLock.erase(islockInfo.islock.txid);
for (auto& in : islockInfo.islock.inputs) {
for (auto& in : islock->inputs) {
auto inputRequestId = ::SerializeHash(std::make_pair(INPUTLOCK_REQUESTID_PREFIX, in));
inputRequestIds.erase(inputRequestId);
inputToInstantSendLock.erase(in);
}
UpdateISLockMinedBlock(&islockInfo, nullptr);
finalInstantSendLocks.erase(it);
}
void CInstantSendManager::RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock)
@ -798,7 +857,7 @@ bool CInstantSendManager::AlreadyHave(const CInv& inv)
}
LOCK(cs);
return finalInstantSendLocks.count(inv.hash) != 0 || pendingInstantSendLocks.count(inv.hash) != 0;
return db.GetInstantSendLockByHash(inv.hash) != nullptr || pendingInstantSendLocks.count(inv.hash) != 0;
}
bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CInstantSendLock& ret)
@ -808,11 +867,11 @@ bool CInstantSendManager::GetInstantSendLockByHash(const uint256& hash, llmq::CI
}
LOCK(cs);
auto it = finalInstantSendLocks.find(hash);
if (it == finalInstantSendLocks.end()) {
auto islock = db.GetInstantSendLockByHash(hash);
if (!islock) {
return false;
}
ret = it->second.islock;
ret = *islock;
return true;
}
@ -823,7 +882,7 @@ bool CInstantSendManager::IsLocked(const uint256& txHash)
}
LOCK(cs);
return txToInstantSendLock.count(txHash) != 0;
return db.GetInstantSendLockByTxid(txHash) != nullptr;
}
bool CInstantSendManager::IsConflicted(const CTransaction& tx)
@ -841,13 +900,13 @@ bool CInstantSendManager::GetConflictingTx(const CTransaction& tx, uint256& retC
LOCK(cs);
for (const auto& in : tx.vin) {
auto it = inputToInstantSendLock.find(in.prevout);
if (it == inputToInstantSendLock.end()) {
auto otherIsLock = db.GetInstantSendLockByInput(in.prevout);
if (!otherIsLock) {
continue;
}
if (it->second->islock.txid != tx.GetHash()) {
retConflictTxHash = it->second->islock.txid;
if (otherIsLock->txid != tx.GetHash()) {
retConflictTxHash = otherIsLock->txid;
return true;
}
}

View File

@ -8,6 +8,7 @@
#include "quorums_signing.h"
#include "coins.h"
#include "unordered_lru_cache.h"
#include "primitives/transaction.h"
#include <unordered_map>
@ -39,19 +40,29 @@ public:
uint256 GetRequestId() const;
};
class CInstantSendLockInfo
{
public:
// might be nullptr when islock is received before the TX itself
CTransactionRef tx;
CInstantSendLock islock;
// only valid when recovered sig was received
uint256 islockHash;
// time when it was created/received
int64_t time;
typedef std::shared_ptr<CInstantSendLock> CInstantSendLockPtr;
// might be null initially (when TX was not mined yet) and will later be filled by SyncTransaction
const CBlockIndex* pindexMined{nullptr};
class CInstantSendDb
{
private:
CDBWrapper& db;
unordered_lru_cache<uint256, CInstantSendLockPtr, StaticSaltedHasher, 10000> islockCache;
unordered_lru_cache<uint256, uint256, StaticSaltedHasher, 10000> txidCache;
unordered_lru_cache<COutPoint, uint256, SaltedOutpointHasher, 10000> outpointCache;
public:
CInstantSendDb(CDBWrapper& _db) : db(_db) {}
void WriteNewInstantSendLock(const uint256& hash, const CInstantSendLock& islock);
void RemoveInstantSendLock(const uint256& hash, CInstantSendLockPtr islock);
CInstantSendLockPtr GetInstantSendLockByHash(const uint256& hash);
CInstantSendLockPtr GetInstantSendLockByTxid(const uint256& txid);
CInstantSendLockPtr GetInstantSendLockByInput(const COutPoint& outpoint);
void WriteLastChainLockBlock(const uint256& hashBlock);
uint256 GetLastChainLockBlock();
};
class CInstantSendManager : public CRecoveredSigsListener
@ -59,6 +70,7 @@ class CInstantSendManager : public CRecoveredSigsListener
private:
CCriticalSection cs;
CScheduler* scheduler;
CInstantSendDb db;
/**
* Request ids of inputs that we signed. Used to determine if a recovered signature belongs to an
@ -71,27 +83,16 @@ private:
* recovered signatures for all inputs of a TX. At the same time, we initiate signing of our sigshare for the islock.
* When the recovered sig for the islock later arrives, we can finish the islock and propagate it.
*/
std::unordered_map<uint256, CInstantSendLockInfo, StaticSaltedHasher> creatingInstantSendLocks;
std::unordered_map<uint256, CInstantSendLock, StaticSaltedHasher> creatingInstantSendLocks;
// maps from txid to the in-progress islock
std::unordered_map<uint256, CInstantSendLockInfo*, StaticSaltedHasher> txToCreatingInstantSendLocks;
/**
* These are the final islocks, indexed by their own hash. The other maps are used to get from TXs, inputs and blocks
* to islocks.
*/
std::unordered_map<uint256, CInstantSendLockInfo, StaticSaltedHasher> finalInstantSendLocks;
std::unordered_map<uint256, CInstantSendLockInfo*, StaticSaltedHasher> txToInstantSendLock;
std::unordered_map<COutPoint, CInstantSendLockInfo*, SaltedOutpointHasher> inputToInstantSendLock;
std::unordered_multimap<uint256, CInstantSendLockInfo*, StaticSaltedHasher> blockToInstantSendLocks;
const CBlockIndex* pindexLastChainLock{nullptr};
std::unordered_map<uint256, CInstantSendLock*, StaticSaltedHasher> txToCreatingInstantSendLocks;
// Incoming and not verified yet
std::unordered_map<uint256, std::pair<NodeId, CInstantSendLock>> pendingInstantSendLocks;
bool hasScheduledProcessPending{false};
public:
CInstantSendManager(CScheduler* _scheduler);
CInstantSendManager(CScheduler* _scheduler, CDBWrapper& _llmqDb);
~CInstantSendManager();
void RegisterAsRecoveredSigsListener();
@ -116,12 +117,11 @@ public:
bool PreVerifyInstantSendLock(NodeId nodeId, const CInstantSendLock& islock, bool& retBan);
void ProcessPendingInstantSendLocks();
void ProcessInstantSendLock(NodeId from, const uint256& hash, const CInstantSendLock& islock);
void UpdateWalletTransaction(const uint256& txid);
void UpdateWalletTransaction(const uint256& txid, const CTransactionRef& tx);
void SyncTransaction(const CTransaction &tx, const CBlockIndex *pindex, int posInBlock);
void NotifyChainLock(const CBlockIndex* pindex);
void UpdateISLockMinedBlock(CInstantSendLockInfo* islockInfo, const CBlockIndex* pindex);
void RemoveFinalISLock(const uint256& hash);
void RemoveFinalISLock(const uint256& hash, const CInstantSendLockPtr& islock);
void RemoveMempoolConflictsForLock(const uint256& hash, const CInstantSendLock& islock);
void RetryLockMempoolTxs(const uint256& lockedParentTx);