Merge #18309: zmq: Add support to listen on multiple interfaces

e66870c5a4c2adbd30dca67d409fd5cd98697587 zmq: Append address to notify log output (nthumann)
241803da211265444e65f254f24dd184f2457fa9 test: Add zmq test to support multiple interfaces (nthumann)
a0b2e5cb6aa8db0563fac7d67a949b9baefe3a25 doc: Add release notes to support multiple interfaces (nthumann)
b1c3f180ecb63f3960506d202feebaa4271058ae doc: Adjust ZMQ usage to support multiple interfaces (nthumann)
347c94f551c3f144c44e00373e4dd61ff6d908b7 zmq: Add support to listen on multiple interfaces (Nicolas Thumann)

Pull request description:

  This PR adds support for ZeroMQ to listen on multiple interfaces, just like the RPC server.
  Currently, if you specify more than one e.g. `zmqpubhashblock` paramter, only the first one will be used. Therefore a user may be forced to listen on all interfaces (e.g. `zmqpubhashblock=0.0.0.0:28332`), which can result in an increased attack surface.
  With this PR a user can specify multiple interfaces to listen on, e.g.
  `-zmqpubhashblock=tcp://127.0.0.1:28332 -zmqpubhashblock=tcp://192.168.1.123:28332`.

ACKs for top commit:
  laanwj:
    Code review ACK e66870c5a4c2adbd30dca67d409fd5cd98697587
  instagibbs:
    reACK e66870c5a4

Tree-SHA512: f38ab4a6ff00dc821e5f4842508cefadb701e70bb3893992c1b32049be20247c8aa9476a1f886050c5f17fe7f2ce99ee30193ce2c81a7482a5a51f8fc22300c7
This commit is contained in:
Wladimir J. van der Laan 2020-10-01 17:41:22 +02:00 committed by PastaPastaPasta
parent 08349ee1da
commit e29a35a997
5 changed files with 41 additions and 12 deletions

View File

@ -0,0 +1,4 @@
Command-line options
-----------------------------
The same ZeroMQ notification (e.g. `-zmqpubhashtx=address`) can now be specified multiple times to publish the same notification to different ZeroMQ sockets.

View File

@ -80,6 +80,7 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.
The same notification can be specified more than once.
The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
@ -108,6 +109,7 @@ The high water mark value must be an integer greater than or equal to 0.
For instance:
$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubhashtx=tcp://192.168.1.2:28332 \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000

View File

@ -55,10 +55,8 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
for (const auto& entry : factories)
{
std::string arg("-zmq" + entry.first);
if (gArgs.IsArgSet(arg))
{
const auto& factory = entry.second;
const std::string address = gArgs.GetArg(arg, "");
const auto& factory = entry.second;
for (const std::string& address : gArgs.GetArgs(arg)) {
std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first);
notifier->SetAddress(address);

View File

@ -198,7 +198,7 @@ bool CZMQAbstractPublishNotifier::SendZmqMessage(const char *command, const void
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashblock %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -218,7 +218,7 @@ bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish hashtx %s to %s\n", hash.GetHex(), this->address);
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
@ -279,7 +279,7 @@ bool CZMQPublishHashRecoveredSigNotifier::NotifyRecoveredSig(const std::shared_p
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawblock %s to %s\n", pindex->GetBlockHash().GetHex(), this->address);
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
@ -344,7 +344,7 @@ bool CZMQPublishRawChainLockSigNotifier::NotifyChainLock(const CBlockIndex *pind
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtx %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction;
return SendZmqMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
@ -353,7 +353,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlock %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *transaction;
return SendZmqMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
@ -362,7 +362,7 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac
bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTransactionRef& transaction, const std::shared_ptr<const llmq::CInstantSendLock>& islock)
{
uint256 hash = transaction->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s\n", hash.GetHex());
LogPrint(BCLog::ZMQ, "zmq: Publish rawtxlocksig %s to %s\n", hash.GetHex(), this->address);
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *transaction;
ss << *islock;
@ -372,7 +372,7 @@ bool CZMQPublishRawTransactionLockSigNotifier::NotifyTransactionLock(const CTran
bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::shared_ptr<const CGovernanceVote>& vote)
{
uint256 nHash = vote->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, vote = %d\n", nHash.ToString(), vote->ToString());
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, vote = %d\n", nHash.ToString(), this->address, vote->ToString());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *vote;
return SendZmqMessage(MSG_RAWGVOTE, &(*ss.begin()), ss.size());
@ -381,7 +381,7 @@ bool CZMQPublishRawGovernanceVoteNotifier::NotifyGovernanceVote(const std::share
bool CZMQPublishRawGovernanceObjectNotifier::NotifyGovernanceObject(const std::shared_ptr<const CGovernanceObject>& govobj)
{
uint256 nHash = govobj->GetHash();
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s, type = %d\n", nHash.ToString(), ToUnderlying(govobj->GetObjectType()));
LogPrint(BCLog::ZMQ, "zmq: Publish rawgovernanceobject: hash = %s to %s, type = %d\n", nHash.ToString(), this->address, ToUnderlying(govobj->GetObjectType()));
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << *govobj;
return SendZmqMessage(MSG_RAWGOBJ, &(*ss.begin()), ss.size());

View File

@ -78,6 +78,7 @@ class ZMQTest (BitcoinTestFramework):
def run_test(self):
try:
self._zmq_test()
self.test_multiple_interfaces()
finally:
# Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context")
@ -131,5 +132,29 @@ class ZMQTest (BitcoinTestFramework):
assert_equal(self.nodes[1].getzmqnotifications(), [])
def test_multiple_interfaces(self):
import zmq
# Set up two subscribers with different addresses
subscribers = []
for i in range(2):
address = 'tcp://127.0.0.1:%d' % (28334 + i)
socket = self.zmq_context.socket(zmq.SUB)
socket.set(zmq.RCVTIMEO, 60000)
hashblock = ZMQSubscriber(socket, b"hashblock")
socket.connect(address)
subscribers.append({'address': address, 'hashblock': hashblock})
self.restart_node(0, ['-zmqpub%s=%s' % (subscriber['hashblock'].topic.decode(), subscriber['address']) for subscriber in subscribers])
# Relax so that the subscriber is ready before publishing zmq messages
sleep(0.2)
# Generate 1 block in nodes[0] and receive all notifications
self.nodes[0].generatetoaddress(1, ADDRESS_BCRT1_UNSPENDABLE)
# Should receive the same block hash on both subscribers
assert_equal(self.nodes[0].getbestblockhash(), subscribers[0]['hashblock'].receive().hex())
assert_equal(self.nodes[0].getbestblockhash(), subscribers[1]['hashblock'].receive().hex())
if __name__ == '__main__':
ZMQTest().main()