stats: move message sending logic to RawSender

`RawSender` is inspired by `UDPSender` from `vthiery/cpp-statsd-client`
and separating it out of `StatsdClient` is needed to implement queueing
and batching support in upcoming commits.

This is the start of migrating our Statsd codebase to `cpp-statsd-client`.
This commit is contained in:
Kittywhiskers Van Gogh 2024-10-04 08:47:36 +00:00
parent 92690685be
commit fc4a736e2a
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
5 changed files with 212 additions and 76 deletions

View File

@ -314,6 +314,7 @@ BITCOIN_CORE_H = \
stacktraces.h \ stacktraces.h \
streams.h \ streams.h \
stats/client.h \ stats/client.h \
stats/rawsender.h \
support/allocators/mt_pooled_secure.h \ support/allocators/mt_pooled_secure.h \
support/allocators/pool.h \ support/allocators/pool.h \
support/allocators/pooled_secure.h \ support/allocators/pooled_secure.h \
@ -527,6 +528,7 @@ libbitcoin_server_a_SOURCES = \
shutdown.cpp \ shutdown.cpp \
spork.cpp \ spork.cpp \
stats/client.cpp \ stats/client.cpp \
stats/rawsender.cpp \
timedata.cpp \ timedata.cpp \
torcontrol.cpp \ torcontrol.cpp \
txdb.cpp \ txdb.cpp \

View File

@ -1,4 +1,5 @@
// Copyright (c) 2014-2017 Statoshi Developers // Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2020-2024 The Dash Core developers // Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -35,8 +36,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <stats/client.h> #include <stats/client.h>
#include <compat.h> #include <stats/rawsender.h>
#include <netbase.h>
#include <util/system.h> #include <util/system.h>
#include <cmath> #include <cmath>
@ -60,10 +60,8 @@ bool StatsdClient::ShouldSend(float sample_rate)
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand); return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
} }
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns, StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
bool enabled) : const std::string& ns, bool enabled) :
m_port{port},
m_host{host},
m_nodename{nodename}, m_nodename{nodename},
m_ns{ns} m_ns{ns}
{ {
@ -72,30 +70,18 @@ StatsdClient::StatsdClient(const std::string& host, const std::string& nodename,
return; return;
} }
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) { std::optional<std::string> error_opt;
if (!netaddr->IsIPv4()) { m_sender = std::make_unique<RawSender>(host, port, error_opt);
LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host); if (error_opt.has_value()) {
return; LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value());
} m_sender.reset();
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host);
return;
}
} else {
LogPrintf("Unable to lookup host %s, cannot init StatsdClient\n", m_host);
return; return;
} }
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port);
if (hSocket == INVALID_SOCKET) {
LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n",
NetworkErrorString(WSAGetLastError()));
return;
} }
m_sock = std::make_unique<Sock>(hSocket);
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port); StatsdClient::~StatsdClient() {}
}
/* will change the original string */ /* will change the original string */
void StatsdClient::cleanup(std::string& key) void StatsdClient::cleanup(std::string& key)
@ -133,7 +119,7 @@ bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate)
bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate) bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate)
{ {
if (!m_sock) { if (!m_sender) {
return false; return false;
} }
@ -147,17 +133,22 @@ bool StatsdClient::send(std::string key, int64_t value, const std::string& type,
cleanup(key); cleanup(key);
std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)}; RawMessage msg{strprintf("%s%s:%d|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) { if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate); msg += strprintf("|@%.2f", sample_rate);
} }
return send(buf); if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
LogPrintf("ERROR: %s.\n", error_opt.value());
return false;
}
return true;
} }
bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate) bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate)
{ {
if (!m_sock) { if (!m_sender) {
return false; return false;
} }
@ -171,22 +162,13 @@ bool StatsdClient::sendDouble(std::string key, double value, const std::string&
cleanup(key); cleanup(key);
std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)}; RawMessage msg{strprintf("%s%s:%f|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) { if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate); msg += strprintf("|@%.2f", sample_rate);
} }
return send(buf); if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
} LogPrintf("ERROR: %s.\n", error_opt.value());
bool StatsdClient::send(const std::string& message)
{
assert(m_sock);
if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0,
reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n",
NetworkErrorString(WSAGetLastError()), m_host, m_port);
return false; return false;
} }

View File

@ -1,4 +1,5 @@
// Copyright (c) 2014-2017 Statoshi Developers // Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2020-2023 The Dash Core developers // Copyright (c) 2020-2023 The Dash Core developers
// Distributed under the MIT software license, see the accompanying // Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php. // file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -7,12 +8,13 @@
#define BITCOIN_STATS_CLIENT_H #define BITCOIN_STATS_CLIENT_H
#include <random.h> #include <random.h>
#include <threadsafety.h> #include <sync.h>
#include <util/sock.h>
#include <memory> #include <memory>
#include <string> #include <string>
class RawSender;
static constexpr bool DEFAULT_STATSD_ENABLE{false}; static constexpr bool DEFAULT_STATSD_ENABLE{false};
static constexpr uint16_t DEFAULT_STATSD_PORT{8125}; static constexpr uint16_t DEFAULT_STATSD_PORT{8125};
static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"}; static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
@ -29,6 +31,7 @@ class StatsdClient
public: public:
explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
const std::string& ns, bool enabled); const std::string& ns, bool enabled);
~StatsdClient();
public: public:
bool inc(const std::string& key, float sample_rate = 1.f); bool inc(const std::string& key, float sample_rate = 1.f);
@ -45,12 +48,6 @@ public:
bool sendDouble(std::string key, double value, const std::string& type, float sample_rate); bool sendDouble(std::string key, double value, const std::string& type, float sample_rate);
private: private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
bool send(const std::string& message);
void cleanup(std::string& key); void cleanup(std::string& key);
bool ShouldSend(float sample_rate); bool ShouldSend(float sample_rate);
@ -58,11 +55,8 @@ private:
mutable Mutex cs; mutable Mutex cs;
mutable FastRandomContext insecure_rand GUARDED_BY(cs); mutable FastRandomContext insecure_rand GUARDED_BY(cs);
std::unique_ptr<Sock> m_sock{nullptr}; std::unique_ptr<RawSender> m_sender{nullptr};
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
const uint16_t m_port;
const std::string m_host;
const std::string m_nodename; const std::string m_nodename;
const std::string m_ns; const std::string m_ns;
}; };

74
src/stats/rawsender.cpp Normal file
View File

@ -0,0 +1,74 @@
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <stats/rawsender.h>
#include <netaddress.h>
#include <netbase.h>
#include <util/sock.h>
RawSender::RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error) :
m_host{host},
m_port{port}
{
if (host.empty()) {
error = "No host specified";
return;
}
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
if (!netaddr->IsIPv4()) {
error = strprintf("Host %s on unsupported network", m_host);
return;
}
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
error = strprintf("Cannot get socket address for %s", m_host);
return;
}
} else {
error = strprintf("Unable to lookup host %s", m_host);
return;
}
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (hSocket == INVALID_SOCKET) {
error = strprintf("Cannot create socket (socket() returned error %s)", NetworkErrorString(WSAGetLastError()));
return;
}
m_sock = std::make_unique<Sock>(hSocket);
LogPrintf("Started RawSender sending messages to %s:%d\n", m_host, m_port);
}
RawSender::~RawSender()
{
LogPrintf("Stopping RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
m_host, m_port, m_successes, m_failures);
}
std::optional<std::string> RawSender::Send(const RawMessage& msg)
{
if (!m_sock) {
m_failures++;
return "Socket not initialized, cannot send message";
}
if (::sendto(m_sock->Get(), reinterpret_cast<const char*>(msg.data()),
#ifdef WIN32
static_cast<int>(msg.size()),
#else
msg.size(),
#endif // WIN32
/*flags=*/0, reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
m_failures++;
return strprintf("Unable to send message to %s (sendto() returned error %s)", this->ToStringHostPort(),
NetworkErrorString(WSAGetLastError()));
}
m_successes++;
return std::nullopt;
}
std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }

84
src/stats/rawsender.h Normal file
View File

@ -0,0 +1,84 @@
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_STATS_RAWSENDER_H
#define BITCOIN_STATS_RAWSENDER_H
#include <compat.h>
#include <sync.h>
#include <memory>
#include <optional>
#include <string>
#include <vector>
class Sock;
struct RawMessage : public std::vector<uint8_t>
{
using parent_type = std::vector<value_type>;
using parent_type::parent_type;
explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {}
parent_type& operator+=(value_type rhs) { return append(rhs); }
parent_type& operator+=(std::string::value_type rhs) { return append(rhs); }
parent_type& operator+=(const parent_type& rhs) { return append(rhs); }
parent_type& operator+=(const std::string& rhs) { return append(rhs); }
parent_type& append(value_type rhs)
{
push_back(rhs);
return *this;
}
parent_type& append(std::string::value_type rhs)
{
push_back(static_cast<value_type>(rhs));
return *this;
}
parent_type& append(const parent_type& rhs)
{
insert(end(), rhs.begin(), rhs.end());
return *this;
}
parent_type& append(const std::string& rhs)
{
insert(end(), rhs.begin(), rhs.end());
return *this;
}
};
class RawSender
{
public:
RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error);
~RawSender();
RawSender(const RawSender&) = delete;
RawSender& operator=(const RawSender&) = delete;
RawSender(RawSender&&) = delete;
std::optional<std::string> Send(const RawMessage& msg);
std::string ToStringHostPort() const;
private:
/* Socket used to communicate with host */
std::unique_ptr<Sock> m_sock{nullptr};
/* Socket address containing host information */
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
/* Hostname of server receiving messages */
const std::string m_host;
/* Port of server receiving messages */
const uint16_t m_port;
/* Number of messages sent */
uint64_t m_successes{0};
/* Number of messages not sent */
uint64_t m_failures{0};
};
#endif // BITCOIN_STATS_RAWSENDER_H