Merge bitcoin#14060: ZMQ: add options to configure outbound message high water mark, aka SNDHWM (#4360)

* Merge #14060: ZMQ: add options to configure outbound message high water mark, aka SNDHWM

a4edb168b635b6f5c36324e44961cd42cf9bbbaa ZMQ: add options to configure outbound message high water mark, aka SNDHWM (mruddy)

Pull request description:

  ZMQ: add options to configure outbound message high water mark, aka SNDHWM

  This is my attempt at https://github.com/bitcoin/bitcoin/pull/13315

Tree-SHA512: a4cc3bcf179776899261a97c8c4f31f35d1d8950fd71a09a79c5c064879b38e600b26824c89c4091d941502ed5b0255390882f7d44baf9e6dc49d685a86e8edb

* High watermark settings for Dash-specific messages

Signed-off-by: Dzutte <dzutte.tomsk@gmail.com>

Co-authored-by: Wladimir J. van der Laan <laanwj@gmail.com>
This commit is contained in:
Dzutte 2021-09-08 09:39:06 -07:00 committed by GitHub
parent 22dd7bfa37
commit 39933ecd8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 117 additions and 11 deletions

View File

@ -41,6 +41,7 @@ class ZMQHandler():
self.zmqContext = zmq.asyncio.Context() self.zmqContext = zmq.asyncio.Context()
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")

View File

@ -81,10 +81,35 @@ Currently, the following notifications are supported:
The socket type is PUB and the address must be a valid ZeroMQ socket The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification. address. The same address can be used in more than one notification.
The option to set the PUB socket's outbound message high water mark
(SNDHWM) may be set individually for each notification:
-zmqpubhashtxhwm=n
-zmqpubhashblockhwm=n
-zmqpubhashchainlockhwm=n
-zmqpubhashtxlockhwm=n
-zmqpubhashgovernancevotehwm=n
-zmqpubhashgovernanceobjecthwm=n
-zmqpubhashinstantsenddoublespendhwm=n
-zmqpubhashrecoveredsighwm=n
-zmqpubrawblockhwm=n
-zmqpubrawtxhwm=n
-zmqpubrawchainlockhwm=n
-zmqpubrawchainlocksighwm=n
-zmqpubrawtxlockhwm=n
-zmqpubrawtxlocksighwm=n
-zmqpubrawgovernancevotehwm=n
-zmqpubrawgovernanceobjecthwm=n
-zmqpubrawinstantsenddoublespendhwm=n
-zmqpubrawrecoveredsighwm=n
The high water mark value must be an integer greater than or equal to 0.
For instance: For instance:
$ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \ $ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \
-zmqpubrawtx=ipc:///tmp/dashd.tx.raw -zmqpubrawtx=ipc:///tmp/dashd.tx.raw \
-zmqpubhashtxhwm=10000
Each PUB notification has a topic and body, where the header Each PUB notification has a topic and body, where the header
corresponds to the notification type. For instance, for the corresponds to the notification type. For instance, for the

View File

@ -99,6 +99,7 @@
#include <openssl/crypto.h> #include <openssl/crypto.h>
#if ENABLE_ZMQ #if ENABLE_ZMQ
#include <zmq/zmqabstractnotifier.h>
#include <zmq/zmqnotificationinterface.h> #include <zmq/zmqnotificationinterface.h>
#include <zmq/zmqrpc.h> #include <zmq/zmqrpc.h>
#endif #endif
@ -596,6 +597,24 @@ void SetupServerArgs()
gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtx=<address>", "Enable publish raw transaction in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtxlock=<address>", "Enable publish raw transaction (locked via InstantSend) in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtxlocksig=<address>", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in <address>", false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashblockhwm=<n>", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashchainlockhwm=<n>", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashgovernanceobjecthwm=<n>", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashgovernancevotehwm=<n>", strprintf("Set publish hash governance vote outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashinstantsenddoublespendhwm=<n>", strprintf("Set publish hash InstantSend double spend outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashrecoveredsighwm=<n>", strprintf("Set publish hash recovered signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashtxhwm=<n>", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubhashtxlockhwm=<n>", strprintf("Set publish hash transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawblockhwm=<n>", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawchainlockhwm=<n>", strprintf("Set publish raw chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawchainlocksighwm=<n>", strprintf("Set publish raw chain lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawgovernanceobjecthwm=<n>", strprintf("Set publish raw governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawgovernancevotehwm=<n>", strprintf("Set publish raw governance vote outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawinstantsenddoublespendhwm=<n>", strprintf("Set publish raw InstantSend double spend outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawrecoveredsighwm=<n>", strprintf("Set publish raw recovered signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxhwm=<n>", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxlockhwm=<n>", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
gArgs.AddArg("-zmqpubrawtxlocksighwm=<n>", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ);
#else #else
hidden_args.emplace_back("-zmqpubhashblock=<address>"); hidden_args.emplace_back("-zmqpubhashblock=<address>");
hidden_args.emplace_back("-zmqpubhashchainlock=<address>"); hidden_args.emplace_back("-zmqpubhashchainlock=<address>");
@ -615,6 +634,24 @@ void SetupServerArgs()
hidden_args.emplace_back("-zmqpubrawtx=<address>"); hidden_args.emplace_back("-zmqpubrawtx=<address>");
hidden_args.emplace_back("-zmqpubrawtxlock=<address>"); hidden_args.emplace_back("-zmqpubrawtxlock=<address>");
hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>"); hidden_args.emplace_back("-zmqpubrawtxlocksig=<address>");
hidden_args.emplace_back("-zmqpubhashblockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm=<n>");
hidden_args.emplace_back("-zmqpubhashgovernancevotehwm=<n>");
hidden_args.emplace_back("-zmqpubhashinstantsenddoublespendhwm=<n>");
hidden_args.emplace_back("-zmqpubhashrecoveredsighwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxhwm=<n>");
hidden_args.emplace_back("-zmqpubhashtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawblockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawchainlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawchainlocksighwm=<n>");
hidden_args.emplace_back("-zmqpubrawgovernanceobjecthwm=<n>");
hidden_args.emplace_back("-zmqpubrawgovernancevotehwm=<n>");
hidden_args.emplace_back("-zmqpubrawinstantsenddoublespendhwm=<n>");
hidden_args.emplace_back("-zmqpubrawrecoveredsighwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlockhwm=<n>");
hidden_args.emplace_back("-zmqpubrawtxlocksighwm=<n>");
#endif #endif
gArgs.AddArg("-checkblockindex", strprintf("Do a full consistency check for mapBlockIndex, setBlockIndexCandidates, chainActive and mapBlocksUnlinked occasionally. (default: %u)", defaultChainParams->DefaultConsistencyChecks()), true, OptionsCategory::DEBUG_TEST); gArgs.AddArg("-checkblockindex", strprintf("Do a full consistency check for mapBlockIndex, setBlockIndexCandidates, chainActive and mapBlocksUnlinked occasionally. (default: %u)", defaultChainParams->DefaultConsistencyChecks()), true, OptionsCategory::DEBUG_TEST);

View File

@ -7,6 +7,8 @@
#include <cassert> #include <cassert>
const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM;
CZMQAbstractNotifier::~CZMQAbstractNotifier() CZMQAbstractNotifier::~CZMQAbstractNotifier()
{ {
assert(!psocket); assert(!psocket);

View File

@ -29,7 +29,9 @@ using CZMQNotifierFactory = std::unique_ptr<CZMQAbstractNotifier> (*)();
class CZMQAbstractNotifier class CZMQAbstractNotifier
{ {
public: public:
CZMQAbstractNotifier() : psocket(nullptr) { } static const int DEFAULT_ZMQ_SNDHWM {1000};
CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { }
virtual ~CZMQAbstractNotifier(); virtual ~CZMQAbstractNotifier();
template <typename T> template <typename T>
@ -42,6 +44,12 @@ public:
void SetType(const std::string &t) { type = t; } void SetType(const std::string &t) { type = t; }
std::string GetAddress() const { return address; } std::string GetAddress() const { return address; }
void SetAddress(const std::string &a) { address = a; } void SetAddress(const std::string &a) { address = a; }
int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; }
void SetOutboundMessageHighWaterMark(const int sndhwm) {
if (sndhwm >= 0) {
outbound_message_high_water_mark = sndhwm;
}
}
virtual bool Initialize(void *pcontext) = 0; virtual bool Initialize(void *pcontext) = 0;
virtual void Shutdown() = 0; virtual void Shutdown() = 0;
@ -59,6 +67,7 @@ protected:
void *psocket; void *psocket;
std::string type; std::string type;
std::string address; std::string address;
int outbound_message_high_water_mark; // aka SNDHWM
}; };
#endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H

View File

@ -64,6 +64,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
std::unique_ptr<CZMQAbstractNotifier> notifier = factory(); std::unique_ptr<CZMQAbstractNotifier> notifier = factory();
notifier->SetType(entry.first); notifier->SetType(entry.first);
notifier->SetAddress(address); notifier->SetAddress(address);
notifier->SetOutboundMessageHighWaterMark(static_cast<int>(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM)));
notifiers.push_back(std::move(notifier)); notifiers.push_back(std::move(notifier));
} }
} }

View File

@ -107,8 +107,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
return false; return false;
} }
int rc = zmq_bind(psocket, address.c_str()); LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark);
if (rc!=0)
int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark));
if (rc != 0)
{
zmqError("Failed to set outbound message high water mark");
zmq_close(psocket);
return false;
}
rc = zmq_bind(psocket, address.c_str());
if (rc != 0)
{ {
zmqError("Failed to bind address"); zmqError("Failed to bind address");
zmq_close(psocket); zmq_close(psocket);
@ -151,7 +161,7 @@ void CZMQAbstractPublishNotifier::Shutdown()
if (count == 1) if (count == 1)
{ {
LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address);
int linger = 0; int linger = 0;
zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
zmq_close(psocket); zmq_close(psocket);

View File

@ -14,7 +14,7 @@ class CGovernanceObject;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{ {
private: private:
uint32_t nSequence; //!< upcounting per message sequence number uint32_t nSequence {0U}; //!< upcounting per message sequence number
public: public:

View File

@ -23,7 +23,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
"[\n" "[\n"
" { (json object)\n" " { (json object)\n"
" \"type\": \"pubhashtx\", (string) Type of notification\n" " \"type\": \"pubhashtx\", (string) Type of notification\n"
" \"address\": \"...\" (string) Address of the publisher\n" " \"address\": \"...\", (string) Address of the publisher\n"
" \"hwm\": n (numeric) Outbound message high water mark\n"
" },\n" " },\n"
" ...\n" " ...\n"
"]\n" "]\n"
@ -39,6 +40,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request)
UniValue obj(UniValue::VOBJ); UniValue obj(UniValue::VOBJ);
obj.pushKV("type", n->GetType()); obj.pushKV("type", n->GetType());
obj.pushKV("address", n->GetAddress()); obj.pushKV("address", n->GetAddress());
obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark());
result.push_back(obj); result.push_back(obj);
} }
} }

View File

@ -117,10 +117,10 @@ class ZMQTest (BitcoinTestFramework):
self.log.info("Test the getzmqnotifications RPC") self.log.info("Test the getzmqnotifications RPC")
assert_equal(self.nodes[0].getzmqnotifications(), [ assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashblock", "address": ADDRESS}, {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubhashtx", "address": ADDRESS}, {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawblock", "address": ADDRESS}, {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000},
{"type": "pubrawtx", "address": ADDRESS}, {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000},
]) ])
assert_equal(self.nodes[1].getzmqnotifications(), []) assert_equal(self.nodes[1].getzmqnotifications(), [])

View File

@ -126,6 +126,7 @@ class DashZMQTest (DashTestFramework):
self.test_chainlock_publishers() self.test_chainlock_publishers()
self.test_instantsend_publishers() self.test_instantsend_publishers()
self.test_governance_publishers() self.test_governance_publishers()
self.test_getzmqnotifications()
finally: finally:
# Destroy the ZMQ context. # Destroy the ZMQ context.
self.log.debug("Destroying ZMQ context") self.log.debug("Destroying ZMQ context")
@ -381,6 +382,24 @@ class DashZMQTest (DashTestFramework):
# Unsubscribe from governance messages # Unsubscribe from governance messages
self.unsubscribe(governance_publishers) self.unsubscribe(governance_publishers)
def test_getzmqnotifications(self):
# Test getzmqnotifications RPC
assert_equal(self.nodes[0].getzmqnotifications(), [
{"type": "pubhashchainlock", "address": self.address, "hwm": 1000},
{"type": "pubhashgovernanceobject", "address": self.address, "hwm": 1000},
{"type": "pubhashgovernancevote", "address": self.address, "hwm": 1000},
{"type": "pubhashinstantsenddoublespend", "address": self.address, "hwm": 1000},
{"type": "pubhashrecoveredsig", "address": self.address, "hwm": 1000},
{"type": "pubhashtxlock", "address": self.address, "hwm": 1000},
{"type": "pubrawchainlock", "address": self.address, "hwm": 1000},
{"type": "pubrawchainlocksig", "address": self.address, "hwm": 1000},
{"type": "pubrawgovernanceobject", "address": self.address, "hwm": 1000},
{"type": "pubrawgovernancevote", "address": self.address, "hwm": 1000},
{"type": "pubrawinstantsenddoublespend", "address": self.address, "hwm": 1000},
{"type": "pubrawrecoveredsig", "address": self.address, "hwm": 1000},
{"type": "pubrawtxlock", "address": self.address, "hwm": 1000},
{"type": "pubrawtxlocksig", "address": self.address, "hwm": 1000},
])
if __name__ == '__main__': if __name__ == '__main__':
DashZMQTest().main() DashZMQTest().main()