From 89f6f759108f5bd7cae70dc454b24904a2ae13ab Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Wed, 8 May 2019 12:12:54 +0300 Subject: [PATCH] Implement zmq notifications for chainlocked blocks (#2899) * Unify zmq message order * Implement `zmqpubhashchainlock` and `zmqpubrawchainlock` --- contrib/zmq/zmq_sub.py | 36 +++++++++++------- contrib/zmq/zmq_sub3.4.py | 36 +++++++++++------- doc/zmq.md | 16 ++++---- src/zmq/zmqabstractnotifier.cpp | 5 +++ src/zmq/zmqabstractnotifier.h | 1 + src/zmq/zmqnotificationinterface.cpp | 19 ++++++++++ src/zmq/zmqnotificationinterface.h | 1 + src/zmq/zmqpublishnotifier.cpp | 57 ++++++++++++++++++++++------ src/zmq/zmqpublishnotifier.h | 12 ++++++ 9 files changed, 136 insertions(+), 47 deletions(-) diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 035a39765a..85b9a730df 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -43,12 +43,14 @@ class ZMQHandler(): self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") @@ -69,38 +71,44 @@ class ZMQHandler(): if topic == b"hashblock": print('- HASH BLOCK ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"hashchainlock": + print('- HASH CHAINLOCK ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashtx": print ('- HASH TX ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashtxlock": print('- HASH TX LOCK ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawblock": - print('- RAW BLOCK HEADER ('+sequence+') -') - print(binascii.hexlify(body[:80]).decode("utf-8")) - elif topic == b"rawtx": - print('- RAW TX ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawtxlock": - print('- RAW TX LOCK ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawinstantsenddoublespend": - print('- RAW IS DOUBLE SPEND ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashgovernancevote": print('- HASH GOVERNANCE VOTE ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashgovernanceobject": print('- HASH GOVERNANCE OBJECT ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"hashinstantsenddoublespend": + print('- HASH IS DOUBLE SPEND ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"rawblock": + print('- RAW BLOCK HEADER ('+sequence+') -') + print(binascii.hexlify(body[:80]).decode("utf-8")) + elif topic == b"rawchainlock": + print('- RAW CHAINLOCK ('+sequence+') -') + print(binascii.hexlify(body[:80]).decode("utf-8")) + elif topic == b"rawtx": + print('- RAW TX ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"rawtxlock": + print('- RAW TX LOCK ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) elif topic == b"rawgovernancevote": print('- RAW GOVERNANCE VOTE ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"rawgovernanceobject": print('- RAW GOVERNANCE OBJECT ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"hashinstantsenddoublespend": - print('- HASH IS DOUBLE SPEND ('+sequence+') -') + elif topic == b"rawinstantsenddoublespend": + print('- RAW IS DOUBLE SPEND ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) # schedule ourselves to receive the next message asyncio.ensure_future(self.handle()) diff --git a/contrib/zmq/zmq_sub3.4.py b/contrib/zmq/zmq_sub3.4.py index 015e691820..f84bb6fcd0 100755 --- a/contrib/zmq/zmq_sub3.4.py +++ b/contrib/zmq/zmq_sub3.4.py @@ -47,12 +47,14 @@ class ZMQHandler(): self.zmqSubSocket = self.zmqContext.socket(zmq.SUB) self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock") + self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock") self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote") @@ -72,38 +74,44 @@ class ZMQHandler(): if topic == b"hashblock": print('- HASH BLOCK ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"hashchainlock": + print('- HASH CHAINLOCK ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashtx": print ('- HASH TX ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashtxlock": print('- HASH TX LOCK ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawblock": - print('- RAW BLOCK HEADER ('+sequence+') -') - print(binascii.hexlify(body[:80]).decode("utf-8")) - elif topic == b"rawtx": - print('- RAW TX ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawtxlock": - print('- RAW TX LOCK ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"rawinstantsenddoublespend": - print('- RAW IS DOUBLE SPEND ('+sequence+') -') - print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashgovernancevote": print('- HASH GOVERNANCE VOTE ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"hashgovernanceobject": print('- HASH GOVERNANCE OBJECT ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"hashinstantsenddoublespend": + print('- HASH IS DOUBLE SPEND ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"rawblock": + print('- RAW BLOCK HEADER ('+sequence+') -') + print(binascii.hexlify(body[:80]).decode("utf-8")) + elif topic == b"rawchainlock": + print('- RAW CHAINLOCK ('+sequence+') -') + print(binascii.hexlify(body[:80]).decode("utf-8")) + elif topic == b"rawtx": + print('- RAW TX ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) + elif topic == b"rawtxlock": + print('- RAW TX LOCK ('+sequence+') -') + print(binascii.hexlify(body).decode("utf-8")) elif topic == b"rawgovernancevote": print('- RAW GOVERNANCE VOTE ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) elif topic == b"rawgovernanceobject": print('- RAW GOVERNANCE OBJECT ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) - elif topic == b"hashinstantsenddoublespend": - print('- HASH IS DOUBLE SPEND ('+sequence+') -') + elif topic == b"rawinstantsenddoublespend": + print('- RAW IS DOUBLE SPEND ('+sequence+') -') print(binascii.hexlify(body).decode("utf-8")) # schedule ourselves to receive the next message asyncio.ensure_future(self.handle()) diff --git a/doc/zmq.md b/doc/zmq.md index 627f46c2d1..8b90fee8ae 100644 --- a/doc/zmq.md +++ b/doc/zmq.md @@ -56,18 +56,20 @@ the command line or in the configuration file. Currently, the following notifications are supported: + -zmqpubhashblock=address + -zmqpubhashchainlock=address -zmqpubhashtx=address -zmqpubhashtxlock=address - -zmqpubhashblock=address - -zmqpubrawblock=address - -zmqpubrawtx=address - -zmqpubrawtxlock=address -zmqpubhashgovernancevote=address -zmqpubhashgovernanceobject=address - -zmqpubrawgovernancevote=address - -zmqpubhashgovernanceobject=address - -zmqpubrawinstantsenddoublespend=address -zmqpubhashinstantsenddoublespend=address + -zmqpubrawblock=address + -zmqpubrawchainlock=address + -zmqpubrawtx=address + -zmqpubrawtxlock=address + -zmqpubrawgovernancevote=address + -zmqpubrawgovernanceobject=address + -zmqpubrawinstantsenddoublespend=address The socket type is PUB and the address must be a valid ZeroMQ socket address. The same address can be used in more than one notification. diff --git a/src/zmq/zmqabstractnotifier.cpp b/src/zmq/zmqabstractnotifier.cpp index 6e280eefea..72aa19ecbb 100644 --- a/src/zmq/zmqabstractnotifier.cpp +++ b/src/zmq/zmqabstractnotifier.cpp @@ -16,6 +16,11 @@ bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/) return true; } +bool CZMQAbstractNotifier::NotifyChainLock(const CBlockIndex * /*CBlockIndex*/) +{ + return true; +} + bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/) { return true; diff --git a/src/zmq/zmqabstractnotifier.h b/src/zmq/zmqabstractnotifier.h index 6ce9d12531..029be2a161 100644 --- a/src/zmq/zmqabstractnotifier.h +++ b/src/zmq/zmqabstractnotifier.h @@ -35,6 +35,7 @@ public: virtual void Shutdown() = 0; virtual bool NotifyBlock(const CBlockIndex *pindex); + virtual bool NotifyChainLock(const CBlockIndex *pindex); virtual bool NotifyTransaction(const CTransaction &transaction); virtual bool NotifyTransactionLock(const CTransaction &transaction); virtual bool NotifyGovernanceVote(const CGovernanceVote &vote); diff --git a/src/zmq/zmqnotificationinterface.cpp b/src/zmq/zmqnotificationinterface.cpp index 8a15fd1a9e..2641d98e00 100644 --- a/src/zmq/zmqnotificationinterface.cpp +++ b/src/zmq/zmqnotificationinterface.cpp @@ -36,12 +36,14 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create() std::list notifiers; factories["pubhashblock"] = CZMQAbstractNotifier::Create; + factories["pubhashchainlock"] = CZMQAbstractNotifier::Create; factories["pubhashtx"] = CZMQAbstractNotifier::Create; factories["pubhashtxlock"] = CZMQAbstractNotifier::Create; factories["pubhashgovernancevote"] = CZMQAbstractNotifier::Create; factories["pubhashgovernanceobject"] = CZMQAbstractNotifier::Create; factories["pubhashinstantsenddoublespend"] = CZMQAbstractNotifier::Create; factories["pubrawblock"] = CZMQAbstractNotifier::Create; + factories["pubrawchainlock"] = CZMQAbstractNotifier::Create; factories["pubrawtx"] = CZMQAbstractNotifier::Create; factories["pubrawtxlock"] = CZMQAbstractNotifier::Create; factories["pubrawgovernancevote"] = CZMQAbstractNotifier::Create; @@ -152,6 +154,23 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co } } +void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex) +{ + for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) + { + CZMQAbstractNotifier *notifier = *i; + if (notifier->NotifyChainLock(pindex)) + { + i++; + } + else + { + notifier->Shutdown(); + i = notifiers.erase(i); + } + } +} + void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock) { for (std::list::iterator i = notifiers.begin(); i!=notifiers.end(); ) diff --git a/src/zmq/zmqnotificationinterface.h b/src/zmq/zmqnotificationinterface.h index f552751e97..a94dd29338 100644 --- a/src/zmq/zmqnotificationinterface.h +++ b/src/zmq/zmqnotificationinterface.h @@ -26,6 +26,7 @@ protected: // CValidationInterface void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock) override; void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override; + void NotifyChainLock(const CBlockIndex *pindex) override; void NotifyTransactionLock(const CTransaction &tx) override; void NotifyGovernanceVote(const CGovernanceVote& vote) override; void NotifyGovernanceObject(const CGovernanceObject& object) override; diff --git a/src/zmq/zmqpublishnotifier.cpp b/src/zmq/zmqpublishnotifier.cpp index 9f46331c0d..f2a7b61b88 100644 --- a/src/zmq/zmqpublishnotifier.cpp +++ b/src/zmq/zmqpublishnotifier.cpp @@ -10,18 +10,20 @@ static std::multimap mapPublishNotifiers; -static const char *MSG_HASHBLOCK = "hashblock"; -static const char *MSG_HASHTX = "hashtx"; -static const char *MSG_HASHTXLOCK = "hashtxlock"; -static const char *MSG_HASHGVOTE = "hashgovernancevote"; -static const char *MSG_HASHGOBJ = "hashgovernanceobject"; -static const char *MSG_HASHISCON = "hashinstantsenddoublespend"; -static const char *MSG_RAWBLOCK = "rawblock"; -static const char *MSG_RAWTX = "rawtx"; -static const char *MSG_RAWTXLOCK = "rawtxlock"; -static const char *MSG_RAWGVOTE = "rawgovernancevote"; -static const char *MSG_RAWGOBJ = "rawgovernanceobject"; -static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHCHAINLOCK = "hashchainlock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_HASHTXLOCK = "hashtxlock"; +static const char *MSG_HASHGVOTE = "hashgovernancevote"; +static const char *MSG_HASHGOBJ = "hashgovernanceobject"; +static const char *MSG_HASHISCON = "hashinstantsenddoublespend"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWCHAINLOCK = "rawchainlock"; +static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_RAWTXLOCK = "rawtxlock"; +static const char *MSG_RAWGVOTE = "rawgovernancevote"; +static const char *MSG_RAWGOBJ = "rawgovernanceobject"; +static const char *MSG_RAWISCON = "rawinstantsenddoublespend"; // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) @@ -159,6 +161,16 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) return SendMessage(MSG_HASHBLOCK, data, 32); } +bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex) +{ + uint256 hash = pindex->GetBlockHash(); + LogPrint("zmq", "zmq: Publish hashchainlock %s\n", hash.GetHex()); + char data[32]; + for (unsigned int i = 0; i < 32; i++) + data[31 - i] = hash.begin()[i]; + return SendMessage(MSG_HASHCHAINLOCK, data, 32); +} + bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); @@ -234,6 +246,27 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } +bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex) +{ + LogPrint("zmq", "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex()); + + const Consensus::Params& consensusParams = Params().GetConsensus(); + CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); + { + LOCK(cs_main); + CBlock block; + if(!ReadBlockFromDisk(block, pindex, consensusParams)) + { + zmqError("Can't read block from disk"); + return false; + } + + ss << block; + } + + return SendMessage(MSG_RAWCHAINLOCK, &(*ss.begin()), ss.size()); +} + bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) { uint256 hash = transaction.GetHash(); diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index e51b6a3145..eaf7e2d592 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -36,6 +36,12 @@ public: bool NotifyBlock(const CBlockIndex *pindex) override; }; +class CZMQPublishHashChainLockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyChainLock(const CBlockIndex *pindex) override; +}; + class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier { public: @@ -72,6 +78,12 @@ public: bool NotifyBlock(const CBlockIndex *pindex) override; }; +class CZMQPublishRawChainLockNotifier : public CZMQAbstractPublishNotifier +{ +public: + bool NotifyChainLock(const CBlockIndex *pindex) override; +}; + class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier { public: