dash/src/zmq/zmqnotificationinterface.cpp
Pieter Wuille aff4b6d2c5 Merge #9243: Clean up mapArgs and mapMultiArgs Usage
c2f61be Add a ForceSetArg method for testing (Matt Corallo)
4e04814 Lock mapArgs/mapMultiArgs access in util (Matt Corallo)
4cd373a Un-expose mapArgs from utils.h (Matt Corallo)
71fde55 Get rid of mapArgs direct access in ZMQ construction (Matt Corallo)
0cf86a6 Introduce (and use) an IsArgSet accessor method (Matt Corallo)
2b5f085 Fix non-const mapMultiArgs[] access after init. (Matt Corallo)
c8042a4 Remove arguments to ParseConfigFile (Matt Corallo)
2018-01-18 07:31:23 +01:00

182 lines
5.2 KiB
C++

// Copyright (c) 2015 The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include "zmqnotificationinterface.h"
#include "zmqpublishnotifier.h"
#include "version.h"
#include "validation.h"
#include "streams.h"
#include "util.h"
void zmqError(const char *str)
{
LogPrint("zmq", "zmq: Error: %s, errno=%s\n", str, zmq_strerror(errno));
}
CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL)
{
}
CZMQNotificationInterface::~CZMQNotificationInterface()
{
Shutdown();
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
delete *i;
}
}
CZMQNotificationInterface* CZMQNotificationInterface::Create()
{
CZMQNotificationInterface* notificationInterface = NULL;
std::map<std::string, CZMQNotifierFactory> factories;
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubhashtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionLockNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubrawtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionLockNotifier>;
for (std::map<std::string, CZMQNotifierFactory>::const_iterator i=factories.begin(); i!=factories.end(); ++i)
{
std::string arg("-zmq" + i->first);
if (IsArgSet(arg))
{
CZMQNotifierFactory factory = i->second;
std::string address = GetArg(arg, "");
CZMQAbstractNotifier *notifier = factory();
notifier->SetType(i->first);
notifier->SetAddress(address);
notifiers.push_back(notifier);
}
}
if (!notifiers.empty())
{
notificationInterface = new CZMQNotificationInterface();
notificationInterface->notifiers = notifiers;
if (!notificationInterface->Initialize())
{
delete notificationInterface;
notificationInterface = NULL;
}
}
return notificationInterface;
}
// Called at startup to conditionally set up ZMQ socket(s)
bool CZMQNotificationInterface::Initialize()
{
LogPrint("zmq", "zmq: Initialize notification interface\n");
assert(!pcontext);
pcontext = zmq_init(1);
if (!pcontext)
{
zmqError("Unable to initialize context");
return false;
}
std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin();
for (; i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->Initialize(pcontext))
{
LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress());
}
else
{
LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress());
break;
}
}
if (i!=notifiers.end())
{
return false;
}
return true;
}
// Called during shutdown sequence
void CZMQNotificationInterface::Shutdown()
{
LogPrint("zmq", "zmq: Shutdown notification interface\n");
if (pcontext)
{
for (std::list<CZMQAbstractNotifier*>::iterator i=notifiers.begin(); i!=notifiers.end(); ++i)
{
CZMQAbstractNotifier *notifier = *i;
LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress());
notifier->Shutdown();
}
zmq_ctx_destroy(pcontext);
pcontext = 0;
}
}
void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload)
{
if (fInitialDownload || pindexNew == pindexFork) // In IBD or blocks were disconnected without any new ones
return;
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyBlock(pindexNew))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransaction(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::NotifyTransactionLock(const CTransaction &tx)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyTransactionLock(tx))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}