diff --git a/src/init.cpp b/src/init.cpp index 7d2bcb57b1..7a493cc195 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -176,6 +176,8 @@ void Interrupt(boost::thread_group& threadGroup) InterruptRPC(); InterruptREST(); InterruptTorControl(); + if (g_connman) + g_connman->Interrupt(); threadGroup.interrupt_all(); } @@ -1572,7 +1574,7 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) connOptions.nMaxOutboundTimeframe = nMaxOutboundTimeframe; connOptions.nMaxOutboundLimit = nMaxOutboundLimit; - if(!connman.Start(threadGroup, scheduler, strNodeError, connOptions)) + if (!connman.Start(scheduler, strNodeError, connOptions)) return InitError(strNodeError); // ********************************************************* Step 12: finished diff --git a/src/net.cpp b/src/net.cpp index 9c58577f10..a66679cd85 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1042,7 +1042,7 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { void CConnman::ThreadSocketHandler() { unsigned int nPrevNodeCount = 0; - while (true) + while (!interruptNet) { // // Disconnect nodes @@ -1180,7 +1180,8 @@ void CConnman::ThreadSocketHandler() int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - boost::this_thread::interruption_point(); + if (interruptNet) + return; if (nSelect == SOCKET_ERROR) { @@ -1193,7 +1194,8 @@ void CConnman::ThreadSocketHandler() } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - MilliSleep(timeout.tv_usec/1000); + if (!interruptNet.sleep_for(std::chrono::milliseconds(timeout.tv_usec/1000))) + return; } // @@ -1219,7 +1221,8 @@ void CConnman::ThreadSocketHandler() } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - boost::this_thread::interruption_point(); + if (interruptNet) + return; // // Receive @@ -1241,7 +1244,7 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); if(notify) - messageHandlerCondition.notify_one(); + condMsgProc.notify_one(); pnode->nLastRecv = GetTime(); pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); @@ -1469,7 +1472,8 @@ void CConnman::ThreadDNSAddressSeed() // less influence on the network topology, and reduces traffic to the seeds. if ((addrman.size() > 0) && (!GetBoolArg("-forcednsseed", DEFAULT_FORCEDNSSEED))) { - MilliSleep(11 * 1000); + if (!interruptNet.sleep_for(std::chrono::seconds(11))) + return; LOCK(cs_vNodes); int nRelevant = 0; @@ -1580,10 +1584,12 @@ void CConnman::ThreadOpenConnections() OpenNetworkConnection(addr, false, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } @@ -1592,14 +1598,16 @@ void CConnman::ThreadOpenConnections() // Minimum time before next feeler connection (in microseconds). int64_t nNextFeeler = PoissonNextSend(nStart*1000*1000, FEELER_INTERVAL); - while (true) + while (!interruptNet) { ProcessOneShot(); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; CSemaphoreGrant grant(*semOutbound); - boost::this_thread::interruption_point(); + if (interruptNet) + return; // Add seed nodes if DNS seeds are all down (an infrastructure attack?). if (addrman.size() == 0 && (GetTime() - nStart > 60)) { @@ -1657,7 +1665,7 @@ void CConnman::ThreadOpenConnections() int64_t nANow = GetAdjustedTime(); int nTries = 0; - while (true) + while (!interruptNet) { CAddrInfo addr = addrman.Select(fFeeler); @@ -1700,7 +1708,8 @@ void CConnman::ThreadOpenConnections() if (fFeeler) { // Add small amount of random noise before connection to avoid synchronization. int randsleep = GetRandInt(FEELER_SLEEP_WINDOW * 1000); - MilliSleep(randsleep); + if (!interruptNet.sleep_for(std::chrono::milliseconds(randsleep))) + return; LogPrint("net", "Making feeler connection to %s\n", addrConnect.ToString()); } @@ -1779,11 +1788,12 @@ void CConnman::ThreadOpenAddedConnections() // OpenNetworkConnection can detect existing connections to that IP/port. CService service(LookupNumeric(info.strAddedNode.c_str(), Params().GetDefaultPort())); OpenNetworkConnection(CAddress(service, NODE_NONE), false, &grant, info.strAddedNode.c_str(), false); - MilliSleep(500); + if (!interruptNet.sleep_for(std::chrono::milliseconds(500))) + return; } } - - MilliSleep(120000); // Retry every 2 minutes + if (!interruptNet.sleep_for(std::chrono::minutes(2))) + return; } } @@ -1793,7 +1803,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai // // Initiate outbound network connection // - boost::this_thread::interruption_point(); + if (interruptNet) { + return false; + } if (!fNetworkActive) { return false; } @@ -1819,13 +1831,9 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai return true; } - void CConnman::ThreadMessageHandler() { - boost::mutex condition_mutex; - boost::unique_lock lock(condition_mutex); - - while (true) + while (!flagInterruptMsgProc) { std::vector vNodesCopy; { @@ -1860,7 +1868,8 @@ void CConnman::ThreadMessageHandler() } } } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; // Send messages { @@ -1868,7 +1877,8 @@ void CConnman::ThreadMessageHandler() if (lockSend) GetNodeSignals().SendMessages(pnode, *this); } - boost::this_thread::interruption_point(); + if (flagInterruptMsgProc) + return; } { @@ -1877,8 +1887,10 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) - messageHandlerCondition.timed_wait(lock, boost::posix_time::microsec_clock::universal_time() + boost::posix_time::milliseconds(100)); + if (fSleep) { + std::unique_lock lock(mutexMsgProc); + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + } } } @@ -2070,6 +2082,7 @@ CConnman::CConnman(uint64_t nSeed0In, uint64_t nSeed1In) : nSeed0(nSeed0In), nSe nMaxOutbound = 0; nBestHeight = 0; clientInterface = NULL; + flagInterruptMsgProc = false; } NodeId CConnman::GetNewNodeId() @@ -2077,7 +2090,7 @@ NodeId CConnman::GetNewNodeId() return nLastNodeId.fetch_add(1, std::memory_order_relaxed); } -bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options connOptions) +bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options connOptions) { nTotalBytesRecv = 0; nTotalBytesSent = 0; @@ -2144,24 +2157,26 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st // // Start threads // + interruptNet.reset(); + flagInterruptMsgProc = false; // Send and receive from sockets, accept connections - threadGroup.create_thread(boost::bind(&TraceThread >, "net", boost::function(boost::bind(&CConnman::ThreadSocketHandler, this)))); + threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); if (!GetBoolArg("-dnsseed", true)) LogPrintf("DNS seeding disabled\n"); else - threadGroup.create_thread(boost::bind(&TraceThread >, "dnsseed", boost::function(boost::bind(&CConnman::ThreadDNSAddressSeed, this)))); + threadDNSAddressSeed = std::thread(&TraceThread >, "dnsseed", std::function(std::bind(&CConnman::ThreadDNSAddressSeed, this))); // Initiate outbound connections from -addnode - threadGroup.create_thread(boost::bind(&TraceThread >, "addcon", boost::function(boost::bind(&CConnman::ThreadOpenAddedConnections, this)))); + threadOpenAddedConnections = std::thread(&TraceThread >, "addcon", std::function(std::bind(&CConnman::ThreadOpenAddedConnections, this))); // Initiate outbound connections unless connect=0 if (!mapMultiArgs.count("-connect") || mapMultiArgs.at("-connect").size() != 1 || mapMultiArgs.at("-connect")[0] != "0") - threadGroup.create_thread(boost::bind(&TraceThread >, "opencon", boost::function(boost::bind(&CConnman::ThreadOpenConnections, this)))); + threadOpenConnections = std::thread(&TraceThread >, "opencon", std::function(std::bind(&CConnman::ThreadOpenConnections, this))); // Process messages - threadGroup.create_thread(boost::bind(&TraceThread >, "msghand", boost::function(boost::bind(&CConnman::ThreadMessageHandler, this)))); + threadMessageHandler = std::thread(&TraceThread >, "msghand", std::function(std::bind(&CConnman::ThreadMessageHandler, this))); // Dump network addresses scheduler.scheduleEvery(boost::bind(&CConnman::DumpData, this), DUMP_ADDRESSES_INTERVAL); @@ -2184,12 +2199,33 @@ public: } instance_of_cnetcleanup; -void CConnman::Stop() +void CConnman::Interrupt() { - LogPrintf("%s\n",__func__); + { + std::lock_guard lock(mutexMsgProc); + flagInterruptMsgProc = true; + } + condMsgProc.notify_all(); + + interruptNet(); + if (semOutbound) for (int i=0; i<(nMaxOutbound + nMaxFeeler); i++) semOutbound->post(); +} + +void CConnman::Stop() +{ + if (threadMessageHandler.joinable()) + threadMessageHandler.join(); + if (threadOpenConnections.joinable()) + threadOpenConnections.join(); + if (threadOpenAddedConnections.joinable()) + threadOpenAddedConnections.join(); + if (threadDNSAddressSeed.joinable()) + threadDNSAddressSeed.join(); + if (threadSocketHandler.joinable()) + threadSocketHandler.join(); if (fAddressesInitialized) { @@ -2232,6 +2268,7 @@ void CConnman::DeleteNode(CNode* pnode) CConnman::~CConnman() { + Interrupt(); Stop(); } diff --git a/src/net.h b/src/net.h index a7c0ecf324..b26f283265 100644 --- a/src/net.h +++ b/src/net.h @@ -19,11 +19,14 @@ #include "streams.h" #include "sync.h" #include "uint256.h" +#include "threadinterrupt.h" #include #include #include +#include #include +#include #ifndef WIN32 #include @@ -142,8 +145,9 @@ public: }; CConnman(uint64_t seed0, uint64_t seed1); ~CConnman(); - bool Start(boost::thread_group& threadGroup, CScheduler& scheduler, std::string& strNodeError, Options options); + bool Start(CScheduler& scheduler, std::string& strNodeError, Options options); void Stop(); + void Interrupt(); bool BindListenPort(const CService &bindAddr, std::string& strError, bool fWhitelisted = false); bool GetNetworkActive() const { return fNetworkActive; }; void SetNetworkActive(bool active); @@ -402,7 +406,6 @@ private: std::list vNodesDisconnected; mutable CCriticalSection cs_vNodes; std::atomic nLastNodeId; - boost::condition_variable messageHandlerCondition; /** Services this instance offers */ ServiceFlags nLocalServices; @@ -419,6 +422,18 @@ private: /** SipHasher seeds for deterministic randomness */ const uint64_t nSeed0, nSeed1; + + std::condition_variable condMsgProc; + std::mutex mutexMsgProc; + std::atomic flagInterruptMsgProc; + + CThreadInterrupt interruptNet; + + std::thread threadDNSAddressSeed; + std::thread threadSocketHandler; + std::thread threadOpenAddedConnections; + std::thread threadOpenConnections; + std::thread threadMessageHandler; }; extern std::unique_ptr g_connman; void Discover(boost::thread_group& threadGroup);