From e6a14b64d665eb1fafd03a6bbc8d14597ce1c83c Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Tue, 18 Nov 2014 12:06:32 -0500 Subject: [PATCH] Add ZeroMQ support. Notify blocks and transactions via ZeroMQ Continues Johnathan Corgan's work. Publishing multipart messages Bugfix: Add missing zmq header includes Bugfix: Adjust build system to link ZeroMQ code for Qt binaries --- configure.ac | 22 ++++ contrib/zmq/zmq_sub.py | 37 ++++++ doc/zmq.md | 98 +++++++++++++++ src/Makefile.am | 26 +++- src/Makefile.qt.include | 3 + src/Makefile.qttest.include | 3 + src/Makefile.test.include | 4 + src/init.cpp | 36 +++++- src/validationinterface.cpp | 1 - src/zmq/zmqabstractnotifier.cpp | 22 ++++ src/zmq/zmqabstractnotifier.h | 42 +++++++ src/zmq/zmqconfig.h | 24 ++++ src/zmq/zmqnotificationinterface.cpp | 155 ++++++++++++++++++++++++ src/zmq/zmqnotificationinterface.h | 35 ++++++ src/zmq/zmqpublishnotifier.cpp | 172 +++++++++++++++++++++++++++ src/zmq/zmqpublishnotifier.h | 41 +++++++ 16 files changed, 717 insertions(+), 4 deletions(-) create mode 100755 contrib/zmq/zmq_sub.py create mode 100644 doc/zmq.md create mode 100644 src/zmq/zmqabstractnotifier.cpp create mode 100644 src/zmq/zmqabstractnotifier.h create mode 100644 src/zmq/zmqconfig.h create mode 100644 src/zmq/zmqnotificationinterface.cpp create mode 100644 src/zmq/zmqnotificationinterface.h create mode 100644 src/zmq/zmqpublishnotifier.cpp create mode 100644 src/zmq/zmqpublishnotifier.h diff --git a/configure.ac b/configure.ac index 07ee28f84..9955ab9bf 100644 --- a/configure.ac +++ b/configure.ac @@ -137,6 +137,12 @@ AC_ARG_ENABLE([glibc-back-compat], [use_glibc_compat=$enableval], [use_glibc_compat=no]) +AC_ARG_ENABLE([zmq], + [AC_HELP_STRING([--disable-zmq], + [Disable ZMQ notifications])], + [use_zmq=$enableval], + [use_zmq=yes]) + AC_ARG_WITH([protoc-bindir],[AS_HELP_STRING([--with-protoc-bindir=BIN_DIR],[specify protoc bin path])], [protoc_bin_path=$withval], []) # Enable debug @@ -833,6 +839,22 @@ if test x$bitcoin_enable_qt != xno; then fi fi +# conditional search for and use libzmq +AC_MSG_CHECKING([whether to build ZMQ support]) +if test "x$use_zmq" = "xyes"; then + AC_MSG_RESULT([yes]) + PKG_CHECK_MODULES([ZMQ],[libzmq], + [AC_DEFINE([ENABLE_ZMQ],[1],[Define to 1 to enable ZMQ functions])], + [AC_DEFINE([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions]) + AC_MSG_WARN([libzmq not found, disabling]) + use_zmq=no]) +else + AC_MSG_RESULT([no, --disable-zmq used]) + AC_DEFINE_UNQUOTED([ENABLE_ZMQ],[0],[Define to 1 to enable ZMQ functions]) +fi + +AM_CONDITIONAL([ENABLE_ZMQ], [test "x$use_zmq" = "xyes"]) + AC_MSG_CHECKING([whether to build test_bitcoin]) if test x$use_tests = xyes; then AC_MSG_RESULT([yes]) diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py new file mode 100755 index 000000000..decf29d42 --- /dev/null +++ b/contrib/zmq/zmq_sub.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python2 + +import array +import binascii +import zmq + +port = 28332 + +zmqContext = zmq.Context() +zmqSubSocket = zmqContext.socket(zmq.SUB) +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") +zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) + +try: + while True: + msg = zmqSubSocket.recv_multipart() + topic = str(msg[0]) + body = msg[1] + + if topic == "hashblock": + print "- HASH BLOCK -" + print binascii.hexlify(body) + elif topic == "hashtx": + print '- HASH TX -' + print binascii.hexlify(body) + elif topic == "rawblock": + print "- RAW BLOCK HEADER -" + print binascii.hexlify(body[:80]) + elif topic == "rawtx": + print '- RAW TX -' + print binascii.hexlify(body) + +except KeyboardInterrupt: + zmqContext.destroy() diff --git a/doc/zmq.md b/doc/zmq.md new file mode 100644 index 000000000..fd04f6d9f --- /dev/null +++ b/doc/zmq.md @@ -0,0 +1,98 @@ +# Block and Transaction Broadcasting With ZeroMQ + +[ZeroMQ](http://zeromq.org/) is a lightweight wrapper around TCP +connections, inter-process communications, and shared-memory, +providing various message-oriented semantics such as publish/subcribe, +request/reply, and push/pull. + +The Bitcoin Core daemon can be configured to act as a trusted "border +router", implementing the bitcoin wire protocol and relay, making +consensus decisions, maintaining the local blockchain database, +broadcasting locally generated transactions into the network, and +providing a queryable RPC interface to interact on a polled basis for +requesting blockchain related data. However, there exists only a +limited service to notify external software of events like the arrival +of new blocks or transactions. + +The ZeroMQ facility implements a notification interface through a +set of specific notifiers. Currently there are notifiers that publish +blocks and transactions. This read-only facility requires only the +connection of a corresponding ZeroMQ subscriber port in receiving +software; it is not authenticated nor is there any two-way protocol +involvement. Therefore, subscribers should validate the received data +since it may be out of date, incomplete or even invalid. + +ZeroMQ sockets are self-connecting and self-healing; that is, connects +made between two endpoints will be automatically restored after an +outage, and either end may be freely started or stopped in any order. + +Because ZeroMQ is message oriented, subscribers receive transactions +and blocks all-at-once and do not need to implement any sort of +buffering or reassembly. + +## Prerequisites + +The ZeroMQ feature in Bitcoin Core uses only a very small part of the +ZeroMQ C API, and is thus compatible with any version of ZeroMQ +from 2.1 onward, including all versions in the 3.x and 4.x release +series. Typically, it is packaged by distributions as something like +*libzmq-dev*. + +The C++ wrapper for ZeroMQ is *not* needed. + +## Enabling + +By default, the ZeroMQ port functionality is enabled. Two steps are +required to enable--compiling in the ZeroMQ code, and configuring +runtime operation on the command-line or configuration file. + + $ ./configure --enable-zmq (other options) + +This will produce a binary that is capable of providing the ZeroMQ +facility, but will not do so until also configured properly. + +## Usage + +Currently, the following notifications are supported: + + -zmqpubhashtx=address + -zmqpubhashblock=address + -zmqpubrawblock=address + -zmqpubrawtx=address + +The socket type is PUB and the address must be a valid ZeroMQ +socket address. The same address can be used in more than one notification. + +For instance: + + $ bitcoind -zmqpubhashtx=tcp://127.0.0.1:28332 -zmqpubrawtx=ipc:///tmp/bitcoind.tx.raw + +Each PUB notification has a topic and body, where the header +corresponds to the notification type. For instance, for the notification +`-zmqpubhashtx` the topic is `hashtx` (no null terminator) and the body is the +hexadecimal transaction hash (32 bytes). + +These options can also be provided in bitcoin.conf. + +ZeroMQ endpoint specifiers for TCP (and others) are documented in the +[ZeroMQ API](http://api.zeromq.org). + +Client side, then, the ZeroMQ subscriber socket must have the +ZMQ_SUBSCRIBE option set to one or either of these prefixes (for instance, just `hash`); without +doing so will result in no messages arriving. Please see `contrib/zmq/zmq_sub.py` +for a working example. + +## Remarks + +From the perspective of bitcoind, the ZeroMQ socket is write-only; PUB +sockets don't even have a read function. Thus, there is no state +introduced into bitcoind directly. Furthermore, no information is +broadcast that wasn't already received from the public P2P network. + +No authentication or authorization is done on connecting clients; it +is assumed that the ZeroMQ port is exposed only to trusted entities, +using other means such as firewalling. + +Note that when the block chain tip changes, a reorganisation may occur and just +the tip will be notified. It is up to the subscriber to retrieve the chain +from the last known block to the new tip. diff --git a/src/Makefile.am b/src/Makefile.am index 390e9f143..67e848be3 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -48,6 +48,9 @@ if ENABLE_WALLET BITCOIN_INCLUDES += $(BDB_CPPFLAGS) EXTRA_LIBRARIES += libbitcoin_wallet.a endif +if ENABLE_ZMQ +EXTRA_LIBRARIES += libbitcoin_zmq.a +endif if BUILD_BITCOIN_LIBS lib_LTLIBRARIES = libbitcoinconsensus.la @@ -157,7 +160,12 @@ BITCOIN_CORE_H = \ wallet/db.h \ wallet/wallet.h \ wallet/wallet_ismine.h \ - wallet/walletdb.h + wallet/walletdb.h \ + zmq/zmqabstractnotifier.h \ + zmq/zmqconfig.h\ + zmq/zmqnotificationinterface.h \ + zmq/zmqpublishnotifier.h + obj/build.h: FORCE @$(MKDIR_P) $(builddir)/obj @@ -199,6 +207,17 @@ libbitcoin_server_a_SOURCES = \ validationinterface.cpp \ $(BITCOIN_CORE_H) +if ENABLE_ZMQ +LIBBITCOIN_ZMQ=libbitcoin_zmq.a + +libbitcoin_zmq_a_CPPFLAGS = $(BITCOIN_INCLUDES) +libbitcoin_zmq_a_SOURCES = \ + zmq/zmqabstractnotifier.cpp \ + zmq/zmqnotificationinterface.cpp \ + zmq/zmqpublishnotifier.cpp +endif + + # wallet: shared between bitcoind and bitcoin-qt, but only linked # when wallet enabled libbitcoin_wallet_a_CPPFLAGS = $(BITCOIN_INCLUDES) @@ -320,12 +339,15 @@ bitcoind_LDADD = \ $(LIBMEMENV) \ $(LIBSECP256K1) +if ENABLE_ZMQ +bitcoind_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS) +endif + if ENABLE_WALLET bitcoind_LDADD += libbitcoin_wallet.a endif bitcoind_LDADD += $(BOOST_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) -# # bitcoin-cli binary # bitcoin_cli_SOURCES = bitcoin-cli.cpp diff --git a/src/Makefile.qt.include b/src/Makefile.qt.include index 8d60aca25..3e8eda178 100644 --- a/src/Makefile.qt.include +++ b/src/Makefile.qt.include @@ -361,6 +361,9 @@ qt_bitcoin_qt_LDADD = qt/libbitcoinqt.a $(LIBBITCOIN_SERVER) if ENABLE_WALLET qt_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET) endif +if ENABLE_ZMQ +qt_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS) +endif qt_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) $(LIBMEMENV) \ $(BOOST_LIBS) $(QT_LIBS) $(QT_DBUS_LIBS) $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \ $(EVENT_PTHREADS_LIBS) $(EVENT_LIBS) diff --git a/src/Makefile.qttest.include b/src/Makefile.qttest.include index 4250bb8f3..6554580be 100644 --- a/src/Makefile.qttest.include +++ b/src/Makefile.qttest.include @@ -30,6 +30,9 @@ qt_test_test_bitcoin_qt_LDADD = $(LIBBITCOINQT) $(LIBBITCOIN_SERVER) if ENABLE_WALLET qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_WALLET) endif +if ENABLE_ZMQ +qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_ZMQ) $(ZMQ_LIBS) +endif qt_test_test_bitcoin_qt_LDADD += $(LIBBITCOIN_CLI) $(LIBBITCOIN_COMMON) $(LIBBITCOIN_UTIL) $(LIBBITCOIN_CRYPTO) $(LIBBITCOIN_UNIVALUE) $(LIBLEVELDB) \ $(LIBMEMENV) $(BOOST_LIBS) $(QT_DBUS_LIBS) $(QT_TEST_LIBS) $(QT_LIBS) \ $(QR_LIBS) $(PROTOBUF_LIBS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) $(LIBSECP256K1) \ diff --git a/src/Makefile.test.include b/src/Makefile.test.include index cc60cd92b..cee35926a 100644 --- a/src/Makefile.test.include +++ b/src/Makefile.test.include @@ -100,6 +100,10 @@ endif test_test_bitcoin_LDADD += $(LIBBITCOIN_CONSENSUS) $(BDB_LIBS) $(SSL_LIBS) $(CRYPTO_LIBS) $(MINIUPNPC_LIBS) test_test_bitcoin_LDFLAGS = $(RELDFLAGS) $(AM_LDFLAGS) $(LIBTOOL_APP_LDFLAGS) -static +if ENABLE_ZMQ +test_test_bitcoin_LDADD += $(ZMQ_LIBS) +endif + nodist_test_test_bitcoin_SOURCES = $(GENERATED_TEST_FILES) $(BITCOIN_TESTS): $(GENERATED_TEST_FILES) diff --git a/src/init.cpp b/src/init.cpp index a12e38ff5..f03388120 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -38,7 +38,6 @@ #include "wallet/wallet.h" #include "wallet/walletdb.h" #endif - #include #include @@ -55,6 +54,10 @@ #include #include +#if ENABLE_ZMQ +#include "zmq/zmqnotificationinterface.h" +#endif + using namespace std; #ifdef ENABLE_WALLET @@ -62,6 +65,10 @@ CWallet* pwalletMain = NULL; #endif bool fFeeEstimatesInitialized = false; +#if ENABLE_ZMQ +static CZMQNotificationInterface* pzmqNotificationInterface = NULL; +#endif + #ifdef WIN32 // Win32 LevelDB doesn't use filedescriptors, and the ones used for // accessing block files don't count towards the fd_set size limit @@ -211,6 +218,16 @@ void Shutdown() if (pwalletMain) pwalletMain->Flush(true); #endif + +#if ENABLE_ZMQ + if (pzmqNotificationInterface) { + UnregisterValidationInterface(pzmqNotificationInterface); + pzmqNotificationInterface->Shutdown(); + delete pzmqNotificationInterface; + pzmqNotificationInterface = NULL; + } +#endif + #ifndef WIN32 try { boost::filesystem::remove(GetPidFile()); @@ -375,6 +392,14 @@ std::string HelpMessage(HelpMessageMode mode) " " + _("(1 = keep tx meta data e.g. account owner and payment request information, 2 = drop tx meta data)")); #endif +#if ENABLE_ZMQ + strUsage += HelpMessageGroup(_("ZeroMQ notification options:")); + strUsage += HelpMessageOpt("-zmqpubhashblock=
", _("Enable publish hash block in
")); + strUsage += HelpMessageOpt("-zmqpubhashtransaction=
", _("Enable publish hash transaction in
")); + strUsage += HelpMessageOpt("-zmqpubrawblock=
", _("Enable publish raw block in
")); + strUsage += HelpMessageOpt("-zmqpubrawtransaction=
", _("Enable publish raw transaction in
")); +#endif + strUsage += HelpMessageGroup(_("Debugging/Testing options:")); if (showDebug) { @@ -1125,6 +1150,15 @@ bool AppInit2(boost::thread_group& threadGroup, CScheduler& scheduler) BOOST_FOREACH(const std::string& strDest, mapMultiArgs["-seednode"]) AddOneShot(strDest); +#if ENABLE_ZMQ + pzmqNotificationInterface = CZMQNotificationInterface::CreateWithArguments(mapArgs); + + if (pzmqNotificationInterface) { + pzmqNotificationInterface->Initialize(); + RegisterValidationInterface(pzmqNotificationInterface); + } +#endif + // ********************************************************* Step 7: load block chain fReindex = GetBoolArg("-reindex", false); diff --git a/src/validationinterface.cpp b/src/validationinterface.cpp index bdb706907..81f3b775f 100644 --- a/src/validationinterface.cpp +++ b/src/validationinterface.cpp @@ -45,7 +45,6 @@ void UnregisterAllValidationInterfaces() { g_signals.SetBestChain.disconnect_all_slots(); g_signals.UpdatedTransaction.disconnect_all_slots(); g_signals.SyncTransaction.disconnect_all_slots(); - g_signals.UpdatedTransaction.disconnect_all_slots(); g_signals.UpdatedBlockTip.disconnect_all_slots(); } diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp new file mode 100644 index 000000000..744ec5923 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.cpp @@ -0,0 +1,22 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqabstractnotifier.h" +#include "util.h" + + +CZMQAbstractNotifier::~CZMQAbstractNotifier() +{ + assert(!psocket); +} + +bool CZMQAbstractNotifier::NotifyBlock(const uint256 &/*hash*/) +{ + return true; +} + +bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) +{ + return true; +} diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h new file mode 100644 index 000000000..626d1ddf9 --- /dev/null +++ b/src/zmq/zmqabstractnotifier.h @@ -0,0 +1,42 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H +#define BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H + +#include "zmqconfig.h" + +class CZMQAbstractNotifier; +typedef CZMQAbstractNotifier* (*CZMQNotifierFactory)(); + +class CZMQAbstractNotifier +{ +public: + CZMQAbstractNotifier() : psocket(0) { } + virtual ~CZMQAbstractNotifier(); + + template + static CZMQAbstractNotifier* Create() + { + return new T(); + } + + std::string GetType() const { return type; } + void SetType(const std::string &t) { type = t; } + std::string GetAddress() const { return address; } + void SetAddress(const std::string &a) { address = a; } + + virtual bool Initialize(void *pcontext) = 0; + virtual void Shutdown() = 0; + + virtual bool NotifyBlock(const uint256 &hash); + virtual bool NotifyTransaction(const CTransaction &transaction); + +protected: + void *psocket; + std::string type; + std::string address; +}; + +#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqconfig.h b/src/zmq/zmqconfig.h new file mode 100644 index 000000000..6057f5d1a --- /dev/null +++ b/src/zmq/zmqconfig.h @@ -0,0 +1,24 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQCONFIG_H +#define BITCOIN_ZMQ_ZMQCONFIG_H + +#if defined(HAVE_CONFIG_H) +#include "config/bitcoin-config.h" +#endif + +#include +#include + +#if ENABLE_ZMQ +#include +#endif + +#include "primitives/block.h" +#include "primitives/transaction.h" + +void zmqError(const char *str); + +#endif // BITCOIN_ZMQ_ZMQCONFIG_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp new file mode 100644 index 000000000..71ccb59a4 --- /dev/null +++ b/src/zmq/zmqnotificationinterface.cpp @@ -0,0 +1,155 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqnotificationinterface.h" +#include "zmqpublishnotifier.h" + +#include "version.h" +#include "main.h" +#include "streams.h" +#include "util.h" + +void zmqError(const char *str) +{ + LogPrint("zmq", "Error: %s, errno=%s\n", str, zmq_strerror(errno)); +} + +CZMQNotificationInterface::CZMQNotificationInterface() : pcontext(NULL) +{ +} + +CZMQNotificationInterface::~CZMQNotificationInterface() +{ + // ensure Shutdown if Initialize is called + assert(!pcontext); + + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + delete *i; + } +} + +CZMQNotificationInterface* CZMQNotificationInterface::CreateWithArguments(const std::map &args) +{ + CZMQNotificationInterface* notificationInterface = NULL; + std::map factories; + std::list notifiers; + + factories["pubhashblock"] = CZMQAbstractNotifier::Create; + factories["pubhashtx"] = CZMQAbstractNotifier::Create; + factories["pubrawblock"] = CZMQAbstractNotifier::Create; + factories["pubrawtx"] = CZMQAbstractNotifier::Create; + + for (std::map::const_iterator i=factories.begin(); i!=factories.end(); ++i) + { + std::map::const_iterator j = args.find("-zmq" + i->first); + if (j!=args.end()) + { + CZMQNotifierFactory factory = i->second; + std::string address = j->second; + CZMQAbstractNotifier *notifier = factory(); + notifier->SetType(i->first); + notifier->SetAddress(address); + notifiers.push_back(notifier); + } + } + + if (!notifiers.empty()) + { + notificationInterface = new CZMQNotificationInterface(); + notificationInterface->notifiers = notifiers; + } + + return notificationInterface; +} + +// Called at startup to conditionally set up ZMQ socket(s) +bool CZMQNotificationInterface::Initialize() +{ + LogPrint("zmq", "Initialize notification interface\n"); + assert(!pcontext); + + pcontext = zmq_init(1); + + if (!pcontext) + { + zmqError("Unable to initialize context"); + return false; + } + + std::list::iterator i=notifiers.begin(); + for (; i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->Initialize(pcontext)) + { + LogPrint("zmq", " Notifier %s ready (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + } + else + { + LogPrint("zmq", " Notifier %s failed (address = %s)\n", notifier->GetType(), notifier->GetAddress()); + break; + } + } + + if (i!=notifiers.end()) + { + Shutdown(); + return false; + } + + return false; +} + +// Called during shutdown sequence +void CZMQNotificationInterface::Shutdown() +{ + LogPrint("zmq", "Shutdown notification interface\n"); + if (pcontext) + { + for (std::list::iterator i=notifiers.begin(); i!=notifiers.end(); ++i) + { + CZMQAbstractNotifier *notifier = *i; + LogPrint("zmq", " Shutdown notifier %s at %s\n", notifier->GetType(), notifier->GetAddress()); + notifier->Shutdown(); + } + zmq_ctx_destroy(pcontext); + + pcontext = 0; + } +} + +void CZMQNotificationInterface::UpdatedBlockTip(const uint256 &hash) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyBlock(hash)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + +void CZMQNotificationInterface::SyncTransaction(const CTransaction &tx, const CBlock *pblock) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyTransaction(tx)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h new file mode 100644 index 000000000..afc0b8d24 --- /dev/null +++ b/src/zmq/zmqnotificationinterface.h @@ -0,0 +1,35 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H +#define BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H + +#include "validationinterface.h" +#include +#include + +class CZMQAbstractNotifier; + +class CZMQNotificationInterface : public CValidationInterface +{ +public: + virtual ~CZMQNotificationInterface(); + + static CZMQNotificationInterface* CreateWithArguments(const std::map &args); + + bool Initialize(); + void Shutdown(); + +protected: // CValidationInterface + void SyncTransaction(const CTransaction &tx, const CBlock *pblock); + void UpdatedBlockTip(const uint256 &newHashTip); + +private: + CZMQNotificationInterface(); + + void *pcontext; + std::list notifiers; +}; + +#endif // BITCOIN_ZMQ_ZMQNOTIFICATIONINTERFACE_H diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp new file mode 100644 index 000000000..0a6d7d0db --- /dev/null +++ b/src/zmq/zmqpublishnotifier.cpp @@ -0,0 +1,172 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include "zmqpublishnotifier.h" +#include "main.h" +#include "util.h" + +static std::multimap mapPublishNotifiers; + +// Internal function to send multipart message +static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) +{ + va_list args; + va_start(args, size); + + while (1) + { + zmq_msg_t msg; + + int rc = zmq_msg_init_size(&msg, size); + if (rc != 0) + { + zmqError("Unable to initialize ZMQ msg"); + return -1; + } + + void *buf = zmq_msg_data(&msg); + memcpy(buf, data, size); + + data = va_arg(args, const void*); + + rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0); + if (rc == -1) + { + zmqError("Unable to send ZMQ msg"); + zmq_msg_close(&msg); + return -1; + } + + zmq_msg_close(&msg); + + if (!data) + break; + + size = va_arg(args, size_t); + } + return 0; +} + +bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) +{ + assert(!psocket); + + // check if address is being used by other publish notifier + std::multimap::iterator i = mapPublishNotifiers.find(address); + + if (i==mapPublishNotifiers.end()) + { + psocket = zmq_socket(pcontext, ZMQ_PUB); + if (!psocket) + { + zmqError("Failed to create socket"); + return false; + } + + int rc = zmq_bind(psocket, address.c_str()); + if (rc!=0) + { + zmqError("Failed to bind address"); + return false; + } + + // register this notifier for the address, so it can be reused for other publish notifier + mapPublishNotifiers.insert(std::make_pair(address, this)); + return true; + } + else + { + LogPrint("zmq", " Reuse socket for address %s\n", address); + + psocket = i->second->psocket; + mapPublishNotifiers.insert(std::make_pair(address, this)); + + return true; + } +} + +void CZMQAbstractPublishNotifier::Shutdown() +{ + assert(psocket); + + int count = mapPublishNotifiers.count(address); + + // remove this notifier from the list of publishers using this address + typedef std::multimap::iterator iterator; + std::pair iterpair = mapPublishNotifiers.equal_range(address); + + for (iterator it = iterpair.first; it != iterpair.second; ++it) + { + if (it->second==this) + { + mapPublishNotifiers.erase(it); + break; + } + } + + if (count == 1) + { + LogPrint("zmq", "Close socket at address %s\n", address); + int linger = 0; + zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); + zmq_close(psocket); + } + + psocket = 0; +} + +bool CZMQPublishHashBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish hash block %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish hash transaction %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); + return rc == 0; +} + +bool CZMQPublishRawBlockNotifier::NotifyBlock(const uint256 &hash) +{ + LogPrint("zmq", "Publish raw block %s\n", hash.GetHex()); + + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + + CBlock block; + CBlockIndex* pblockindex = mapBlockIndex[hash]; + + if(!ReadBlockFromDisk(block, pblockindex)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} + +bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) +{ + uint256 hash = transaction.GetHash(); + LogPrint("zmq", "Publish raw transaction %s\n", hash.GetHex()); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + ss << transaction; + int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); + return rc == 0; +} diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h new file mode 100644 index 000000000..a0eb26f5e --- /dev/null +++ b/src/zmq/zmqpublishnotifier.h @@ -0,0 +1,41 @@ +// Copyright (c) 2015 The Bitcoin Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H +#define BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H + +#include "zmqabstractnotifier.h" + +class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier +{ +public: + bool Initialize(void *pcontext); + void Shutdown(); +}; + +class CZMQPublishHashBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const uint256 &hash); +}; + +class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +class CZMQPublishRawBlockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyBlock(const uint256 &hash); +}; + +class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyTransaction(const CTransaction &transaction); +}; + +#endif // BITCOIN_ZMQ_ZMQPUBLISHNOTIFIER_H