diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index aa4c660a6b..9c8bb5e9fd 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -41,6 +41,7 @@ class ZMQHandler(): self.zmqContext = zmq.asyncio.Context() self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) + self.zmqSubSocket.setsockopt(zmq.RCVHWM, 0) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") diff --git a/doc/zmq.md b/doc/zmq.md index 69a6072390..b7b93aaf26 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -81,10 +81,35 @@ Currently, the following notifications are supported: The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. +The option to set the PUB socket's outbound message high water mark +(SNDHWM) may be set individually for each notification: + + -zmqpubhashtxhwm=n + -zmqpubhashblockhwm=n + -zmqpubhashchainlockhwm=n + -zmqpubhashtxlockhwm=n + -zmqpubhashgovernancevotehwm=n + -zmqpubhashgovernanceobjecthwm=n + -zmqpubhashinstantsenddoublespendhwm=n + -zmqpubhashrecoveredsighwm=n + -zmqpubrawblockhwm=n + -zmqpubrawtxhwm=n + -zmqpubrawchainlockhwm=n + -zmqpubrawchainlocksighwm=n + -zmqpubrawtxlockhwm=n + -zmqpubrawtxlocksighwm=n + -zmqpubrawgovernancevotehwm=n + -zmqpubrawgovernanceobjecthwm=n + -zmqpubrawinstantsenddoublespendhwm=n + -zmqpubrawrecoveredsighwm=n + +The high water mark value must be an integer greater than or equal to 0. + For instance: $ dashd -zmqpubhashtx=tcp://127.0.0.1:28332 \ - -zmqpubrawtx=ipc:///tmp/dashd.tx.raw + -zmqpubrawtx=ipc:///tmp/dashd.tx.raw \ + -zmqpubhashtxhwm=10000 Each PUB notification has a topic and body, where the header corresponds to the notification type. For instance, for the diff --git a/src/init.cpp b/src/init.cpp index a9d7e8e608..2e011b47a4 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -99,6 +99,7 @@ #include #if ENABLE_ZMQ +#include #include #include #endif @@ -596,6 +597,24 @@ void SetupServerArgs() gArgs.AddArg("-zmqpubrawtx=
", "Enable publish raw transaction in
", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtxlock=
", "Enable publish raw transaction (locked via InstantSend) in
", false, OptionsCategory::ZMQ); gArgs.AddArg("-zmqpubrawtxlocksig=
", "Enable publish raw transaction (locked via InstantSend) and ISLOCK in
", false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashblockhwm=", strprintf("Set publish hash block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashchainlockhwm=", strprintf("Set publish hash chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashgovernanceobjecthwm=", strprintf("Set publish hash governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashgovernancevotehwm=", strprintf("Set publish hash governance vote outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashinstantsenddoublespendhwm=", strprintf("Set publish hash InstantSend double spend outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashrecoveredsighwm=", strprintf("Set publish hash recovered signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashtxhwm=", strprintf("Set publish hash transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubhashtxlockhwm=", strprintf("Set publish hash transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawblockhwm=", strprintf("Set publish raw block outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawchainlockhwm=", strprintf("Set publish raw chain lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawchainlocksighwm=", strprintf("Set publish raw chain lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawgovernanceobjecthwm=", strprintf("Set publish raw governance object outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawgovernancevotehwm=", strprintf("Set publish raw governance vote outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawinstantsenddoublespendhwm=", strprintf("Set publish raw InstantSend double spend outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawrecoveredsighwm=", strprintf("Set publish raw recovered signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxhwm=", strprintf("Set publish raw transaction outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxlockhwm=", strprintf("Set publish raw transaction lock outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); + gArgs.AddArg("-zmqpubrawtxlocksighwm=", strprintf("Set publish raw transaction lock signature outbound message high water mark (default: %d)", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM), false, OptionsCategory::ZMQ); #else hidden_args.emplace_back("-zmqpubhashblock=
"); hidden_args.emplace_back("-zmqpubhashchainlock=
"); @@ -615,6 +634,24 @@ void SetupServerArgs() hidden_args.emplace_back("-zmqpubrawtx=
"); hidden_args.emplace_back("-zmqpubrawtxlock=
"); hidden_args.emplace_back("-zmqpubrawtxlocksig=
"); + hidden_args.emplace_back("-zmqpubhashblockhwm="); + hidden_args.emplace_back("-zmqpubhashchainlockhwm="); + hidden_args.emplace_back("-zmqpubhashgovernanceobjecthwm="); + hidden_args.emplace_back("-zmqpubhashgovernancevotehwm="); + hidden_args.emplace_back("-zmqpubhashinstantsenddoublespendhwm="); + hidden_args.emplace_back("-zmqpubhashrecoveredsighwm="); + hidden_args.emplace_back("-zmqpubhashtxhwm="); + hidden_args.emplace_back("-zmqpubhashtxlockhwm="); + hidden_args.emplace_back("-zmqpubrawblockhwm="); + hidden_args.emplace_back("-zmqpubrawchainlockhwm="); + hidden_args.emplace_back("-zmqpubrawchainlocksighwm="); + hidden_args.emplace_back("-zmqpubrawgovernanceobjecthwm="); + hidden_args.emplace_back("-zmqpubrawgovernancevotehwm="); + hidden_args.emplace_back("-zmqpubrawinstantsenddoublespendhwm="); + hidden_args.emplace_back("-zmqpubrawrecoveredsighwm="); + hidden_args.emplace_back("-zmqpubrawtxhwm="); + hidden_args.emplace_back("-zmqpubrawtxlockhwm="); + hidden_args.emplace_back("-zmqpubrawtxlocksighwm="); #endif gArgs.AddArg("-checkblockindex", strprintf("Do a full consistency check for mapBlockIndex, setBlockIndexCandidates, chainActive and mapBlocksUnlinked occasionally. (default: %u)", defaultChainParams->DefaultConsistencyChecks()), true, OptionsCategory::DEBUG_TEST); diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 296570274d..3b51e87fbe 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -7,6 +7,8 @@ #include +const int CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM; + CZMQAbstractNotifier::~CZMQAbstractNotifier() { assert(!psocket); diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 7683418454..bb39301f08 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -29,7 +29,9 @@ using CZMQNotifierFactory = std::unique_ptr (*)(); class CZMQAbstractNotifier { public: - CZMQAbstractNotifier() : psocket(nullptr) { } + static const int DEFAULT_ZMQ_SNDHWM {1000}; + + CZMQAbstractNotifier() : psocket(nullptr), outbound_message_high_water_mark(DEFAULT_ZMQ_SNDHWM) { } virtual ~CZMQAbstractNotifier(); template @@ -42,6 +44,12 @@ public: void SetType(const std::string &t) { type = t; } std::string GetAddress() const { return address; } void SetAddress(const std::string &a) { address = a; } + int GetOutboundMessageHighWaterMark() const { return outbound_message_high_water_mark; } + void SetOutboundMessageHighWaterMark(const int sndhwm) { + if (sndhwm >= 0) { + outbound_message_high_water_mark = sndhwm; + } + } virtual bool Initialize(void *pcontext) = 0; virtual void Shutdown() = 0; @@ -59,6 +67,7 @@ protected: void *psocket; std::string type; std::string address; + int outbound_message_high_water_mark; // aka SNDHWM }; #endif // BITCOIN_ZMQ_ZMQABSTRACTNOTIFIER_H diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 9b78c7adac..9a2808dad0 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -64,6 +64,7 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() std::unique_ptr notifier = factory(); notifier->SetType(entry.first); notifier->SetAddress(address); + notifier->SetOutboundMessageHighWaterMark(static_cast(gArgs.GetArg(arg + "hwm", CZMQAbstractNotifier::DEFAULT_ZMQ_SNDHWM))); notifiers.push_back(std::move(notifier)); } } diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 9b050582d4..0dbc5b60eb 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -107,8 +107,18 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) return false; } - int rc = zmq_bind(psocket, address.c_str()); - if (rc!=0) + LogPrint(BCLog::ZMQ, "zmq: Outbound message high water mark for %s at %s is %d\n", type, address, outbound_message_high_water_mark); + + int rc = zmq_setsockopt(psocket, ZMQ_SNDHWM, &outbound_message_high_water_mark, sizeof(outbound_message_high_water_mark)); + if (rc != 0) + { + zmqError("Failed to set outbound message high water mark"); + zmq_close(psocket); + return false; + } + + rc = zmq_bind(psocket, address.c_str()); + if (rc != 0) { zmqError("Failed to bind address"); zmq_close(psocket); @@ -151,7 +161,7 @@ void CZMQAbstractPublishNotifier::Shutdown() if (count == 1) { - LogPrint(BCLog::ZMQ, "Close socket at address %s\n", address); + LogPrint(BCLog::ZMQ, "zmq: Close socket at address %s\n", address); int linger = 0; zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger)); zmq_close(psocket); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index 2e97624ac9..6fc488eba4 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -14,7 +14,7 @@ class CGovernanceObject; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { private: - uint32_t nSequence; //!< upcounting per message sequence number + uint32_t nSequence {0U}; //!< upcounting per message sequence number public: diff --git a/src/zmq/zmqrpc.cpp b/src/zmq/zmqrpc.cpp index d5f1619863..66b491427d 100644 --- a/src/zmq/zmqrpc.cpp +++ b/src/zmq/zmqrpc.cpp @@ -23,7 +23,8 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) "[\n" " { (json object)\n" " \"type\": \"pubhashtx\", (string) Type of notification\n" - " \"address\": \"...\" (string) Address of the publisher\n" + " \"address\": \"...\", (string) Address of the publisher\n" + " \"hwm\": n (numeric) Outbound message high water mark\n" " },\n" " ...\n" "]\n" @@ -39,6 +40,7 @@ UniValue getzmqnotifications(const JSONRPCRequest& request) UniValue obj(UniValue::VOBJ); obj.pushKV("type", n->GetType()); obj.pushKV("address", n->GetAddress()); + obj.pushKV("hwm", n->GetOutboundMessageHighWaterMark()); result.push_back(obj); } } diff --git a/test/functional/interface_zmq.py b/test/functional/interface_zmq.py index 79bc98438f..f05751c2a5 100755 --- a/test/functional/interface_zmq.py +++ b/test/functional/interface_zmq.py @@ -117,10 +117,10 @@ class ZMQTest (BitcoinTestFramework): self.log.info("Test the getzmqnotifications RPC") assert_equal(self.nodes[0].getzmqnotifications(), [ - {"type": "pubhashblock", "address": ADDRESS}, - {"type": "pubhashtx", "address": ADDRESS}, - {"type": "pubrawblock", "address": ADDRESS}, - {"type": "pubrawtx", "address": ADDRESS}, + {"type": "pubhashblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubhashtx", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawblock", "address": ADDRESS, "hwm": 1000}, + {"type": "pubrawtx", "address": ADDRESS, "hwm": 1000}, ]) assert_equal(self.nodes[1].getzmqnotifications(), []) diff --git a/test/functional/interface_zmq_dash.py b/test/functional/interface_zmq_dash.py index 4941eaa38a..12b5fb24ef 100755 --- a/test/functional/interface_zmq_dash.py +++ b/test/functional/interface_zmq_dash.py @@ -126,6 +126,7 @@ class DashZMQTest (DashTestFramework): self.test_chainlock_publishers() self.test_instantsend_publishers() self.test_governance_publishers() + self.test_getzmqnotifications() finally: # Destroy the ZMQ context. self.log.debug("Destroying ZMQ context") @@ -381,6 +382,24 @@ class DashZMQTest (DashTestFramework): # Unsubscribe from governance messages self.unsubscribe(governance_publishers) + def test_getzmqnotifications(self): + # Test getzmqnotifications RPC + assert_equal(self.nodes[0].getzmqnotifications(), [ + {"type": "pubhashchainlock", "address": self.address, "hwm": 1000}, + {"type": "pubhashgovernanceobject", "address": self.address, "hwm": 1000}, + {"type": "pubhashgovernancevote", "address": self.address, "hwm": 1000}, + {"type": "pubhashinstantsenddoublespend", "address": self.address, "hwm": 1000}, + {"type": "pubhashrecoveredsig", "address": self.address, "hwm": 1000}, + {"type": "pubhashtxlock", "address": self.address, "hwm": 1000}, + {"type": "pubrawchainlock", "address": self.address, "hwm": 1000}, + {"type": "pubrawchainlocksig", "address": self.address, "hwm": 1000}, + {"type": "pubrawgovernanceobject", "address": self.address, "hwm": 1000}, + {"type": "pubrawgovernancevote", "address": self.address, "hwm": 1000}, + {"type": "pubrawinstantsenddoublespend", "address": self.address, "hwm": 1000}, + {"type": "pubrawrecoveredsig", "address": self.address, "hwm": 1000}, + {"type": "pubrawtxlock", "address": self.address, "hwm": 1000}, + {"type": "pubrawtxlocksig", "address": self.address, "hwm": 1000}, + ]) if __name__ == '__main__': DashZMQTest().main()