[ZMQ] append a message sequence number to every ZMQ notification (#1082)

* Zmq sequence (#1)

* Fixes ZMQ startup with bad arguments.

pr 7621

* [ZMQ] append a message sequence number to every ZMQ notification

- pr 7762
- contrib/zmq/zmq_sub.py to python 3 compatible

* typo in MSG_RAWTXLOCK

MMSG_RAWTXLOCK to MSG_RAWTXLOCK

* s/Bitcoind/dashd/
This commit is contained in:
CHAE-PIL LIM 2016-10-18 05:09:21 +09:00 committed by UdjinM6
parent 7a45c9308a
commit 9db6b97979
6 changed files with 83 additions and 33 deletions

View File

@ -1,45 +1,51 @@
#!/usr/bin/env python2 #!/usr/bin/env python
import array import array
import binascii import binascii
import zmq import zmq
import struct
port = 28332 port = 28332
zmqContext = zmq.Context() zmqContext = zmq.Context()
zmqSubSocket = zmqContext.socket(zmq.SUB) zmqSubSocket = zmqContext.socket(zmq.SUB)
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashblock") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtx") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "hashtxlock") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"hashtxlock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawblock") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawblock")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtx") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtx")
zmqSubSocket.setsockopt(zmq.SUBSCRIBE, "rawtxlock") zmqSubSocket.setsockopt(zmq.SUBSCRIBE, b"rawtxlock")
zmqSubSocket.connect("tcp://127.0.0.1:%i" % port) zmqSubSocket.connect("tcp://127.0.0.1:%i" % port)
try: try:
while True: while True:
msg = zmqSubSocket.recv_multipart() msg = zmqSubSocket.recv_multipart()
topic = str(msg[0]) topic = str(msg[0].decode("utf-8"))
body = msg[1] body = msg[1]
sequence = "Unknown";
if len(msg[-1]) == 4:
msgSequence = struct.unpack('<I', msg[-1])[-1]
sequence = str(msgSequence)
if topic == "hashblock": if topic == "hashblock":
print "- HASH BLOCK -" print('- HASH BLOCK ('+sequence+') -')
print binascii.hexlify(body) print(binascii.hexlify(body).decode("utf-8"))
elif topic == "hashtx": elif topic == "hashtx":
print '- HASH TX -' print ('- HASH TX ('+sequence+') -')
print binascii.hexlify(body) print(binascii.hexlify(body).decode("utf-8"))
elif topic == "hashtxlock": elif topic == "hashtxlock":
print '- HASH TX LOCK -' print('- HASH TX LOCK ('+sequence+') -')
print binascii.hexlify(body) print(binascii.hexlify(body).decode("utf-8"))
elif topic == "rawblock": elif topic == "rawblock":
print "- RAW BLOCK HEADER -" print('- RAW BLOCK HEADER ('+sequence+') -')
print binascii.hexlify(body[:80]) print(binascii.hexlify(body[:80]).decode("utf-8"))
elif topic == "rawtx": elif topic == "rawtx":
print '- RAW TX -' print('- RAW TX ('+sequence+') -')
print binascii.hexlify(body) print(binascii.hexlify(body).decode("utf-8"))
elif topic == "rawtxlock": elif topic == "rawtxlock":
print '- RAW TX LOCK -' print('- RAW TX LOCK ('+sequence+') -')
print binascii.hexlify(body) print(binascii.hexlify(body).decode("utf-8"))
except KeyboardInterrupt: except KeyboardInterrupt:
zmqContext.destroy() zmqContext.destroy()

View File

@ -58,6 +58,15 @@ The following outputs are affected by this change:
- REST `/rest/block/` (JSON format when including extended tx details) - REST `/rest/block/` (JSON format when including extended tx details)
- `bitcoin-tx -json` - `bitcoin-tx -json`
### ZMQ
Each ZMQ notification now contains an up-counting sequence number that allows
listeners to detect lost notifications.
The sequence number is always the last element in a multi-part ZMQ notification and
therefore backward compatible.
Each message type has its own counter.
(https://github.com/bitcoin/bitcoin/pull/7762)
### Configuration and command-line options ### Configuration and command-line options
### Block and transaction handling ### Block and transaction handling

View File

@ -101,3 +101,8 @@ using other means such as firewalling.
Note that when the block chain tip changes, a reorganisation may occur Note that when the block chain tip changes, a reorganisation may occur
and just the tip will be notified. It is up to the subscriber to and just the tip will be notified. It is up to the subscriber to
retrieve the chain from the last known block to the new tip. retrieve the chain from the last known block to the new tip.
There are several possibilities that ZMQ notification can get lost
during transmission depending on the communication type your are
using. Dashd appends an up-counting sequence number to each
notification which allows listeners to detect lost notifications.

View File

@ -102,7 +102,6 @@ bool CZMQNotificationInterface::Initialize()
if (i!=notifiers.end()) if (i!=notifiers.end())
{ {
Shutdown();
return false; return false;
} }

View File

@ -9,6 +9,13 @@
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers; static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
static const char *MSG_HASHBLOCK = "hashblock";
static const char *MSG_HASHTX = "hashtx";
static const char *MSG_HASHTXLOCK = "hashtxlock";
static const char *MSG_RAWBLOCK = "rawblock";
static const char *MSG_RAWTX = "rawtx";
static const char *MSG_RAWTXLOCK = "rawtxlock";
// Internal function to send multipart message // Internal function to send multipart message
static int zmq_send_multipart(void *sock, const void* data, size_t size, ...) static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
{ {
@ -69,6 +76,7 @@ bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
if (rc!=0) if (rc!=0)
{ {
zmqError("Failed to bind address"); zmqError("Failed to bind address");
zmq_close(psocket);
return false; return false;
} }
@ -117,6 +125,23 @@ void CZMQAbstractPublishNotifier::Shutdown()
psocket = 0; psocket = 0;
} }
bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
{
assert(psocket);
/* send three parts, command & data & a LE 4byte sequence number */
unsigned char msgseq[sizeof(uint32_t)];
WriteLE32(&msgseq[0], nSequence);
int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
if (rc == -1)
return false;
/* increment memory only sequence number after sending */
nSequence++;
return true;
}
bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
{ {
uint256 hash = pindex->GetBlockHash(); uint256 hash = pindex->GetBlockHash();
@ -124,8 +149,7 @@ bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashblock", 9, data, 32, 0); return SendMessage(MSG_HASHBLOCK, data, 32);
return rc == 0;
} }
bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -135,8 +159,7 @@ bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &t
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtx", 6, data, 32, 0); return SendMessage(MSG_HASHTX, data, 32);
return rc == 0;
} }
bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction)
@ -146,8 +169,7 @@ bool CZMQPublishHashTransactionLockNotifier::NotifyTransactionLock(const CTransa
char data[32]; char data[32];
for (unsigned int i = 0; i < 32; i++) for (unsigned int i = 0; i < 32; i++)
data[31 - i] = hash.begin()[i]; data[31 - i] = hash.begin()[i];
int rc = zmq_send_multipart(psocket, "hashtxlock", 10, data, 32, 0); return SendMessage(MSG_HASHTXLOCK, data, 32);
return rc == 0;
} }
bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex) bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
@ -168,8 +190,7 @@ bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
ss << block; ss << block;
} }
int rc = zmq_send_multipart(psocket, "rawblock", 8, &(*ss.begin()), ss.size(), 0); return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
return rc == 0;
} }
bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction) bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
@ -178,8 +199,7 @@ bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &tr
LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex()); LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction; ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtx", 5, &(*ss.begin()), ss.size(), 0); return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());
return rc == 0;
} }
bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction) bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransaction &transaction)
@ -188,6 +208,5 @@ bool CZMQPublishRawTransactionLockNotifier::NotifyTransactionLock(const CTransac
LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex()); LogPrint("zmq", "zmq: Publish rawtxlock %s\n", hash.GetHex());
CDataStream ss(SER_NETWORK, PROTOCOL_VERSION); CDataStream ss(SER_NETWORK, PROTOCOL_VERSION);
ss << transaction; ss << transaction;
int rc = zmq_send_multipart(psocket, "rawtxlock", 9, &(*ss.begin()), ss.size(), 0); return SendMessage(MSG_RAWTXLOCK, &(*ss.begin()), ss.size());
return rc == 0;
} }

View File

@ -11,7 +11,19 @@ class CBlockIndex;
class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier class CZMQAbstractPublishNotifier : public CZMQAbstractNotifier
{ {
private:
uint32_t nSequence; // upcounting per message sequence number
public: public:
/* send zmq multipart message
parts:
* command
* data
* message sequence number
*/
bool SendMessage(const char *command, const void* data, size_t size);
bool Initialize(void *pcontext); bool Initialize(void *pcontext);
void Shutdown(); void Shutdown();
}; };