From fc4a736e2a0e8b9b528613db65aed07595bc6be8 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 4 Oct 2024 08:47:36 +0000 Subject: [PATCH] stats: move message sending logic to `RawSender` `RawSender` is inspired by `UDPSender` from `vthiery/cpp-statsd-client` and separating it out of `StatsdClient` is needed to implement queueing and batching support in upcoming commits. This is the start of migrating our Statsd codebase to `cpp-statsd-client`. --- src/Makefile.am | 2 + src/stats/client.cpp | 110 +++++++++++++++++----------------------- src/stats/client.h | 18 +++---- src/stats/rawsender.cpp | 74 +++++++++++++++++++++++++++ src/stats/rawsender.h | 84 ++++++++++++++++++++++++++++++ 5 files changed, 212 insertions(+), 76 deletions(-) create mode 100644 src/stats/rawsender.cpp create mode 100644 src/stats/rawsender.h diff --git a/src/Makefile.am b/src/Makefile.am index e35efcc506..1ef8711187 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -314,6 +314,7 @@ BITCOIN_CORE_H = \ stacktraces.h \ streams.h \ stats/client.h \ + stats/rawsender.h \ support/allocators/mt_pooled_secure.h \ support/allocators/pool.h \ support/allocators/pooled_secure.h \ @@ -527,6 +528,7 @@ libbitcoin_server_a_SOURCES = \ shutdown.cpp \ spork.cpp \ stats/client.cpp \ + stats/rawsender.cpp \ timedata.cpp \ torcontrol.cpp \ txdb.cpp \ diff --git a/src/stats/client.cpp b/src/stats/client.cpp index a2683e6156..f89b283eab 100644 --- a/src/stats/client.cpp +++ b/src/stats/client.cpp @@ -1,4 +1,5 @@ // Copyright (c) 2014-2017 Statoshi Developers +// Copyright (c) 2017-2023 Vincent Thiery // Copyright (c) 2020-2024 The Dash Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -35,8 +36,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include -#include -#include +#include #include #include @@ -60,10 +60,8 @@ bool StatsdClient::ShouldSend(float sample_rate) return sample_rate > std::uniform_real_distribution(0.f, 1.f)(insecure_rand); } -StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns, - bool enabled) : - m_port{port}, - m_host{host}, +StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, + const std::string& ns, bool enabled) : m_nodename{nodename}, m_ns{ns} { @@ -72,31 +70,19 @@ StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, return; } - if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) { - if (!netaddr->IsIPv4()) { - LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host); - return; - } - if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast(&m_server.first), &m_server.second)) { - LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host); - return; - } - } else { - LogPrintf("Unable to lookup host %s, cannot init StatsdClient\n", m_host); + std::optional error_opt; + m_sender = std::make_unique(host, port, error_opt); + if (error_opt.has_value()) { + LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value()); + m_sender.reset(); return; } - SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (hSocket == INVALID_SOCKET) { - LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n", - NetworkErrorString(WSAGetLastError())); - return; - } - m_sock = std::make_unique(hSocket); - - LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port); + LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port); } +StatsdClient::~StatsdClient() {} + /* will change the original string */ void StatsdClient::cleanup(std::string& key) { @@ -133,7 +119,7 @@ bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate) bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate) { - if (!m_sock) { + if (!m_sender) { return false; } @@ -147,46 +133,42 @@ bool StatsdClient::send(std::string key, int64_t value, const std::string& type, cleanup(key); - std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)}; + RawMessage msg{strprintf("%s%s:%d|%s", m_ns, key, value, type)}; if (sample_rate < 1.f) { - buf += strprintf("|@%.2f", sample_rate); + msg += strprintf("|@%.2f", sample_rate); } - return send(buf); -} - -bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate) -{ - if (!m_sock) { - return false; - } - - if (!ShouldSend(sample_rate)) { - // Not our turn to send but report that we have - return true; - } - - // partition stats by node name if set - if (!m_nodename.empty()) key = key + "." + m_nodename; - - cleanup(key); - - std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)}; - if (sample_rate < 1.f) { - buf += strprintf("|@%.2f", sample_rate); - } - - return send(buf); -} - -bool StatsdClient::send(const std::string& message) -{ - assert(m_sock); - - if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0, - reinterpret_cast(&m_server.first), m_server.second) == SOCKET_ERROR) { - LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n", - NetworkErrorString(WSAGetLastError()), m_host, m_port); + if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) { + LogPrintf("ERROR: %s.\n", error_opt.value()); + return false; + } + + return true; +} + +bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate) +{ + if (!m_sender) { + return false; + } + + if (!ShouldSend(sample_rate)) { + // Not our turn to send but report that we have + return true; + } + + // partition stats by node name if set + if (!m_nodename.empty()) key = key + "." + m_nodename; + + cleanup(key); + + RawMessage msg{strprintf("%s%s:%f|%s", m_ns, key, value, type)}; + if (sample_rate < 1.f) { + msg += strprintf("|@%.2f", sample_rate); + } + + if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) { + LogPrintf("ERROR: %s.\n", error_opt.value()); return false; } diff --git a/src/stats/client.h b/src/stats/client.h index eb8a483998..32780de7fc 100644 --- a/src/stats/client.h +++ b/src/stats/client.h @@ -1,4 +1,5 @@ // Copyright (c) 2014-2017 Statoshi Developers +// Copyright (c) 2017-2023 Vincent Thiery // Copyright (c) 2020-2023 The Dash Core developers // Distributed under the MIT software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. @@ -7,12 +8,13 @@ #define BITCOIN_STATS_CLIENT_H #include -#include -#include +#include #include #include +class RawSender; + static constexpr bool DEFAULT_STATSD_ENABLE{false}; static constexpr uint16_t DEFAULT_STATSD_PORT{8125}; static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"}; @@ -29,6 +31,7 @@ class StatsdClient public: explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns, bool enabled); + ~StatsdClient(); public: bool inc(const std::string& key, float sample_rate = 1.f); @@ -45,12 +48,6 @@ public: bool sendDouble(std::string key, double value, const std::string& type, float sample_rate); private: - /** - * (Low Level Api) manually send a message - * which might be composed of several lines. - */ - bool send(const std::string& message); - void cleanup(std::string& key); bool ShouldSend(float sample_rate); @@ -58,11 +55,8 @@ private: mutable Mutex cs; mutable FastRandomContext insecure_rand GUARDED_BY(cs); - std::unique_ptr m_sock{nullptr}; - std::pair m_server{{}, sizeof(struct sockaddr_storage)}; + std::unique_ptr m_sender{nullptr}; - const uint16_t m_port; - const std::string m_host; const std::string m_nodename; const std::string m_ns; }; diff --git a/src/stats/rawsender.cpp b/src/stats/rawsender.cpp new file mode 100644 index 0000000000..98b42464b1 --- /dev/null +++ b/src/stats/rawsender.cpp @@ -0,0 +1,74 @@ +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include + +RawSender::RawSender(const std::string& host, uint16_t port, std::optional& error) : + m_host{host}, + m_port{port} +{ + if (host.empty()) { + error = "No host specified"; + return; + } + + if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) { + if (!netaddr->IsIPv4()) { + error = strprintf("Host %s on unsupported network", m_host); + return; + } + if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast(&m_server.first), &m_server.second)) { + error = strprintf("Cannot get socket address for %s", m_host); + return; + } + } else { + error = strprintf("Unable to lookup host %s", m_host); + return; + } + + SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (hSocket == INVALID_SOCKET) { + error = strprintf("Cannot create socket (socket() returned error %s)", NetworkErrorString(WSAGetLastError())); + return; + } + m_sock = std::make_unique(hSocket); + + LogPrintf("Started RawSender sending messages to %s:%d\n", m_host, m_port); +} + +RawSender::~RawSender() +{ + LogPrintf("Stopping RawSender instance sending messages to %s:%d. %d successes, %d failures.\n", + m_host, m_port, m_successes, m_failures); +} + +std::optional RawSender::Send(const RawMessage& msg) +{ + if (!m_sock) { + m_failures++; + return "Socket not initialized, cannot send message"; + } + + if (::sendto(m_sock->Get(), reinterpret_cast(msg.data()), +#ifdef WIN32 + static_cast(msg.size()), +#else + msg.size(), +#endif // WIN32 + /*flags=*/0, reinterpret_cast(&m_server.first), m_server.second) == SOCKET_ERROR) { + m_failures++; + return strprintf("Unable to send message to %s (sendto() returned error %s)", this->ToStringHostPort(), + NetworkErrorString(WSAGetLastError())); + } + + m_successes++; + return std::nullopt; +} + +std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); } diff --git a/src/stats/rawsender.h b/src/stats/rawsender.h new file mode 100644 index 0000000000..dd12f14635 --- /dev/null +++ b/src/stats/rawsender.h @@ -0,0 +1,84 @@ +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_STATS_RAWSENDER_H +#define BITCOIN_STATS_RAWSENDER_H + +#include +#include + +#include +#include +#include +#include + +class Sock; + +struct RawMessage : public std::vector +{ + using parent_type = std::vector; + using parent_type::parent_type; + + explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {} + + parent_type& operator+=(value_type rhs) { return append(rhs); } + parent_type& operator+=(std::string::value_type rhs) { return append(rhs); } + parent_type& operator+=(const parent_type& rhs) { return append(rhs); } + parent_type& operator+=(const std::string& rhs) { return append(rhs); } + + parent_type& append(value_type rhs) + { + push_back(rhs); + return *this; + } + parent_type& append(std::string::value_type rhs) + { + push_back(static_cast(rhs)); + return *this; + } + parent_type& append(const parent_type& rhs) + { + insert(end(), rhs.begin(), rhs.end()); + return *this; + } + parent_type& append(const std::string& rhs) + { + insert(end(), rhs.begin(), rhs.end()); + return *this; + } +}; + +class RawSender +{ +public: + RawSender(const std::string& host, uint16_t port, std::optional& error); + ~RawSender(); + + RawSender(const RawSender&) = delete; + RawSender& operator=(const RawSender&) = delete; + RawSender(RawSender&&) = delete; + + std::optional Send(const RawMessage& msg); + + std::string ToStringHostPort() const; + +private: + /* Socket used to communicate with host */ + std::unique_ptr m_sock{nullptr}; + /* Socket address containing host information */ + std::pair m_server{{}, sizeof(struct sockaddr_storage)}; + + /* Hostname of server receiving messages */ + const std::string m_host; + /* Port of server receiving messages */ + const uint16_t m_port; + + /* Number of messages sent */ + uint64_t m_successes{0}; + /* Number of messages not sent */ + uint64_t m_failures{0}; +}; + +#endif // BITCOIN_STATS_RAWSENDER_H