mirror of
https://github.com/dashpay/dash.git
synced 2024-12-27 21:12:48 +01:00
transactions -> objects
+ corresponding changes in comments
This commit is contained in:
parent
a7b38efb98
commit
775e4ba823
@ -57,10 +57,10 @@
|
|||||||
# error "Dash Core cannot be compiled without assertions."
|
# error "Dash Core cannot be compiled without assertions."
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/** Maximum number of in-flight transactions from a peer */
|
/** Maximum number of in-flight objects from a peer */
|
||||||
static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
|
static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100;
|
||||||
/** Maximum number of announced transactions from a peer */
|
/** Maximum number of announced objects from a peer */
|
||||||
static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
static constexpr int32_t MAX_PEER_OBJECT_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
|
||||||
/** How many microseconds to delay requesting transactions from inbound peers */
|
/** How many microseconds to delay requesting transactions from inbound peers */
|
||||||
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
|
static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}};
|
||||||
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
|
||||||
@ -272,69 +272,67 @@ struct CNodeState {
|
|||||||
int64_t m_last_block_announcement;
|
int64_t m_last_block_announcement;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* State associated with transaction download.
|
* State associated with objects download.
|
||||||
*
|
*
|
||||||
* Tx download algorithm:
|
* Tx download algorithm:
|
||||||
*
|
*
|
||||||
* When inv comes in, queue up (process_time, txid) inside the peer's
|
* When inv comes in, queue up (process_time, inv) inside the peer's
|
||||||
* CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
|
* CNodeState (m_object_process_time) as long as m_object_announced for the peer
|
||||||
* isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
|
* isn't too big (MAX_PEER_OBJECT_ANNOUNCEMENTS).
|
||||||
*
|
*
|
||||||
* The process_time for a transaction is set to nNow for outbound peers,
|
* The process_time for a objects is set to nNow for outbound peers,
|
||||||
* nNow + 2 seconds for inbound peers. This is the time at which we'll
|
* nNow + 2 seconds for inbound peers. This is the time at which we'll
|
||||||
* consider trying to request the transaction from the peer in
|
* consider trying to request the objects from the peer in
|
||||||
* SendMessages(). The delay for inbound peers is to allow outbound peers
|
* SendMessages(). The delay for inbound peers is to allow outbound peers
|
||||||
* a chance to announce before we request from inbound peers, to prevent
|
* a chance to announce before we request from inbound peers, to prevent
|
||||||
* an adversary from using inbound connections to blind us to a
|
* an adversary from using inbound connections to blind us to a
|
||||||
* transaction (InvBlock).
|
* objects (InvBlock).
|
||||||
*
|
*
|
||||||
* When we call SendMessages() for a given peer,
|
* When we call SendMessages() for a given peer,
|
||||||
* we will loop over the transactions in m_tx_process_time, looking
|
* we will loop over the objects in m_object_process_time, looking
|
||||||
* at the transactions whose process_time <= nNow. We'll request each
|
* at the objects whose process_time <= nNow. We'll request each
|
||||||
* such transaction that we don't have already and that hasn't been
|
* such objects that we don't have already and that hasn't been
|
||||||
* requested from another peer recently, up until we hit the
|
* requested from another peer recently, up until we hit the
|
||||||
* MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
|
* MAX_PEER_OBJECT_IN_FLIGHT limit for the peer. Then we'll update
|
||||||
* g_already_asked_for for each requested txid, storing the time of the
|
* g_already_asked_for for each requested inv, storing the time of the
|
||||||
* GETDATA request. We use g_already_asked_for to coordinate transaction
|
* GETDATA request. We use g_already_asked_for to coordinate objects
|
||||||
* requests amongst our peers.
|
* requests amongst our peers.
|
||||||
*
|
*
|
||||||
* For transactions that we still need but we have already recently
|
* For objects that we still need but we have already recently
|
||||||
* requested from some other peer, we'll reinsert (process_time, txid)
|
* requested from some other peer, we'll reinsert (process_time, inv)
|
||||||
* back into the peer's m_tx_process_time at the point in the future at
|
* back into the peer's m_object_process_time at the point in the future at
|
||||||
* which the most recent GETDATA request would time out (ie
|
* which the most recent GETDATA request would time out (ie
|
||||||
* GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
|
* GetObjectInterval + the request time stored in g_already_asked_for).
|
||||||
* We add an additional delay for inbound peers, again to prefer
|
* We add an additional delay for inbound peers, again to prefer
|
||||||
* attempting download from outbound peers first.
|
* attempting download from outbound peers first.
|
||||||
* We also add an extra small random delay up to 2 seconds
|
* We also add an extra small random delay up to 2 seconds
|
||||||
* to avoid biasing some peers over others. (e.g., due to fixed ordering
|
* to avoid biasing some peers over others. (e.g., due to fixed ordering
|
||||||
* of peer processing in ThreadMessageHandler).
|
* of peer processing in ThreadMessageHandler).
|
||||||
*
|
*
|
||||||
* When we receive a transaction from a peer, we remove the txid from the
|
* When we receive a objects from a peer, we remove the inv from the
|
||||||
* peer's m_tx_in_flight set and from their recently announced set
|
* peer's m_object_in_flight set and from their recently announced set
|
||||||
* (m_tx_announced). We also clear g_already_asked_for for that entry, so
|
* (m_object_announced). We also clear g_already_asked_for for that entry, so
|
||||||
* that if somehow the transaction is not accepted but also not added to
|
* that if somehow the objects is not accepted but also not added to
|
||||||
* the reject filter, then we will eventually redownload from other
|
* the reject filter, then we will eventually redownload from other
|
||||||
* peers.
|
* peers.
|
||||||
*
|
|
||||||
* DASH: For Dash, this does not only handles TXs but also all Dash specific objects
|
|
||||||
*/
|
*/
|
||||||
struct TxDownloadState {
|
struct ObjectDownloadState {
|
||||||
/* Track when to attempt download of announced transactions (process
|
/* Track when to attempt download of announced objects (process
|
||||||
* time in micros -> txid)
|
* time in micros -> inv)
|
||||||
*/
|
*/
|
||||||
std::multimap<std::chrono::microseconds, CInv> m_tx_process_time;
|
std::multimap<std::chrono::microseconds, CInv> m_object_process_time;
|
||||||
|
|
||||||
//! Store all the transactions a peer has recently announced
|
//! Store all the objects a peer has recently announced
|
||||||
std::set<CInv> m_tx_announced;
|
std::set<CInv> m_object_announced;
|
||||||
|
|
||||||
//! Store transactions which were requested by us, with timestamp
|
//! Store objects which were requested by us, with timestamp
|
||||||
std::map<CInv, std::chrono::microseconds> m_tx_in_flight;
|
std::map<CInv, std::chrono::microseconds> m_object_in_flight;
|
||||||
|
|
||||||
//! Periodically check for stuck getdata requests
|
//! Periodically check for stuck getdata requests
|
||||||
std::chrono::microseconds m_check_expiry_timer{0};
|
std::chrono::microseconds m_check_expiry_timer{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
TxDownloadState m_tx_download;
|
ObjectDownloadState m_object_download;
|
||||||
|
|
||||||
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
|
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
|
||||||
fCurrentlyConnected = false;
|
fCurrentlyConnected = false;
|
||||||
@ -667,8 +665,8 @@ void EraseObjectRequest(CNodeState* nodestate, const CInv& inv) EXCLUSIVE_LOCKS_
|
|||||||
g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime<std::chrono::microseconds>()));
|
g_erased_object_requests.insert(std::make_pair(inv.hash, GetTime<std::chrono::microseconds>()));
|
||||||
|
|
||||||
if (nodestate) {
|
if (nodestate) {
|
||||||
nodestate->m_tx_download.m_tx_announced.erase(inv);
|
nodestate->m_object_download.m_object_announced.erase(inv);
|
||||||
nodestate->m_tx_download.m_tx_in_flight.erase(inv);
|
nodestate->m_object_download.m_object_in_flight.erase(inv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -755,21 +753,21 @@ std::chrono::microseconds CalculateObjectGetDataTime(const CInv& inv, std::chron
|
|||||||
void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void RequestObject(CNodeState* state, const CInv& inv, std::chrono::microseconds current_time, bool fForce = false) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
AssertLockHeld(cs_main);
|
AssertLockHeld(cs_main);
|
||||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
CNodeState::ObjectDownloadState& peer_download_state = state->m_object_download;
|
||||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
if (peer_download_state.m_object_announced.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS ||
|
||||||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
peer_download_state.m_object_process_time.size() >= MAX_PEER_OBJECT_ANNOUNCEMENTS ||
|
||||||
peer_download_state.m_tx_announced.count(inv)) {
|
peer_download_state.m_object_announced.count(inv)) {
|
||||||
// Too many queued announcements from this peer, or we already have
|
// Too many queued announcements from this peer, or we already have
|
||||||
// this announcement
|
// this announcement
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
peer_download_state.m_tx_announced.insert(inv);
|
peer_download_state.m_object_announced.insert(inv);
|
||||||
|
|
||||||
// Calculate the time to try requesting this transaction. Use
|
// Calculate the time to try requesting this transaction. Use
|
||||||
// fPreferredDownload as a proxy for outbound peers.
|
// fPreferredDownload as a proxy for outbound peers.
|
||||||
std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload);
|
std::chrono::microseconds process_time = CalculateObjectGetDataTime(inv, current_time, !state->fPreferredDownload);
|
||||||
|
|
||||||
peer_download_state.m_tx_process_time.emplace(process_time, inv);
|
peer_download_state.m_object_process_time.emplace(process_time, inv);
|
||||||
|
|
||||||
if (fForce) {
|
if (fForce) {
|
||||||
// make sure this object is actually requested ASAP
|
// make sure this object is actually requested ASAP
|
||||||
@ -797,7 +795,7 @@ size_t GetRequestedObjectCount(NodeId nodeId)
|
|||||||
if (!state) {
|
if (!state) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
return state->m_tx_download.m_tx_process_time.size();
|
return state->m_object_download.m_object_process_time.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
// This function is used for testing the stale tip eviction logic, see
|
// This function is used for testing the stale tip eviction logic, see
|
||||||
@ -3430,19 +3428,19 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||||||
CNodeState *state = State(pfrom->GetId());
|
CNodeState *state = State(pfrom->GetId());
|
||||||
std::vector<CInv> vInv;
|
std::vector<CInv> vInv;
|
||||||
vRecv >> vInv;
|
vRecv >> vInv;
|
||||||
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
if (vInv.size() <= MAX_PEER_OBJECT_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||||
for (CInv &inv : vInv) {
|
for (CInv &inv : vInv) {
|
||||||
if (inv.IsKnownType()) {
|
if (inv.IsKnownType()) {
|
||||||
// If we receive a NOTFOUND message for a txid we requested, erase
|
// If we receive a NOTFOUND message for a txid we requested, erase
|
||||||
// it from our data structures for this peer.
|
// it from our data structures for this peer.
|
||||||
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv);
|
auto in_flight_it = state->m_object_download.m_object_in_flight.find(inv);
|
||||||
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
|
if (in_flight_it == state->m_object_download.m_object_in_flight.end()) {
|
||||||
// Skip any further work if this is a spurious NOTFOUND
|
// Skip any further work if this is a spurious NOTFOUND
|
||||||
// message.
|
// message.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
|
state->m_object_download.m_object_in_flight.erase(in_flight_it);
|
||||||
state->m_tx_download.m_tx_announced.erase(inv);
|
state->m_object_download.m_object_announced.erase(inv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4297,40 +4295,40 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
|
|||||||
//
|
//
|
||||||
|
|
||||||
// For robustness, expire old requests after a long timeout, so that
|
// For robustness, expire old requests after a long timeout, so that
|
||||||
// we can resume downloading transactions from a peer even if they
|
// we can resume downloading objects from a peer even if they
|
||||||
// were unresponsive in the past.
|
// were unresponsive in the past.
|
||||||
// Eventually we should consider disconnecting peers, but this is
|
// Eventually we should consider disconnecting peers, but this is
|
||||||
// conservative.
|
// conservative.
|
||||||
if (state.m_tx_download.m_check_expiry_timer <= current_time) {
|
if (state.m_object_download.m_check_expiry_timer <= current_time) {
|
||||||
for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) {
|
for (auto it=state.m_object_download.m_object_in_flight.begin(); it != state.m_object_download.m_object_in_flight.end();) {
|
||||||
if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) {
|
if (it->second <= current_time - GetObjectExpiryInterval(it->first.type)) {
|
||||||
LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "timeout of inflight object %s from peer=%d\n", it->first.ToString(), pto->GetId());
|
||||||
state.m_tx_download.m_tx_announced.erase(it->first);
|
state.m_object_download.m_object_announced.erase(it->first);
|
||||||
state.m_tx_download.m_tx_in_flight.erase(it++);
|
state.m_object_download.m_object_in_flight.erase(it++);
|
||||||
} else {
|
} else {
|
||||||
++it;
|
++it;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// On average, we do this check every TX_EXPIRY_INTERVAL. Randomize
|
// On average, we do this check every GetObjectExpiryInterval. Randomize
|
||||||
// so that we're not doing this for all peers at the same time.
|
// so that we're not doing this for all peers at the same time.
|
||||||
state.m_tx_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX));
|
state.m_object_download.m_check_expiry_timer = current_time + GetObjectExpiryInterval(MSG_TX)/2 + GetRandMicros(GetObjectExpiryInterval(MSG_TX));
|
||||||
}
|
}
|
||||||
|
|
||||||
// DASH this code also handles non-TXs (Dash specific messages)
|
// DASH this code also handles non-TXs (Dash specific messages)
|
||||||
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
auto& object_process_time = state.m_object_download.m_object_process_time;
|
||||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
while (!object_process_time.empty() && object_process_time.begin()->first <= current_time && state.m_object_download.m_object_in_flight.size() < MAX_PEER_OBJECT_IN_FLIGHT) {
|
||||||
const CInv inv = tx_process_time.begin()->second;
|
const CInv inv = object_process_time.begin()->second;
|
||||||
// Erase this entry from tx_process_time (it may be added back for
|
// Erase this entry from object_process_time (it may be added back for
|
||||||
// processing at a later time, see below)
|
// processing at a later time, see below)
|
||||||
tx_process_time.erase(tx_process_time.begin());
|
object_process_time.erase(object_process_time.begin());
|
||||||
if (g_erased_object_requests.count(inv.hash)) {
|
if (g_erased_object_requests.count(inv.hash)) {
|
||||||
LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "%s -- GETDATA skipping inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId());
|
||||||
state.m_tx_download.m_tx_announced.erase(inv);
|
state.m_object_download.m_object_announced.erase(inv);
|
||||||
state.m_tx_download.m_tx_in_flight.erase(inv);
|
state.m_object_download.m_object_in_flight.erase(inv);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (!AlreadyHave(inv)) {
|
if (!AlreadyHave(inv)) {
|
||||||
// If this transaction was last requested more than 1 minute ago,
|
// If this object was last requested more than GetObjectInterval ago,
|
||||||
// then request.
|
// then request.
|
||||||
const auto last_request_time = GetObjectRequestTime(inv.hash);
|
const auto last_request_time = GetObjectRequestTime(inv.hash);
|
||||||
if (last_request_time <= current_time - GetObjectInterval(inv.type)) {
|
if (last_request_time <= current_time - GetObjectInterval(inv.type)) {
|
||||||
@ -4341,20 +4339,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
|
|||||||
vGetData.clear();
|
vGetData.clear();
|
||||||
}
|
}
|
||||||
UpdateObjectRequestTime(inv.hash, current_time);
|
UpdateObjectRequestTime(inv.hash, current_time);
|
||||||
state.m_tx_download.m_tx_in_flight.emplace(inv, current_time);
|
state.m_object_download.m_object_in_flight.emplace(inv, current_time);
|
||||||
} else {
|
} else {
|
||||||
// This transaction is in flight from someone else; queue
|
// This object is in flight from someone else; queue
|
||||||
// up processing to happen after the download times out
|
// up processing to happen after the download times out
|
||||||
// (with a slight delay for inbound peers, to prefer
|
// (with a slight delay for inbound peers, to prefer
|
||||||
// requests to outbound peers).
|
// requests to outbound peers).
|
||||||
const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload);
|
const auto next_process_time = CalculateObjectGetDataTime(inv, current_time, !state.fPreferredDownload);
|
||||||
tx_process_time.emplace(next_process_time, inv);
|
object_process_time.emplace(next_process_time, inv);
|
||||||
LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId());
|
LogPrint(BCLog::NET, "%s -- GETDATA re-queue inv=(%s), next_process_time=%d, delta=%d, peer=%d\n", __func__, inv.ToString(), next_process_time.count(), (next_process_time - current_time).count(), pto->GetId());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We have already seen this transaction, no need to download.
|
// We have already seen this object, no need to download.
|
||||||
state.m_tx_download.m_tx_announced.erase(inv);
|
state.m_object_download.m_object_announced.erase(inv);
|
||||||
state.m_tx_download.m_tx_in_flight.erase(inv);
|
state.m_object_download.m_object_in_flight.erase(inv);
|
||||||
LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "%s -- GETDATA already seen inv=(%s), peer=%d\n", __func__, inv.ToString(), pto->GetId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user