mirror of
https://github.com/dashpay/dash.git
synced 2024-12-28 13:32:47 +01:00
Generalize TX request code
This commit is contained in:
parent
8c11a8e698
commit
29d3b75f28
@ -315,18 +315,20 @@ struct CNodeState {
|
|||||||
* that if somehow the transaction is not accepted but also not added to
|
* that if somehow the transaction is not accepted but also not added to
|
||||||
* the reject filter, then we will eventually redownload from other
|
* the reject filter, then we will eventually redownload from other
|
||||||
* peers.
|
* peers.
|
||||||
|
*
|
||||||
|
* DASH: For Dash, this does not only handles TXs but also all Dash specific objects
|
||||||
*/
|
*/
|
||||||
struct TxDownloadState {
|
struct TxDownloadState {
|
||||||
/* Track when to attempt download of announced transactions (process
|
/* Track when to attempt download of announced transactions (process
|
||||||
* time in micros -> txid)
|
* time in micros -> txid)
|
||||||
*/
|
*/
|
||||||
std::multimap<int64_t, uint256> m_tx_process_time;
|
std::multimap<int64_t, CInv> m_tx_process_time;
|
||||||
|
|
||||||
//! Store all the transactions a peer has recently announced
|
//! Store all the transactions a peer has recently announced
|
||||||
std::set<uint256> m_tx_announced;
|
std::set<CInv> m_tx_announced;
|
||||||
|
|
||||||
//! Store transactions which were requested by us, with timestamp
|
//! Store transactions which were requested by us, with timestamp
|
||||||
std::map<uint256, int64_t> m_tx_in_flight;
|
std::map<CInv, int64_t> m_tx_in_flight;
|
||||||
|
|
||||||
//! Periodically check for stuck getdata requests
|
//! Periodically check for stuck getdata requests
|
||||||
int64_t m_check_expiry_timer{0};
|
int64_t m_check_expiry_timer{0};
|
||||||
@ -654,35 +656,40 @@ void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vector<con
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} // namespace
|
||||||
|
|
||||||
void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void EraseObjectRequest(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
g_already_asked_for.erase(txid);
|
AssertLockHeld(cs_main);
|
||||||
|
g_already_asked_for.erase(hash);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
int64_t GetObjectRequestTime(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
auto it = g_already_asked_for.find(txid);
|
AssertLockHeld(cs_main);
|
||||||
|
auto it = g_already_asked_for.find(hash);
|
||||||
if (it != g_already_asked_for.end()) {
|
if (it != g_already_asked_for.end()) {
|
||||||
return it->second;
|
return it->second;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void UpdateObjectRequestTime(const uint256& hash, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
auto it = g_already_asked_for.find(txid);
|
AssertLockHeld(cs_main);
|
||||||
|
auto it = g_already_asked_for.find(hash);
|
||||||
if (it == g_already_asked_for.end()) {
|
if (it == g_already_asked_for.end()) {
|
||||||
g_already_asked_for.insert(std::make_pair(txid, request_time));
|
g_already_asked_for.insert(std::make_pair(hash, request_time));
|
||||||
} else {
|
} else {
|
||||||
g_already_asked_for.update(it, request_time);
|
g_already_asked_for.update(it, request_time);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
int64_t CalculateObjectGetDataTime(const uint256& hash, int64_t current_time, bool use_inbound_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
|
AssertLockHeld(cs_main);
|
||||||
int64_t process_time;
|
int64_t process_time;
|
||||||
int64_t last_request_time = GetTxRequestTime(txid);
|
int64_t last_request_time = GetObjectRequestTime(hash);
|
||||||
// First time requesting this tx
|
// First time requesting this tx
|
||||||
if (last_request_time == 0) {
|
if (last_request_time == 0) {
|
||||||
process_time = current_time;
|
process_time = current_time;
|
||||||
@ -698,26 +705,35 @@ int64_t CalculateTxGetDataTime(const uint256& txid, int64_t current_time, bool u
|
|||||||
return process_time;
|
return process_time;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
void RequestObject(CNodeState* state, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
{
|
{
|
||||||
|
AssertLockHeld(cs_main);
|
||||||
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
|
||||||
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||||
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS ||
|
||||||
peer_download_state.m_tx_announced.count(txid)) {
|
peer_download_state.m_tx_announced.count(inv)) {
|
||||||
// Too many queued announcements from this peer, or we already have
|
// Too many queued announcements from this peer, or we already have
|
||||||
// this announcement
|
// this announcement
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
peer_download_state.m_tx_announced.insert(txid);
|
peer_download_state.m_tx_announced.insert(inv);
|
||||||
|
|
||||||
// Calculate the time to try requesting this transaction. Use
|
// Calculate the time to try requesting this transaction. Use
|
||||||
// fPreferredDownload as a proxy for outbound peers.
|
// fPreferredDownload as a proxy for outbound peers.
|
||||||
int64_t process_time = CalculateTxGetDataTime(txid, nNow, !state->fPreferredDownload);
|
int64_t process_time = CalculateObjectGetDataTime(inv.hash, nNow, !state->fPreferredDownload);
|
||||||
|
|
||||||
peer_download_state.m_tx_process_time.emplace(process_time, txid);
|
peer_download_state.m_tx_process_time.emplace(process_time, inv);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
|
||||||
|
{
|
||||||
|
AssertLockHeld(cs_main);
|
||||||
|
auto* state = State(nodeId);
|
||||||
|
if (!state) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
RequestObject(state, inv, nNow);
|
||||||
|
}
|
||||||
|
|
||||||
// This function is used for testing the stale tip eviction logic, see
|
// This function is used for testing the stale tip eviction logic, see
|
||||||
// DoS_tests.cpp
|
// DoS_tests.cpp
|
||||||
@ -2411,7 +2427,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||||||
doubleRequestDelay = 10 * 1000000;
|
doubleRequestDelay = 10 * 1000000;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
RequestTx(State(pfrom->GetId()), inv.hash, nNow);
|
RequestObject(State(pfrom->GetId()), inv, nNow);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2697,9 +2713,9 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||||||
CValidationState state;
|
CValidationState state;
|
||||||
|
|
||||||
CNodeState* nodestate = State(pfrom->GetId());
|
CNodeState* nodestate = State(pfrom->GetId());
|
||||||
nodestate->m_tx_download.m_tx_announced.erase(inv.hash);
|
nodestate->m_tx_download.m_tx_announced.erase(inv);
|
||||||
nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash);
|
nodestate->m_tx_download.m_tx_in_flight.erase(inv);
|
||||||
EraseTxRequest(inv.hash);
|
EraseObjectRequest(inv.hash);
|
||||||
|
|
||||||
if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */,
|
if (!AlreadyHave(inv) && AcceptToMemoryPool(mempool, state, ptx, &fMissingInputs /* pfMissingInputs */,
|
||||||
false /* bypass_limits */, 0 /* nAbsurdFee */)) {
|
false /* bypass_limits */, 0 /* nAbsurdFee */)) {
|
||||||
@ -2747,11 +2763,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||||||
for (const CTxIn& txin : tx.vin) {
|
for (const CTxIn& txin : tx.vin) {
|
||||||
CInv _inv(MSG_TX, txin.prevout.hash);
|
CInv _inv(MSG_TX, txin.prevout.hash);
|
||||||
pfrom->AddInventoryKnown(_inv);
|
pfrom->AddInventoryKnown(_inv);
|
||||||
if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
|
if (!AlreadyHave(_inv)) RequestObject(State(pfrom->GetId()), _inv, nNow);
|
||||||
// We don't know if the previous tx was a regular or a mixing one, try both
|
// We don't know if the previous tx was a regular or a mixing one, try both
|
||||||
CInv _inv2(MSG_DSTX, txin.prevout.hash);
|
CInv _inv2(MSG_DSTX, txin.prevout.hash);
|
||||||
pfrom->AddInventoryKnown(_inv2);
|
pfrom->AddInventoryKnown(_inv2);
|
||||||
if (!AlreadyHave(_inv2)) RequestTx(State(pfrom->GetId()), _inv2.hash, nNow);
|
if (!AlreadyHave(_inv2)) RequestObject(State(pfrom->GetId()), _inv2, nNow);
|
||||||
}
|
}
|
||||||
AddOrphanTx(ptx, pfrom->GetId());
|
AddOrphanTx(ptx, pfrom->GetId());
|
||||||
|
|
||||||
@ -3367,17 +3383,17 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
|
|||||||
vRecv >> vInv;
|
vRecv >> vInv;
|
||||||
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) {
|
||||||
for (CInv &inv : vInv) {
|
for (CInv &inv : vInv) {
|
||||||
if (inv.type == MSG_TX) {
|
if (inv.IsKnownType()) {
|
||||||
// If we receive a NOTFOUND message for a txid we requested, erase
|
// If we receive a NOTFOUND message for a txid we requested, erase
|
||||||
// it from our data structures for this peer.
|
// it from our data structures for this peer.
|
||||||
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash);
|
auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv);
|
||||||
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
|
if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) {
|
||||||
// Skip any further work if this is a spurious NOTFOUND
|
// Skip any further work if this is a spurious NOTFOUND
|
||||||
// message.
|
// message.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
|
state->m_tx_download.m_tx_in_flight.erase(in_flight_it);
|
||||||
state->m_tx_download.m_tx_announced.erase(inv.hash);
|
state->m_tx_download.m_tx_announced.erase(inv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4248,17 +4264,17 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
|
|||||||
state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
|
state.m_tx_download.m_check_expiry_timer = nNow + TX_EXPIRY_INTERVAL/2 + GetRand(TX_EXPIRY_INTERVAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DASH this code also handles non-TXs (Dash specific messages)
|
||||||
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
auto& tx_process_time = state.m_tx_download.m_tx_process_time;
|
||||||
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
|
||||||
const uint256 txid = tx_process_time.begin()->second;
|
const CInv inv = tx_process_time.begin()->second;
|
||||||
// Erase this entry from tx_process_time (it may be added back for
|
// Erase this entry from tx_process_time (it may be added back for
|
||||||
// processing at a later time, see below)
|
// processing at a later time, see below)
|
||||||
tx_process_time.erase(tx_process_time.begin());
|
tx_process_time.erase(tx_process_time.begin());
|
||||||
CInv inv(MSG_TX, txid);
|
|
||||||
if (!AlreadyHave(inv)) {
|
if (!AlreadyHave(inv)) {
|
||||||
// If this transaction was last requested more than 1 minute ago,
|
// If this transaction was last requested more than 1 minute ago,
|
||||||
// then request.
|
// then request.
|
||||||
int64_t last_request_time = GetTxRequestTime(inv.hash);
|
int64_t last_request_time = GetObjectRequestTime(inv.hash);
|
||||||
if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
|
if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
|
||||||
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
|
LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
|
||||||
vGetData.push_back(inv);
|
vGetData.push_back(inv);
|
||||||
@ -4266,20 +4282,20 @@ bool PeerLogicValidation::SendMessages(CNode* pto, std::atomic<bool>& interruptM
|
|||||||
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
|
||||||
vGetData.clear();
|
vGetData.clear();
|
||||||
}
|
}
|
||||||
UpdateTxRequestTime(inv.hash, nNow);
|
UpdateObjectRequestTime(inv.hash, nNow);
|
||||||
state.m_tx_download.m_tx_in_flight.emplace(inv.hash, nNow);
|
state.m_tx_download.m_tx_in_flight.emplace(inv, nNow);
|
||||||
} else {
|
} else {
|
||||||
// This transaction is in flight from someone else; queue
|
// This transaction is in flight from someone else; queue
|
||||||
// up processing to happen after the download times out
|
// up processing to happen after the download times out
|
||||||
// (with a slight delay for inbound peers, to prefer
|
// (with a slight delay for inbound peers, to prefer
|
||||||
// requests to outbound peers).
|
// requests to outbound peers).
|
||||||
int64_t next_process_time = CalculateTxGetDataTime(txid, nNow, !state.fPreferredDownload);
|
int64_t next_process_time = CalculateObjectGetDataTime(txid, nNow, !state.fPreferredDownload);
|
||||||
tx_process_time.emplace(next_process_time, txid);
|
tx_process_time.emplace(next_process_time, txid);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// We have already seen this transaction, no need to download.
|
// We have already seen this transaction, no need to download.
|
||||||
state.m_tx_download.m_tx_announced.erase(inv.hash);
|
state.m_tx_download.m_tx_announced.erase(inv);
|
||||||
state.m_tx_download.m_tx_in_flight.erase(inv.hash);
|
state.m_tx_download.m_tx_in_flight.erase(inv);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -84,4 +84,7 @@ bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats);
|
|||||||
void Misbehaving(NodeId nodeid, int howmuch, const std::string& message="");
|
void Misbehaving(NodeId nodeid, int howmuch, const std::string& message="");
|
||||||
bool IsBanned(NodeId nodeid);
|
bool IsBanned(NodeId nodeid);
|
||||||
|
|
||||||
|
void EraseObjectRequest(const uint256& hash);
|
||||||
|
void RequestObject(NodeId nodeId, const CInv& inv, int64_t nNow);
|
||||||
|
|
||||||
#endif // BITCOIN_NET_PROCESSING_H
|
#endif // BITCOIN_NET_PROCESSING_H
|
||||||
|
Loading…
Reference in New Issue
Block a user