Merge #6267: feat(stats): split off transmission to RawSender, implement batching and queueing support, add streamlined prefix and suffix support

cc1a75ab3a docs: add release notes (Kittywhiskers Van Gogh)
39625f16f0 stats: drop copyright notice from `stats/client.cpp` (Kittywhiskers Van Gogh)
18a2e48eb9 stats: rename `statsns` to clearer `statsprefix` (Kittywhiskers Van Gogh)
42918c2cdc stats: rename `statshostname` to more appropriate `statssuffix` (Kittywhiskers Van Gogh)
f3a4844b0a stats: implicitly treat stats as enabled if `statshost` is specified (Kittywhiskers Van Gogh)
69603a83fa stats: miscellaneous changes and housekeeping (Kittywhiskers Van Gogh)
3e12ac0e09 stats: deduplicate `send` and `sendDouble` logic (Kittywhiskers Van Gogh)
bf44fc3bf6 feat(stats): introduce support for batching messages (Kittywhiskers Van Gogh)
38b1643fe6 feat(stats): introduce support for queuing messages (Kittywhiskers Van Gogh)
fc4a736e2a stats: move message sending logic to `RawSender` (Kittywhiskers Van Gogh)
92690685be stats: move `statsd_client` to `stats` directory (Kittywhiskers Van Gogh)
f782dfd562 stats: remove double indentation in header file (Kittywhiskers Van Gogh)

Pull request description:

  ## Motivation

  This pull request achieves the goal originally set out in [dash#5167](https://github.com/dashpay/dash/pull/5167), to migrate the base of our Statsd client implementation to one that is actively maintained. Statoshi ([source](54c3ffdcf0/src/statsd_client.cpp)) utilizes [talebook/statsd-client-cpp](https://github.com/talebook/statsd-client-cpp), which in turn is inherited by Dash.

  As Statsd is the only cross-platform reporting mechanism available (USDT requires a Linux host with superuser privileges and RPCs don't provide as much flexibility as desired), emphasis is placed on using Statsd as the primary way for the Dash daemon to report metrics related to node and network health.

  As part of maintaining our Statsd client, this PR aims to migrate the base of our implementation to [vthiery/cpp-statsd-client](https://github.com/vthiery/cpp-statsd-client), a streamlined implementation that embraces C++11 with a thread-safe implementation of queueing and batching, which reduces the number of packets used to transmit stats.

  These capabilities are optional and users can opt not to use them by setting `-statsduration=0` to disable queueing (which will also disable batching) or `-statsbatchsize=0`, which will disable batching (but will not disable queueing unless requested explicitly).

  ## Additional Information

  * Dependent on https://github.com/dashpay/dash/pull/5167
  * `RawSender` (and by extension, `RawMessage`) strive to remain as unopinionated as possible, moving the responsibility to construct valid Statsd messages onto `StatsdClient`. This is to ensure that  `RawSender` can be reused down the line or independently extended without impacting the Statsd client.
    * `RawMessage` exists to provide extensions to `std::vector<uint8_t>` that make it easier to abstract away strings while also implementing some of its semantics like `append()` (and its alias, `+=`).
  * `InitStatsClient()` was introduced to keep `StatsdClient` indifferent to _how_ arguments are obtained and sanitized before they're supplied to the constructor. This is to keep it indifferent to all the backwards-compatibility code for deprecated arguments still work.
  * When constructing the Statsd message, we can use `%f` without having to specify a precision as tinyformat automatically assumes a precision of 6 ([source](17110f50b3/src/tinyformat.h (L673))) and problems don't seem to be observed when using `%f` with integers ([source](https://github.com/dashpay/dash/pull/6267#issuecomment-2345592051)).
    * As a guardrail, there is a `static_assert` to ensure that a specialization of `send()` involving a non-arithmetic type will raise alarm when compiling ([source](a0ce720207/src/stats/client.cpp (L145))).

  ## Breaking changes

  * `-statsenabled` (replaced with specifying `-statshost`), `-statshostname` (replaced by `-statssuffix`) and `-statsns` (replaced by `-statsprefix`) have been deprecated and will be removed in a future release.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas
  - [x] I have added or updated relevant unit/integration/functional/e2e tests **(note: N/A)**
  - [x] I have made corresponding changes to the documentation
  - [x] I have assigned this pull request to a milestone _(for repository code-owners and collaborators only)_

ACKs for top commit:
  knst:
    utACK cc1a75ab3a
  UdjinM6:
    utACK cc1a75ab3a

Tree-SHA512: b038419f2b6d807dac40a04d23c5046fbaa95beedb88f5a9e4c06a7042c2f5da7e01c72c4a2744bce10878cafc747136d6599dcd86ae1be0782ad4194d5b7bec
This commit is contained in:
pasta 2024-10-04 13:56:37 -05:00
commit 251f16b82b
No known key found for this signature in database
GPG Key ID: E2F3D7916E722D38
17 changed files with 584 additions and 299 deletions

View File

@ -26,7 +26,6 @@ EXCLUDE = [
'src/crypto/*',
'src/ctpl_stl.h',
'src/reverse_iterator.h',
'src/statsd_client.cpp',
'src/test/fuzz/FuzzedDataProvider.h',
'src/tinyformat.h',
'src/bench/nanobench.h',

25
doc/release-notes-6267.md Normal file
View File

@ -0,0 +1,25 @@
Statistics
-----------
### New Features
- The Statsd client now supports queueing and batching messages, reducing the number of packets and the rate at which
they are sent to the Statsd daemon.
- The maximum size of each batch of messages (default, 1KiB) can be adjusted using `-statsbatchsize` (in bytes)
and the frequency at which queued messages are sent to the daemon (default, 1 second) can be adjusted using
`-statsduration` (in milliseconds)
- `-statsduration` has no bearing on `-statsperiod`, which dictates how frequently some stats are _collected_.
### Deprecations
- `-statsenabled` has been deprecated and enablement will now be implied by the presence of `-statshost`. `-statsenabled`
will be removed in a future release.
- `-statshostname` has been deprecated and replaced with `-statssuffix` as the latter is better representative of the
argument's purpose. They behave identically to each other. `-statshostname` will be removed in a future
release.
- `-statsns` has been deprecated and replaced with `-statsprefix` as the latter is better representative of the
argument's purpose. `-statsprefix`, unlike `-statsns`, will enforce the usage of a delimiter between the prefix
and key. `-statsns` will be removed in a future release.

View File

@ -315,7 +315,8 @@ BITCOIN_CORE_H = \
spork.h \
stacktraces.h \
streams.h \
statsd_client.h \
stats/client.h \
stats/rawsender.h \
support/allocators/mt_pooled_secure.h \
support/allocators/pool.h \
support/allocators/pooled_secure.h \
@ -530,7 +531,8 @@ libbitcoin_server_a_SOURCES = \
script/sigcache.cpp \
shutdown.cpp \
spork.cpp \
statsd_client.cpp \
stats/client.cpp \
stats/rawsender.cpp \
timedata.cpp \
torcontrol.cpp \
txdb.cpp \

View File

@ -106,7 +106,7 @@
#include <llmq/snapshot.h>
#include <llmq/signing_shares.h>
#include <statsd_client.h>
#include <stats/client.h>
#include <algorithm>
#include <condition_variable>
@ -770,12 +770,16 @@ void SetupServerArgs(ArgsManager& argsman)
argsman.AddArg("-rpcworkqueue=<n>", strprintf("Set the depth of the work queue to service RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC);
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statsenabled");
argsman.AddArg("-statsbatchsize=<bytes>", strprintf("Specify the size of each batch of stats messages (default: %d)", DEFAULT_STATSD_BATCH_SIZE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsduration=<ms>", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshost=<ip>", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshostname=<ip>", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statshostname");
argsman.AddArg("-statsport=<port>", strprintf("Specify statsd port (default: %u)", DEFAULT_STATSD_PORT), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsns=<ns>", strprintf("Specify additional namespace prefix (default: %s)", DEFAULT_STATSD_NAMESPACE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
hidden_args.emplace_back("-statsns");
argsman.AddArg("-statsperiod=<seconds>", strprintf("Specify the number of seconds between periodic measurements (default: %d)", DEFAULT_STATSD_PERIOD), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsprefix=<string>", strprintf("Specify an optional string prepended to every stats key (default: %s)", DEFAULT_STATSD_PREFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statssuffix=<string>", strprintf("Specify an optional string appended to every stats key (default: %s)", DEFAULT_STATSD_SUFFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
#if HAVE_DECL_FORK
argsman.AddArg("-daemon", strprintf("Run in the background as a daemon and accept commands (default: %d)", DEFAULT_DAEMON), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS);
argsman.AddArg("-daemonwait", strprintf("Wait for initialization to be finished before exiting. This implies -daemon (default: %d)", DEFAULT_DAEMONWAIT), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS);
@ -835,7 +839,7 @@ static void StartupNotify(const ArgsManager& args)
static void PeriodicStats(ArgsManager& args, ChainstateManager& chainman, const CTxMemPool& mempool)
{
assert(args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));
assert(::g_stats_client->active());
CCoinsStats stats{CoinStatsHashType::NONE};
chainman.ActiveChainstate().ForceFlushStateToDisk();
if (WITH_LOCK(cs_main, return GetUTXOStats(&chainman.ActiveChainstate().CoinsDB(), std::ref(chainman.m_blockman), stats, RpcInterruptionPoint, chainman.ActiveChain().Tip()))) {
@ -1538,11 +1542,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// We need to initialize g_stats_client early as currently, g_stats_client is called
// regardless of whether transmitting stats are desirable or not and if
// g_stats_client isn't present when that attempt is made, the client will crash.
::g_stats_client = std::make_unique<statsd::StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));
::g_stats_client = InitStatsClient(args);
{
@ -2273,7 +2273,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
#endif // ENABLE_WALLET
}
if (args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) {
if (::g_stats_client->active()) {
int nStatsPeriod = std::min(std::max((int)args.GetArg("-statsperiod", DEFAULT_STATSD_PERIOD), MIN_STATSD_PERIOD), MAX_STATSD_PERIOD);
node.scheduler->scheduleEvery(std::bind(&PeriodicStats, std::ref(*node.args), std::ref(chainman), std::cref(*node.mempool)), std::chrono::seconds{nStatsPeriod});
}

View File

@ -42,7 +42,7 @@
#include <coinjoin/coinjoin.h>
#include <evo/deterministicmns.h>
#include <statsd_client.h>
#include <stats/client.h>
#ifdef WIN32
#include <string.h>
@ -2146,7 +2146,7 @@ void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync)
void CConnman::CalculateNumConnectionsChangedStats()
{
if (!gArgs.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) {
if (!::g_stats_client->active()) {
return;
}
@ -4602,7 +4602,7 @@ void CConnman::RecordBytesRecv(uint64_t bytes)
{
nTotalBytesRecv += bytes;
::g_stats_client->count("bandwidth.bytesReceived", bytes, 0.1f);
::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f);
::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv.load(), 0.01f);
}
void CConnman::RecordBytesSent(uint64_t bytes)

View File

@ -70,7 +70,7 @@
#include <llmq/signing_shares.h>
#include <llmq/snapshot.h>
#include <statsd_client.h>
#include <stats/client.h>
/** Maximum number of in-flight objects from a peer */
static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100;

182
src/stats/client.cpp Normal file
View File

@ -0,0 +1,182 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <stats/client.h>
#include <stats/rawsender.h>
#include <util/system.h>
#include <cmath>
#include <cstdio>
#include <random>
namespace {
/** Threshold below which a value is considered effectively zero */
static constexpr float EPSILON{0.0001f};
/** Delimiter segmenting two fully formed Statsd messages */
static constexpr char STATSD_MSG_DELIMITER{'\n'};
/** Delimiter segmenting namespaces in a Statsd key */
static constexpr char STATSD_NS_DELIMITER{'.'};
/** Character used to denote Statsd message type as count */
static constexpr char STATSD_METRIC_COUNT[]{"c"};
/** Character used to denote Statsd message type as gauge */
static constexpr char STATSD_METRIC_GAUGE[]{"g"};
/** Characters used to denote Statsd message type as timing */
static constexpr char STATSD_METRIC_TIMING[]{"ms"};
} // anonymous namespace
std::unique_ptr<StatsdClient> g_stats_client;
std::unique_ptr<StatsdClient> InitStatsClient(const ArgsManager& args)
{
auto is_enabled = args.GetBoolArg("-statsenabled", /*fDefault=*/false);
auto host = args.GetArg("-statshost", /*fDefault=*/"");
if (is_enabled && host.empty()) {
// Stats are enabled but host has not been specified, then use
// default host. This is to preserve old behavior.
host = DEFAULT_STATSD_HOST;
} else if (!host.empty()) {
// Host is specified but stats are not explcitly enabled. Assume
// that if a host has been specified, we want stats enabled. This
// is new behaviour and will substitute old behaviour in a future
// release.
is_enabled = true;
}
auto sanitize_string = [](std::string& string) {
// Remove key delimiters from the front and back as they're added back
if (!string.empty()) {
if (string.front() == STATSD_NS_DELIMITER) string.erase(string.begin());
if (string.back() == STATSD_NS_DELIMITER) string.pop_back();
}
};
// Get our prefix and suffix and if we get nothing, try again with the
// deprecated argument. If we still get nothing, that's fine, they're optional.
auto prefix = args.GetArg("-statsprefix", DEFAULT_STATSD_PREFIX);
if (prefix.empty()) {
prefix = args.GetArg("-statsns", DEFAULT_STATSD_PREFIX);
} else {
// We restrict sanitization logic to our newly added arguments to
// prevent breaking changes.
sanitize_string(prefix);
// We need to add the delimiter here for backwards compatibility with
// the deprecated argument.
//
// TODO: Move this step into the constructor when removing deprecated
// args support
prefix += STATSD_NS_DELIMITER;
}
auto suffix = args.GetArg("-statssuffix", DEFAULT_STATSD_SUFFIX);
if (suffix.empty()) {
suffix = args.GetArg("-statshostname", DEFAULT_STATSD_SUFFIX);
} else {
// We restrict sanitization logic to our newly added arguments to
// prevent breaking changes.
sanitize_string(suffix);
}
return std::make_unique<StatsdClient>(
host,
args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE),
args.GetArg("-statsduration", DEFAULT_STATSD_DURATION),
prefix,
suffix,
is_enabled
);
}
StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms,
const std::string& prefix, const std::string& suffix, bool enabled) :
m_prefix{prefix},
m_suffix{[suffix]() { return !suffix.empty() ? STATSD_NS_DELIMITER + suffix : suffix; }()}
{
if (!enabled) {
LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n");
return;
}
std::optional<std::string> error_opt;
m_sender = std::make_unique<RawSender>(host, port,
std::make_pair(batch_size, static_cast<uint8_t>(STATSD_MSG_DELIMITER)),
interval_ms, error_opt);
if (error_opt.has_value()) {
LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value());
m_sender.reset();
return;
}
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port);
}
StatsdClient::~StatsdClient() {}
bool StatsdClient::dec(const std::string& key, float sample_rate) { return count(key, -1, sample_rate); }
bool StatsdClient::inc(const std::string& key, float sample_rate) { return count(key, 1, sample_rate); }
bool StatsdClient::count(const std::string& key, int64_t delta, float sample_rate)
{
return send(key, delta, STATSD_METRIC_COUNT, sample_rate);
}
bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, STATSD_METRIC_GAUGE, sample_rate);
}
bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate)
{
return send(key, value, STATSD_METRIC_GAUGE, sample_rate);
}
bool StatsdClient::timing(const std::string& key, uint64_t ms, float sample_rate)
{
return send(key, ms, STATSD_METRIC_TIMING, sample_rate);
}
template <typename T1>
bool StatsdClient::send(const std::string& key, T1 value, const std::string& type, float sample_rate)
{
static_assert(std::is_arithmetic<T1>::value, "Must specialize to an arithmetic type");
if (!m_sender) {
return false;
}
// Determine if we should send the message at all but claim that we did even if we don't
sample_rate = std::clamp(sample_rate, 0.f, 1.f);
bool always_send = std::fabs(sample_rate - 1.f) < EPSILON;
bool never_send = std::fabs(sample_rate) < EPSILON;
if (never_send || (!always_send &&
WITH_LOCK(cs, return sample_rate < std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand)))) {
return true;
}
// Construct the message and if our message isn't always-send, report the sample rate
RawMessage msg{strprintf("%s%s%s:%f|%s", m_prefix, key, m_suffix, value, type)};
if (!always_send) {
msg += strprintf("|@%.2f", sample_rate);
}
// Send it and report an error if we encounter one
if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) {
LogPrintf("ERROR: %s.\n", error_opt.value());
return false;
}
return true;
}
template bool StatsdClient::send(const std::string& key, double value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, int32_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, int64_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, uint32_t value, const std::string& type, float sample_rate);
template bool StatsdClient::send(const std::string& key, uint64_t value, const std::string& type, float sample_rate);

83
src/stats/client.h Normal file
View File

@ -0,0 +1,83 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2020-2023 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_STATS_CLIENT_H
#define BITCOIN_STATS_CLIENT_H
#include <random.h>
#include <sync.h>
#include <memory>
#include <string>
class ArgsManager;
class RawSender;
/** Default port used to connect to a Statsd server */
static constexpr uint16_t DEFAULT_STATSD_PORT{8125};
/** Default host assumed to be running a Statsd server */
static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
/** Default prefix prepended to Statsd message keys */
static const std::string DEFAULT_STATSD_PREFIX{""};
/** Default suffix appended to Statsd message keys */
static const std::string DEFAULT_STATSD_SUFFIX{""};
/** Default number of milliseconds between flushing a queue of messages */
static constexpr int DEFAULT_STATSD_DURATION{1000};
/** Default number of seconds between recording periodic stats */
static constexpr int DEFAULT_STATSD_PERIOD{60};
/** Default size in bytes of a batch of messages */
static constexpr int DEFAULT_STATSD_BATCH_SIZE{1024};
/** Minimum number of seconds between recording periodic stats */
static constexpr int MIN_STATSD_PERIOD{5};
/** Maximum number of seconds between recording periodic stats */
static constexpr int MAX_STATSD_PERIOD{60 * 60};
class StatsdClient
{
public:
explicit StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms,
const std::string& prefix, const std::string& suffix, bool enabled);
~StatsdClient();
public:
/* Statsd-defined APIs */
bool dec(const std::string& key, float sample_rate = 1.f);
bool inc(const std::string& key, float sample_rate = 1.f);
bool count(const std::string& key, int64_t delta, float sample_rate = 1.f);
bool gauge(const std::string& key, int64_t value, float sample_rate = 1.f);
bool gaugeDouble(const std::string& key, double value, float sample_rate = 1.f);
bool timing(const std::string& key, uint64_t ms, float sample_rate = 1.f);
/* Statsd-compatible APIs */
template <typename T1>
bool send(const std::string& key, T1 value, const std::string& type, float sample_rate = 1.f);
/* Check if a StatsdClient instance is ready to send messages */
bool active() const { return m_sender != nullptr; }
private:
/* Mutex to protect PRNG */
mutable Mutex cs;
/* PRNG used to dice-roll messages that are 0 < f < 1 */
mutable FastRandomContext insecure_rand GUARDED_BY(cs);
/* Broadcasts messages crafted by StatsdClient */
std::unique_ptr<RawSender> m_sender{nullptr};
/* Phrase prepended to keys */
const std::string m_prefix{""};
/* Phrase appended to keys */
const std::string m_suffix{""};
};
/** Parses arguments and constructs a StatsdClient instance */
std::unique_ptr<StatsdClient> InitStatsClient(const ArgsManager& args);
/** Global smart pointer containing StatsdClient instance */
extern std::unique_ptr<StatsdClient> g_stats_client;
#endif // BITCOIN_STATS_CLIENT_H

163
src/stats/rawsender.cpp Normal file
View File

@ -0,0 +1,163 @@
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <stats/rawsender.h>
#include <netaddress.h>
#include <netbase.h>
#include <util/sock.h>
#include <util/thread.h>
RawSender::RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
uint64_t interval_ms, std::optional<std::string>& error) :
m_host{host},
m_port{port},
m_batching_opts{batching_opts},
m_interval_ms{interval_ms}
{
if (host.empty()) {
error = "No host specified";
return;
}
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
if (!netaddr->IsIPv4()) {
error = strprintf("Host %s on unsupported network", m_host);
return;
}
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
error = strprintf("Cannot get socket address for %s", m_host);
return;
}
} else {
error = strprintf("Unable to lookup host %s", m_host);
return;
}
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (hSocket == INVALID_SOCKET) {
error = strprintf("Cannot create socket (socket() returned error %s)", NetworkErrorString(WSAGetLastError()));
return;
}
m_sock = std::make_unique<Sock>(hSocket);
if (m_interval_ms == 0) {
LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n");
} else {
m_interrupt.reset();
m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); });
}
LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port);
}
RawSender::~RawSender()
{
// If there is a thread, interrupt and stop it
if (m_thread.joinable()) {
m_interrupt();
m_thread.join();
}
// Flush queue of uncommitted messages
QueueFlush();
LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
m_host, m_port, m_successes, m_failures);
}
std::optional<std::string> RawSender::Send(const RawMessage& msg)
{
// If there is a thread, append to queue
if (m_thread.joinable()) {
QueueAdd(msg);
return std::nullopt;
}
// There isn't a queue, send directly
return SendDirectly(msg);
}
std::optional<std::string> RawSender::SendDirectly(const RawMessage& msg)
{
if (!m_sock) {
m_failures++;
return "Socket not initialized, cannot send message";
}
if (::sendto(m_sock->Get(), reinterpret_cast<const char*>(msg.data()),
#ifdef WIN32
static_cast<int>(msg.size()),
#else
msg.size(),
#endif // WIN32
/*flags=*/0, reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
m_failures++;
return strprintf("Unable to send message to %s (sendto() returned error %s)", this->ToStringHostPort(),
NetworkErrorString(WSAGetLastError()));
}
m_successes++;
return std::nullopt;
}
std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }
void RawSender::QueueAdd(const RawMessage& msg)
{
AssertLockNotHeld(cs);
LOCK(cs);
const auto& [batch_size, batch_delim] = m_batching_opts;
// If no batch size has been specified, simply add to queue
if (batch_size == 0) {
m_queue.push_back(msg);
return;
}
// We can batch, either create a new batch in queue or append to existing batch in queue
if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) {
// Either we don't have a place to batch our message or we exceeded the batch size, make a new batch
m_queue.emplace_back();
m_queue.back().reserve(batch_size);
} else if (!m_queue.back().empty()) {
// When there is already a batch open we need a delimiter when its not empty
m_queue.back() += batch_delim;
}
// Add the new message to the batch
m_queue.back() += msg;
}
void RawSender::QueueFlush()
{
AssertLockNotHeld(cs);
WITH_LOCK(cs, QueueFlush(m_queue));
}
void RawSender::QueueFlush(std::deque<RawMessage>& queue)
{
while (!queue.empty()) {
SendDirectly(queue.front());
queue.pop_front();
}
}
void RawSender::QueueThreadMain()
{
AssertLockNotHeld(cs);
while (!m_interrupt) {
// Swap the queues to commit the existing queue of messages
std::deque<RawMessage> queue;
WITH_LOCK(cs, m_queue.swap(queue));
// Flush the committed queue
QueueFlush(queue);
assert(queue.empty());
if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) {
return;
}
}
}

106
src/stats/rawsender.h Normal file
View File

@ -0,0 +1,106 @@
// Copyright (c) 2017-2023 Vincent Thiery
// Copyright (c) 2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_STATS_RAWSENDER_H
#define BITCOIN_STATS_RAWSENDER_H
#include <compat.h>
#include <sync.h>
#include <threadinterrupt.h>
#include <deque>
#include <memory>
#include <optional>
#include <string>
#include <vector>
class Sock;
struct RawMessage : public std::vector<uint8_t>
{
using parent_type = std::vector<value_type>;
using parent_type::parent_type;
explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {}
parent_type& operator+=(value_type rhs) { return append(rhs); }
parent_type& operator+=(std::string::value_type rhs) { return append(rhs); }
parent_type& operator+=(const parent_type& rhs) { return append(rhs); }
parent_type& operator+=(const std::string& rhs) { return append(rhs); }
parent_type& append(value_type rhs)
{
push_back(rhs);
return *this;
}
parent_type& append(std::string::value_type rhs)
{
push_back(static_cast<value_type>(rhs));
return *this;
}
parent_type& append(const parent_type& rhs)
{
insert(end(), rhs.begin(), rhs.end());
return *this;
}
parent_type& append(const std::string& rhs)
{
insert(end(), rhs.begin(), rhs.end());
return *this;
}
};
class RawSender
{
public:
RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
uint64_t interval_ms, std::optional<std::string>& error);
~RawSender();
RawSender(const RawSender&) = delete;
RawSender& operator=(const RawSender&) = delete;
RawSender(RawSender&&) = delete;
std::optional<std::string> Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs);
std::optional<std::string> SendDirectly(const RawMessage& msg);
std::string ToStringHostPort() const;
void QueueAdd(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs);
void QueueFlush() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void QueueFlush(std::deque<RawMessage>& queue);
void QueueThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs);
private:
/* Socket used to communicate with host */
std::unique_ptr<Sock> m_sock{nullptr};
/* Socket address containing host information */
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
/* Mutex to protect (batches of) messages queue */
mutable Mutex cs;
/* Interrupt for queue processing thread */
CThreadInterrupt m_interrupt;
/* Queue of (batches of) messages to be sent */
std::deque<RawMessage> m_queue GUARDED_BY(cs);
/* Thread that processes queue every m_interval_ms */
std::thread m_thread;
/* Hostname of server receiving messages */
const std::string m_host;
/* Port of server receiving messages */
const uint16_t m_port;
/* Batching parameters */
const std::pair</*size=*/uint64_t, /*delimiter=*/uint8_t> m_batching_opts{0, 0};
/* Time between queue thread runs (expressed in milliseconds) */
const uint64_t m_interval_ms;
/* Number of messages sent */
uint64_t m_successes{0};
/* Number of messages not sent */
uint64_t m_failures{0};
};
#endif // BITCOIN_STATS_RAWSENDER_H

View File

@ -1,196 +0,0 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
/**
Copyright (c) 2014, Rex
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of the {organization} nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
**/
#include <statsd_client.h>
#include <compat.h>
#include <netbase.h>
#include <util/system.h>
#include <cmath>
#include <cstdio>
#include <random>
std::unique_ptr<statsd::StatsdClient> g_stats_client;
namespace statsd {
bool StatsdClient::ShouldSend(float sample_rate)
{
sample_rate = std::clamp(sample_rate, 0.f, 1.f);
constexpr float EPSILON{0.0001f};
/* If sample rate is 1, we should always send */
if (std::fabs(sample_rate - 1.f) < EPSILON) return true;
/* If sample rate is 0, we should never send */
if (std::fabs(sample_rate) < EPSILON) return false;
/* Sample rate is >0 and <1, roll the dice */
LOCK(cs);
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
}
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns,
bool enabled) :
m_port{port},
m_host{host},
m_nodename{nodename},
m_ns{ns}
{
if (!enabled) {
LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n");
return;
}
if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) {
if (!netaddr->IsIPv4()) {
LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host);
return;
}
if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host);
return;
}
} else {
LogPrintf("Unable to lookup host %s, cannot init StatsdClient\n", m_host);
return;
}
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (hSocket == INVALID_SOCKET) {
LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n",
NetworkErrorString(WSAGetLastError()));
return;
}
m_sock = std::make_unique<Sock>(hSocket);
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port);
}
/* will change the original string */
void StatsdClient::cleanup(std::string& key)
{
auto pos = key.find_first_of(":|@");
while (pos != std::string::npos) {
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
bool StatsdClient::dec(const std::string& key, float sample_rate) { return count(key, -1, sample_rate); }
bool StatsdClient::inc(const std::string& key, float sample_rate) { return count(key, 1, sample_rate); }
bool StatsdClient::count(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, "c", sample_rate);
}
bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, "g", sample_rate);
}
bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate)
{
return sendDouble(key, value, "g", sample_rate);
}
bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate)
{
return send(key, ms, "ms", sample_rate);
}
bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate)
{
if (!m_sock) {
return false;
}
if (!ShouldSend(sample_rate)) {
// Not our turn to send but report that we have
return true;
}
// partition stats by node name if set
if (!m_nodename.empty()) key = key + "." + m_nodename;
cleanup(key);
std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate);
}
return send(buf);
}
bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate)
{
if (!m_sock) {
return false;
}
if (!ShouldSend(sample_rate)) {
// Not our turn to send but report that we have
return true;
}
// partition stats by node name if set
if (!m_nodename.empty()) key = key + "." + m_nodename;
cleanup(key);
std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate);
}
return send(buf);
}
bool StatsdClient::send(const std::string& message)
{
assert(m_sock);
if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0,
reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n",
NetworkErrorString(WSAGetLastError()), m_host, m_port);
return false;
}
return true;
}
} // namespace statsd

View File

@ -1,73 +0,0 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2020-2023 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#ifndef BITCOIN_STATSD_CLIENT_H
#define BITCOIN_STATSD_CLIENT_H
#include <random.h>
#include <threadsafety.h>
#include <util/sock.h>
#include <string>
#include <memory>
static constexpr bool DEFAULT_STATSD_ENABLE{false};
static constexpr uint16_t DEFAULT_STATSD_PORT{8125};
static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
static const std::string DEFAULT_STATSD_HOSTNAME{""};
static const std::string DEFAULT_STATSD_NAMESPACE{""};
// schedule periodic measurements, in seconds: default - 1 minute, min - 5 sec, max - 1h.
static constexpr int DEFAULT_STATSD_PERIOD{60};
static constexpr int MIN_STATSD_PERIOD{5};
static constexpr int MAX_STATSD_PERIOD{60 * 60};
namespace statsd {
class StatsdClient {
public:
explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
const std::string& ns, bool enabled);
public:
bool inc(const std::string& key, float sample_rate = 1.f);
bool dec(const std::string& key, float sample_rate = 1.f);
bool count(const std::string& key, int64_t value, float sample_rate = 1.f);
bool gauge(const std::string& key, int64_t value, float sample_rate = 1.f);
bool gaugeDouble(const std::string& key, double value, float sample_rate = 1.f);
bool timing(const std::string& key, int64_t ms, float sample_rate = 1.f);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
bool send(std::string key, int64_t value, const std::string& type, float sample_rate);
bool sendDouble(std::string key, double value, const std::string& type, float sample_rate);
private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
bool send(const std::string& message);
void cleanup(std::string& key);
bool ShouldSend(float sample_rate);
private:
mutable Mutex cs;
mutable FastRandomContext insecure_rand GUARDED_BY(cs);
std::unique_ptr<Sock> m_sock{nullptr};
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
const uint16_t m_port;
const std::string m_host;
const std::string m_nodename;
const std::string m_ns;
};
} // namespace statsd
extern std::unique_ptr<statsd::StatsdClient> g_stats_client;
#endif // BITCOIN_STATSD_CLIENT_H

View File

@ -39,7 +39,7 @@
#include <scheduler.h>
#include <script/sigcache.h>
#include <spork.h>
#include <statsd_client.h>
#include <stats/client.h>
#include <streams.h>
#include <test/util/index.h>
#include <txdb.h>
@ -183,13 +183,7 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve
SetupNetworking();
InitSignatureCache();
InitScriptExecutionCache();
::g_stats_client = std::make_unique<statsd::StatsdClient>(
m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST),
m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT),
m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
m_node.args->GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)
);
::g_stats_client = InitStatsClient(*m_node.args);
m_node.chain = interfaces::MakeChain(m_node);
m_node.netgroupman = std::make_unique<NetGroupManager>(/*asmap=*/std::vector<bool>());

View File

@ -7,7 +7,7 @@
#include <consensus/validation.h>
#include <logging.h>
#include <policy/policy.h>
#include <statsd_client.h>
#include <stats/client.h>
#include <cassert>

View File

@ -66,7 +66,7 @@
#include <llmq/instantsend.h>
#include <llmq/chainlocks.h>
#include <statsd_client.h>
#include <stats/client.h>
#include <algorithm>
#include <cassert>

View File

@ -45,7 +45,6 @@ KNOWN_VIOLATIONS=(
"src/bitcoin-tx.cpp.*stoul"
"src/dbwrapper.cpp:.*vsnprintf"
"src/rest.cpp:.*strtol"
"src/statsd_client.cpp:.*snprintf"
"src/test/dbwrapper_tests.cpp:.*snprintf"
"src/test/fuzz/locale.cpp"
"src/test/fuzz/string.cpp"

View File

@ -29,7 +29,8 @@ src/rpc/quorums.cpp
src/saltedhasher.*
src/spork.*
src/stacktraces.*
src/statsd_client.*
src/stats/*.cpp
src/stats/*.h
src/test/block_reward_reallocation_tests.cpp
src/test/bls_tests.cpp
src/test/dip0020opcodes_tests.cpp