Implement zmq notifications for chainlocked blocks (#2899)

* Unify zmq message order

* Implement `zmqpubhashchainlock` and `zmqpubrawchainlock`
This commit is contained in:
UdjinM6 2019-05-08 12:12:54 +03:00 committed by GitHub
parent 66a2cdeafc
commit 89f6f75910
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 136 additions and 47 deletions

View File

@ -43,12 +43,14 @@ class ZMQHandler():
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
@ -69,38 +71,44 @@ class ZMQHandler():
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

View File

@ -47,12 +47,14 @@ class ZMQHandler():
self.zmqSubSocket = self.zmqContext.socket(zmq.SUB)
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernancevote")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashgovernanceobject")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "hashinstantsenddoublespend")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawblock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawchainlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawtxlock")
self.zmqSubSocket.setsockopt_string(zmq.SUBSCRIBE, "rawgovernancevote")
@ -72,38 +74,44 @@ class ZMQHandler():
if topic == b"hashblock":
print('- HASH BLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashchainlock":
print('- HASH CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtx":
print ('- HASH TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashtxlock":
print('- HASH TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernancevote":
print('- HASH GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashgovernanceobject":
print('- HASH GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawblock":
print('- RAW BLOCK HEADER ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawchainlock":
print('- RAW CHAINLOCK ('+sequence+') -')
print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == b"rawtx":
print('- RAW TX ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawtxlock":
print('- RAW TX LOCK ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernancevote":
print('- RAW GOVERNANCE VOTE ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"rawgovernanceobject":
print('- RAW GOVERNANCE OBJECT ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
elif topic == b"hashinstantsenddoublespend":
print('- HASH IS DOUBLE SPEND ('+sequence+') -')
elif topic == b"rawinstantsenddoublespend":
print('- RAW IS DOUBLE SPEND ('+sequence+') -')
print(binascii.hexlify(body).decode("utf-8"))
# schedule ourselves to receive the next message
asyncio.ensure_future(self.handle())

View File

@ -56,18 +56,20 @@ the command line or in the configuration file.
Currently, the following notifications are supported:
-zmqpubhashblock=address
-zmqpubhashchainlock=address
-zmqpubhashtx=address
-zmqpubhashtxlock=address
-zmqpubhashblock=address
-zmqpubrawblock=address
-zmqpubrawtx=address
-zmqpubrawtxlock=address
-zmqpubhashgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawgovernancevote=address
-zmqpubhashgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
-zmqpubhashinstantsenddoublespend=address
-zmqpubrawblock=address
-zmqpubrawchainlock=address
-zmqpubrawtx=address
-zmqpubrawtxlock=address
-zmqpubrawgovernancevote=address
-zmqpubrawgovernanceobject=address
-zmqpubrawinstantsenddoublespend=address
The socket type is PUB and the address must be a valid ZeroMQ socket
address. The same address can be used in more than one notification.

View File

@ -16,6 +16,11 @@ bool CZMQAbstractNotifier::NotifyBlock(const CBlockIndex * /*CBlockIndex*/)
return true;
}
bool CZMQAbstractNotifier::NotifyChainLock(const CBlockIndex * /*CBlockIndex*/)
{
return true;
}
bool CZMQAbstractNotifier::NotifyTransaction(const CTransaction &/*transaction*/)
{
return true;

View File

@ -35,6 +35,7 @@ public:
virtual void Shutdown() = 0;
virtual bool NotifyBlock(const CBlockIndex *pindex);
virtual bool NotifyChainLock(const CBlockIndex *pindex);
virtual bool NotifyTransaction(const CTransaction &transaction);
virtual bool NotifyTransactionLock(const CTransaction &transaction);
virtual bool NotifyGovernanceVote(const CGovernanceVote &vote);

View File

@ -36,12 +36,14 @@ CZMQNotificationInterface* CZMQNotificationInterface::Create()
std::list<CZMQAbstractNotifier*> notifiers;
factories["pubhashblock"] = CZMQAbstractNotifier::Create<CZMQPublishHashBlockNotifier>;
factories["pubhashchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashChainLockNotifier>;
factories["pubhashtx"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionNotifier>;
factories["pubhashtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishHashTransactionLockNotifier>;
factories["pubhashgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceVoteNotifier>;
factories["pubhashgovernanceobject"] = CZMQAbstractNotifier::Create<CZMQPublishHashGovernanceObjectNotifier>;
factories["pubhashinstantsenddoublespend"] = CZMQAbstractNotifier::Create<CZMQPublishHashInstantSendDoubleSpendNotifier>;
factories["pubrawblock"] = CZMQAbstractNotifier::Create<CZMQPublishRawBlockNotifier>;
factories["pubrawchainlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawChainLockNotifier>;
factories["pubrawtx"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionNotifier>;
factories["pubrawtxlock"] = CZMQAbstractNotifier::Create<CZMQPublishRawTransactionLockNotifier>;
factories["pubrawgovernancevote"] = CZMQAbstractNotifier::Create<CZMQPublishRawGovernanceVoteNotifier>;
@ -152,6 +154,23 @@ void CZMQNotificationInterface::UpdatedBlockTip(const CBlockIndex *pindexNew, co
}
}
void CZMQNotificationInterface::NotifyChainLock(const CBlockIndex *pindex)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )
{
CZMQAbstractNotifier *notifier = *i;
if (notifier->NotifyChainLock(pindex))
{
i++;
}
else
{
notifier->Shutdown();
i = notifiers.erase(i);
}
}
}
void CZMQNotificationInterface::SyncTransaction(const CTransaction& tx, const CBlockIndex* pindex, int posInBlock)
{
for (std::list<CZMQAbstractNotifier*>::iterator i = notifiers.begin(); i!=notifiers.end(); )

View File

@ -26,6 +26,7 @@ protected:
// CValidationInterface
void SyncTransaction(const CTransaction& tx, const CBlockIndex *pindex, int posInBlock) override;
void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
void NotifyChainLock(const CBlockIndex *pindex) override;
void NotifyTransactionLock(const CTransaction &tx) override;
void NotifyGovernanceVote(const CGovernanceVote& vote) override;
void NotifyGovernanceObject(const CGovernanceObject& object) override;

View File

@ -10,18 +10,20 @@
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHCHAINLOCK = "hashchainlock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_HASHGVOTE = "hashgovernancevote";
static const char *MSG_HASHGOBJ = "hashgovernanceobject";
static const char *MSG_HASHISCON = "hashinstantsenddoublespend";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWCHAINLOCK = "rawchainlock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
static const char *MSG_RAWGVOTE = "rawgovernancevote";
static const char *MSG_RAWGOBJ = "rawgovernanceobject";
static const char *MSG_RAWISCON = "rawinstantsenddoublespend";
// Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
@ -159,6 +161,16 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_HASHBLOCK, data, 32);
}
bool CZMQPublishHashChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
uint256 hash = pindex->GetBlockHash();
LogPrint("zmq", "zmq: Publish hashchainlock %s\n", hash.GetHex());
char data[32];
for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i];
return SendMessage(MSG_HASHCHAINLOCK, data, 32);
}
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();
@ -234,6 +246,27 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawChainLockNotifier::NotifyChainLock(const CBlockIndex *pindex)
{
LogPrint("zmq", "zmq: Publish rawchainlock %s\n", pindex->GetBlockHash().GetHex());
const Consensus::Params& consensusParams = Params().GetConsensus();
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
{
LOCK(cs_main);
CBlock block;
if(!ReadBlockFromDisk(block, pindex, consensusParams))
{
zmqError("Can't read block from disk");
return false;
}
ss << block;
}
return SendMessage(MSG_RAWCHAINLOCK, &(*ss.begin()), ss.size());
}
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
{
uint256 hash = transaction.GetHash();

View File

@ -36,6 +36,12 @@ public:
bool NotifyBlock(const CBlockIndex *pindex) override;
};
class CZMQPublishHashChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};
class CZMQPublishHashTransactionNotifier : public CZMQAbstractPublishNotifier
{
public:
@ -72,6 +78,12 @@ public:
bool NotifyBlock(const CBlockIndex *pindex) override;
};
class CZMQPublishRawChainLockNotifier : public CZMQAbstractPublishNotifier
{
public:
bool NotifyChainLock(const CBlockIndex *pindex) override;
};
class CZMQPublishRawTransactionNotifier : public CZMQAbstractPublishNotifier
{
public: