// Copyright (c) 2015 The Bitcoin Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include "zmqnotificationinterface.h" #include "zmqpublishnotifier.h" #include "version.h" #include "validation.h" #include "streams.h" #include "util.h" void zmqError(const char *str) { LogPrint(BCLog::ZMQ, "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno)); } CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) { } CZMQNotificationInterface::~CZMQNotificationInterface() { Shutdown(); for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) { delete *i; } } CZMQNotificationInterface* CZMQNotificationInterface::Create() { CZMQNotificationInterface* notificationInterface = NULL; std::map factories; std::list notifiers; factories["pubhashblock"] = CZMQAbstractNotifier::Create; factories["pubhashchainlock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubhashtxlock"] = CZMQAbstractNotifier::Create; factories["pubhashgovernancevote"] = CZMQAbstractNotifier::Create; factories["pubhashgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubhashinstantsenddoublespend"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; factories["pubrawchainlock"] = CZMQAbstractNotifier::Create; factories["pubrawchainlocksig"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; factories["pubrawtxlock"] = CZMQAbstractNotifier::Create; factories["pubrawtxlocksig"] = CZMQAbstractNotifier::Create; factories["pubrawgovernancevote"] = CZMQAbstractNotifier::Create; factories["pubrawgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubrawinstantsenddoublespend"] = CZMQAbstractNotifier::Create; for (std::map::const_iterator i=factories.begin(); i!=factories.end(); ++i) { std::string arg("-zmq" + i->first); if (gArgs.IsArgSet(arg)) { CZMQNotifierFactory factory = i->second; std::string address = gArgs.GetArg(arg, ""); CZMQAbstractNotifier *notifier = factory(); notifier->SetType(i->first); notifier->SetAddress(address); notifiers.push_back(notifier); } } if (!notifiers.empty()) { notificationInterface = new CZMQNotificationInterface(); notificationInterface->notifiers = notifiers; if (!notificationInterface->Initialize()) { delete notificationInterface; notificationInterface = NULL; } } return notificationInterface; } // Called at startup to conditionally set up ZMQ socket(s) bool CZMQNotificationInterface::Initialize() { LogPrint(BCLog::ZMQ, "zmq: Initialize notification interface\n"); assert(!pcontext); pcontext = zmq_init(1); if (!pcontext) { zmqError("Unable to initialize context"); return false; } std::list::iterator i=notifiers.begin(); for (; i!=notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; if (notifier->Initialize(pcontext)) { LogPrint(BCLog::ZMQ, " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); } else { LogPrint(BCLog::ZMQ, " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); break; } } if (i!=notifiers.end()) { return false; } return true; } // Called during shutdown sequence void CZMQNotificationInterface::Shutdown() { LogPrint(BCLog::ZMQ, "zmq: Shutdown notification interface\n"); if (pcontext) { for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) { CZMQAbstractNotifier *notifier = *i; LogPrint(BCLog::ZMQ, " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); notifier->Shutdown(); } zmq_ctx_destroy(pcontext); pcontext = 0; } } void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones return; for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyBlock(pindexNew)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex, const llmq::CChainLockSig& clsig) { for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyChainLock(pindex, clsig)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::TransactionAddedToMempool(const CTransactionRef& ptx) { // Used by BlockConnected and BlockDisconnected as well, because they're // all the same external callback. const CTransaction& tx = *ptx; for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyTransaction(tx)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::BlockConnected(const std::shared_ptr& pblock, const CBlockIndex* pindexConnected, const std::vector& vtxConflicted) { for (const CTransactionRef& ptx : pblock->vtx) { // Do a normal notify for each transaction added in the block TransactionAddedToMempool(ptx); } } void CZMQNotificationInterface::BlockDisconnected(const std::shared_ptr& pblock, const CBlockIndex* pindexDisconnected) { for (const CTransactionRef& ptx : pblock->vtx) { // Do a normal notify for each transaction removed in block disconnection TransactionAddedToMempool(ptx); } } void CZMQNotificationInterface::NotifyTransactionLock(const CTransaction &tx, const llmq::CInstantSendLock& islock) { for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyTransactionLock(tx, islock)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::NotifyGovernanceVote(const CGovernanceVote &vote) { for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyGovernanceVote(vote)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::NotifyGovernanceObject(const CGovernanceObject &object) { for (std::list::iterator i = notifiers.begin(); i != notifiers.end(); ) { CZMQAbstractNotifier *notifier = *i; if (notifier->NotifyGovernanceObject(object)) { i++; } else { notifier->Shutdown(); i = notifiers.erase(i); } } } void CZMQNotificationInterface::NotifyInstantSendDoubleSpendAttempt(const CTransaction ¤tTx, const CTransaction &previousTx) { for (auto it = notifiers.begin(); it != notifiers.end();) { CZMQAbstractNotifier *notifier = *it; if (notifier->NotifyInstantSendDoubleSpendAttempt(currentTx, previousTx)) { ++it; } else { notifier->Shutdown(); it = notifiers.erase(it); } } }