diff --git a/contrib/zmq/zmq_sub.py b/contrib/zmq/zmq_sub.py index 2dbe2923b..62c3d34fb 100755 --- a/contrib/zmq/zmq_sub.py +++ b/contrib/zmq/zmq_sub.py @@ -1,45 +1,51 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python import array import binascii import zmq +import struct port = 28332 zmqContext = zmq.Context() zmqSubSocket = zmqContext.socket(zmq.SUB) -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtxlock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") -zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtxlock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtxlock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx") +zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtxlock") zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) try: while True: msg = zmqSubSocket.recv_multipart() - topic = str(msg[0]) + topic = str(msg[0].decode("utf-8")) body = msg[1] + sequence = "Unknown"; + + if len(msg[-1]) == 4: + msgSequence = struct.unpack(' mapPublishNotifiers; +static const char *MSG_HASHBLOCK = "hashblock"; +static const char *MSG_HASHTX = "hashtx"; +static const char *MSG_HASHTXLOCK = "hashtxlock"; +static const char *MSG_RAWBLOCK = "rawblock"; +static const char *MSG_RAWTX = "rawtx"; +static const char *MSG_RAWTXLOCK = "rawtxlock"; + // Internal function to send multipart message static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) { @@ -69,6 +76,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext) if (rc!=0) { zmqError("Failed to bind address"); + zmq_close(psocket); return false; } @@ -117,6 +125,23 @@ void CZMQAbstractPublishNotifier::Shutdown() psocket = 0; } +bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size) +{ + assert(psocket); + + /* send three parts, command & data & a LE 4byte sequence number */ + unsigned char msgseq[sizeof(uint32_t)]; + WriteLE32(&msgseq[0], nSequence); + int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0); + if (rc == -1) + return false; + + /* increment memory only sequence number after sending */ + nSequence++; + + return true; +} + bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) { uint256 hash = pindex->GetBlockHash(); @@ -124,8 +149,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHBLOCK, data, 32); } bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -135,8 +159,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTX, data, 32); } bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) @@ -146,8 +169,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa char data[32]; for (unsigned int i = 0; i < 32; i++) data[31 - i] = hash.begin()[i]; - int rc = zmq_send_multipart(psocket, "hashtxlock", 10, data, 32, 0); - return rc == 0; + return SendMessage(MSG_HASHTXLOCK, data, 32); } bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) @@ -168,8 +190,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) ss << block; } - int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) @@ -178,8 +199,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size()); } bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) @@ -188,6 +208,5 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex()); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); ss << transaction; - int rc = zmq_send_multipart(psocket, "rawtxlock", 9, &(*ss.begin()), ss.size(), 0); - return rc == 0; + return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size()); } diff --git a/src/zmq/zmqpublishnotifier.h b/src/zmq/zmqpublishnotifier.h index c92150472..c2617c0f0 100644 --- a/src/zmq/zmqpublishnotifier.h +++ b/src/zmq/zmqpublishnotifier.h @@ -11,7 +11,19 @@ class CBlockIndex; class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier { +private: + uint32_t nSequence; // upcounting per message sequence number + public: + + /* send zmq multipart message + parts: + * command + * data + * message sequence number + */ + bool SendMessage(const char *command, const void* data, size_t size); + bool Initialize(void *pcontext); void Shutdown(); };