diff --git a/contrib/devtools/copyright_header.py b/contrib/devtools/copyright_header.py index e6efff75fd..2b222e8ac7 100755 --- a/contrib/devtools/copyright_header.py +++ b/contrib/devtools/copyright_header.py @@ -26,7 +26,6 @@ EXCLUDE = [ 'src/crypto/*', 'src/ctpl_stl.h', 'src/reverse_iterator.h', - 'src/statsd_client.cpp', 'src/test/fuzz/FuzzedDataProvider.h', 'src/tinyformat.h', 'src/bench/nanobench.h', diff --git a/doc/release-notes-6267.md b/doc/release-notes-6267.md new file mode 100644 index 0000000000..73881366a3 --- /dev/null +++ b/doc/release-notes-6267.md @@ -0,0 +1,25 @@ +Statistics +----------- + +### New Features + +- The Statsd client now supports queueing and batching messages, reducing the number of packets and the rate at which + they are sent to the Statsd daemon. + +- The maximum size of each batch of messages (default, 1KiB) can be adjusted using `-statsbatchsize` (in bytes) + and the frequency at which queued messages are sent to the daemon (default, 1 second) can be adjusted using + `-statsduration` (in milliseconds) + - `-statsduration` has no bearing on `-statsperiod`, which dictates how frequently some stats are _collected_. + +### Deprecations + +- `-statsenabled` has been deprecated and enablement will now be implied by the presence of `-statshost`. `-statsenabled` + will be removed in a future release. + +- `-statshostname` has been deprecated and replaced with `-statssuffix` as the latter is better representative of the + argument's purpose. They behave identically to each other. `-statshostname` will be removed in a future + release. + +- `-statsns` has been deprecated and replaced with `-statsprefix` as the latter is better representative of the + argument's purpose. `-statsprefix`, unlike `-statsns`, will enforce the usage of a delimiter between the prefix + and key. `-statsns` will be removed in a future release. diff --git a/src/Makefile.am b/src/Makefile.am index 7ca5f7ce75..aa2960b20c 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -315,7 +315,8 @@ BITCOIN_CORE_H = \ spork.h \ stacktraces.h \ streams.h \ - statsd_client.h \ + stats/client.h \ + stats/rawsender.h \ support/allocators/mt_pooled_secure.h \ support/allocators/pool.h \ support/allocators/pooled_secure.h \ @@ -530,7 +531,8 @@ libbitcoin_server_a_SOURCES = \ script/sigcache.cpp \ shutdown.cpp \ spork.cpp \ - statsd_client.cpp \ + stats/client.cpp \ + stats/rawsender.cpp \ timedata.cpp \ torcontrol.cpp \ txdb.cpp \ diff --git a/src/init.cpp b/src/init.cpp index 773f4b54a2..86429fe62b 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -106,7 +106,7 @@ #include #include -#include +#include #include #include @@ -770,12 +770,16 @@ void SetupServerArgs(ArgsManager& argsman) argsman.AddArg("-rpcworkqueue=", strprintf("Set the depth of the work queue to service RPC calls (default: %d)", DEFAULT_HTTP_WORKQUEUE), ArgsManager::ALLOW_ANY | ArgsManager::DEBUG_ONLY, OptionsCategory::RPC); argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); - argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + hidden_args.emplace_back("-statsenabled"); + argsman.AddArg("-statsbatchsize=", strprintf("Specify the size of each batch of stats messages (default: %d)", DEFAULT_STATSD_BATCH_SIZE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + argsman.AddArg("-statsduration=", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshost=", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); - argsman.AddArg("-statshostname=", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + hidden_args.emplace_back("-statshostname"); argsman.AddArg("-statsport=", strprintf("Specify statsd port (default: %u)", DEFAULT_STATSD_PORT), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); - argsman.AddArg("-statsns=", strprintf("Specify additional namespace prefix (default: %s)", DEFAULT_STATSD_NAMESPACE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + hidden_args.emplace_back("-statsns"); argsman.AddArg("-statsperiod=", strprintf("Specify the number of seconds between periodic measurements (default: %d)", DEFAULT_STATSD_PERIOD), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + argsman.AddArg("-statsprefix=", strprintf("Specify an optional string prepended to every stats key (default: %s)", DEFAULT_STATSD_PREFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + argsman.AddArg("-statssuffix=", strprintf("Specify an optional string appended to every stats key (default: %s)", DEFAULT_STATSD_SUFFIX), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); #if HAVE_DECL_FORK argsman.AddArg("-daemon", strprintf("Run in the background as a daemon and accept commands (default: %d)", DEFAULT_DAEMON), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS); argsman.AddArg("-daemonwait", strprintf("Wait for initialization to be finished before exiting. This implies -daemon (default: %d)", DEFAULT_DAEMONWAIT), ArgsManager::ALLOW_BOOL, OptionsCategory::OPTIONS); @@ -835,7 +839,7 @@ static void StartupNotify(const ArgsManager& args) static void PeriodicStats(ArgsManager& args, ChainstateManager& chainman, const CTxMemPool& mempool) { - assert(args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)); + assert(::g_stats_client->active()); CCoinsStats stats{CoinStatsHashType::NONE}; chainman.ActiveChainstate().ForceFlushStateToDisk(); if (WITH_LOCK(cs_main, return GetUTXOStats(&chainman.ActiveChainstate().CoinsDB(), std::ref(chainman.m_blockman), stats, RpcInterruptionPoint, chainman.ActiveChain().Tip()))) { @@ -1538,11 +1542,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // We need to initialize g_stats_client early as currently, g_stats_client is called // regardless of whether transmitting stats are desirable or not and if // g_stats_client isn't present when that attempt is made, the client will crash. - ::g_stats_client = std::make_unique(args.GetArg("-statshost", DEFAULT_STATSD_HOST), - args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), - args.GetArg("-statsport", DEFAULT_STATSD_PORT), - args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), - args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)); + ::g_stats_client = InitStatsClient(args); { @@ -2273,7 +2273,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) #endif // ENABLE_WALLET } - if (args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) { + if (::g_stats_client->active()) { int nStatsPeriod = std::min(std::max((int)args.GetArg("-statsperiod", DEFAULT_STATSD_PERIOD), MIN_STATSD_PERIOD), MAX_STATSD_PERIOD); node.scheduler->scheduleEvery(std::bind(&PeriodicStats, std::ref(*node.args), std::ref(chainman), std::cref(*node.mempool)), std::chrono::seconds{nStatsPeriod}); } diff --git a/src/net.cpp b/src/net.cpp index a49132c8df..6619998749 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -42,7 +42,7 @@ #include #include -#include +#include #ifdef WIN32 #include @@ -2146,7 +2146,7 @@ void CConnman::NotifyNumConnectionsChanged(CMasternodeSync& mn_sync) void CConnman::CalculateNumConnectionsChangedStats() { - if (!gArgs.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)) { + if (!::g_stats_client->active()) { return; } @@ -4602,7 +4602,7 @@ void CConnman::RecordBytesRecv(uint64_t bytes) { nTotalBytesRecv += bytes; ::g_stats_client->count("bandwidth.bytesReceived", bytes, 0.1f); - ::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f); + ::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv.load(), 0.01f); } void CConnman::RecordBytesSent(uint64_t bytes) diff --git a/src/net_processing.cpp b/src/net_processing.cpp index f78889857d..bf7e590975 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -70,7 +70,7 @@ #include #include -#include +#include /** Maximum number of in-flight objects from a peer */ static constexpr int32_t MAX_PEER_OBJECT_IN_FLIGHT = 100; diff --git a/src/stats/client.cpp b/src/stats/client.cpp new file mode 100644 index 0000000000..c09dbc9bdf --- /dev/null +++ b/src/stats/client.cpp @@ -0,0 +1,182 @@ +// Copyright (c) 2014-2017 Statoshi Developers +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2020-2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include + +#include +#include +#include + +namespace { +/** Threshold below which a value is considered effectively zero */ +static constexpr float EPSILON{0.0001f}; + +/** Delimiter segmenting two fully formed Statsd messages */ +static constexpr char STATSD_MSG_DELIMITER{'\n'}; +/** Delimiter segmenting namespaces in a Statsd key */ +static constexpr char STATSD_NS_DELIMITER{'.'}; +/** Character used to denote Statsd message type as count */ +static constexpr char STATSD_METRIC_COUNT[]{"c"}; +/** Character used to denote Statsd message type as gauge */ +static constexpr char STATSD_METRIC_GAUGE[]{"g"}; +/** Characters used to denote Statsd message type as timing */ +static constexpr char STATSD_METRIC_TIMING[]{"ms"}; +} // anonymous namespace + +std::unique_ptr g_stats_client; + +std::unique_ptr InitStatsClient(const ArgsManager& args) +{ + auto is_enabled = args.GetBoolArg("-statsenabled", /*fDefault=*/false); + auto host = args.GetArg("-statshost", /*fDefault=*/""); + + if (is_enabled && host.empty()) { + // Stats are enabled but host has not been specified, then use + // default host. This is to preserve old behavior. + host = DEFAULT_STATSD_HOST; + } else if (!host.empty()) { + // Host is specified but stats are not explcitly enabled. Assume + // that if a host has been specified, we want stats enabled. This + // is new behaviour and will substitute old behaviour in a future + // release. + is_enabled = true; + } + + auto sanitize_string = [](std::string& string) { + // Remove key delimiters from the front and back as they're added back + if (!string.empty()) { + if (string.front() == STATSD_NS_DELIMITER) string.erase(string.begin()); + if (string.back() == STATSD_NS_DELIMITER) string.pop_back(); + } + }; + + // Get our prefix and suffix and if we get nothing, try again with the + // deprecated argument. If we still get nothing, that's fine, they're optional. + auto prefix = args.GetArg("-statsprefix", DEFAULT_STATSD_PREFIX); + if (prefix.empty()) { + prefix = args.GetArg("-statsns", DEFAULT_STATSD_PREFIX); + } else { + // We restrict sanitization logic to our newly added arguments to + // prevent breaking changes. + sanitize_string(prefix); + // We need to add the delimiter here for backwards compatibility with + // the deprecated argument. + // + // TODO: Move this step into the constructor when removing deprecated + // args support + prefix += STATSD_NS_DELIMITER; + } + + auto suffix = args.GetArg("-statssuffix", DEFAULT_STATSD_SUFFIX); + if (suffix.empty()) { + suffix = args.GetArg("-statshostname", DEFAULT_STATSD_SUFFIX); + } else { + // We restrict sanitization logic to our newly added arguments to + // prevent breaking changes. + sanitize_string(suffix); + } + + return std::make_unique( + host, + args.GetArg("-statsport", DEFAULT_STATSD_PORT), + args.GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE), + args.GetArg("-statsduration", DEFAULT_STATSD_DURATION), + prefix, + suffix, + is_enabled + ); +} + +StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms, + const std::string& prefix, const std::string& suffix, bool enabled) : + m_prefix{prefix}, + m_suffix{[suffix]() { return !suffix.empty() ? STATSD_NS_DELIMITER + suffix : suffix; }()} +{ + if (!enabled) { + LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n"); + return; + } + + std::optional error_opt; + m_sender = std::make_unique(host, port, + std::make_pair(batch_size, static_cast(STATSD_MSG_DELIMITER)), + interval_ms, error_opt); + if (error_opt.has_value()) { + LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value()); + m_sender.reset(); + return; + } + + LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", host, port); +} + +StatsdClient::~StatsdClient() {} + +bool StatsdClient::dec(const std::string& key, float sample_rate) { return count(key, -1, sample_rate); } + +bool StatsdClient::inc(const std::string& key, float sample_rate) { return count(key, 1, sample_rate); } + +bool StatsdClient::count(const std::string& key, int64_t delta, float sample_rate) +{ + return send(key, delta, STATSD_METRIC_COUNT, sample_rate); +} + +bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate) +{ + return send(key, value, STATSD_METRIC_GAUGE, sample_rate); +} + +bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate) +{ + return send(key, value, STATSD_METRIC_GAUGE, sample_rate); +} + +bool StatsdClient::timing(const std::string& key, uint64_t ms, float sample_rate) +{ + return send(key, ms, STATSD_METRIC_TIMING, sample_rate); +} + +template +bool StatsdClient::send(const std::string& key, T1 value, const std::string& type, float sample_rate) +{ + static_assert(std::is_arithmetic::value, "Must specialize to an arithmetic type"); + + if (!m_sender) { + return false; + } + + // Determine if we should send the message at all but claim that we did even if we don't + sample_rate = std::clamp(sample_rate, 0.f, 1.f); + bool always_send = std::fabs(sample_rate - 1.f) < EPSILON; + bool never_send = std::fabs(sample_rate) < EPSILON; + if (never_send || (!always_send && + WITH_LOCK(cs, return sample_rate < std::uniform_real_distribution(0.f, 1.f)(insecure_rand)))) { + return true; + } + + // Construct the message and if our message isn't always-send, report the sample rate + RawMessage msg{strprintf("%s%s%s:%f|%s", m_prefix, key, m_suffix, value, type)}; + if (!always_send) { + msg += strprintf("|@%.2f", sample_rate); + } + + // Send it and report an error if we encounter one + if (auto error_opt = m_sender->Send(msg); error_opt.has_value()) { + LogPrintf("ERROR: %s.\n", error_opt.value()); + return false; + } + + return true; +} + +template bool StatsdClient::send(const std::string& key, double value, const std::string& type, float sample_rate); +template bool StatsdClient::send(const std::string& key, int32_t value, const std::string& type, float sample_rate); +template bool StatsdClient::send(const std::string& key, int64_t value, const std::string& type, float sample_rate); +template bool StatsdClient::send(const std::string& key, uint32_t value, const std::string& type, float sample_rate); +template bool StatsdClient::send(const std::string& key, uint64_t value, const std::string& type, float sample_rate); diff --git a/src/stats/client.h b/src/stats/client.h new file mode 100644 index 0000000000..0fa8538260 --- /dev/null +++ b/src/stats/client.h @@ -0,0 +1,83 @@ +// Copyright (c) 2014-2017 Statoshi Developers +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2020-2023 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_STATS_CLIENT_H +#define BITCOIN_STATS_CLIENT_H + +#include +#include + +#include +#include + +class ArgsManager; +class RawSender; + +/** Default port used to connect to a Statsd server */ +static constexpr uint16_t DEFAULT_STATSD_PORT{8125}; +/** Default host assumed to be running a Statsd server */ +static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"}; +/** Default prefix prepended to Statsd message keys */ +static const std::string DEFAULT_STATSD_PREFIX{""}; +/** Default suffix appended to Statsd message keys */ +static const std::string DEFAULT_STATSD_SUFFIX{""}; + +/** Default number of milliseconds between flushing a queue of messages */ +static constexpr int DEFAULT_STATSD_DURATION{1000}; +/** Default number of seconds between recording periodic stats */ +static constexpr int DEFAULT_STATSD_PERIOD{60}; +/** Default size in bytes of a batch of messages */ +static constexpr int DEFAULT_STATSD_BATCH_SIZE{1024}; +/** Minimum number of seconds between recording periodic stats */ +static constexpr int MIN_STATSD_PERIOD{5}; +/** Maximum number of seconds between recording periodic stats */ +static constexpr int MAX_STATSD_PERIOD{60 * 60}; + +class StatsdClient +{ +public: + explicit StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms, + const std::string& prefix, const std::string& suffix, bool enabled); + ~StatsdClient(); + +public: + /* Statsd-defined APIs */ + bool dec(const std::string& key, float sample_rate = 1.f); + bool inc(const std::string& key, float sample_rate = 1.f); + bool count(const std::string& key, int64_t delta, float sample_rate = 1.f); + bool gauge(const std::string& key, int64_t value, float sample_rate = 1.f); + bool gaugeDouble(const std::string& key, double value, float sample_rate = 1.f); + bool timing(const std::string& key, uint64_t ms, float sample_rate = 1.f); + + /* Statsd-compatible APIs */ + template + bool send(const std::string& key, T1 value, const std::string& type, float sample_rate = 1.f); + + /* Check if a StatsdClient instance is ready to send messages */ + bool active() const { return m_sender != nullptr; } + +private: + /* Mutex to protect PRNG */ + mutable Mutex cs; + /* PRNG used to dice-roll messages that are 0 < f < 1 */ + mutable FastRandomContext insecure_rand GUARDED_BY(cs); + + /* Broadcasts messages crafted by StatsdClient */ + std::unique_ptr m_sender{nullptr}; + + /* Phrase prepended to keys */ + const std::string m_prefix{""}; + /* Phrase appended to keys */ + const std::string m_suffix{""}; +}; + +/** Parses arguments and constructs a StatsdClient instance */ +std::unique_ptr InitStatsClient(const ArgsManager& args); + +/** Global smart pointer containing StatsdClient instance */ +extern std::unique_ptr g_stats_client; + +#endif // BITCOIN_STATS_CLIENT_H diff --git a/src/stats/rawsender.cpp b/src/stats/rawsender.cpp new file mode 100644 index 0000000000..eb423e37d5 --- /dev/null +++ b/src/stats/rawsender.cpp @@ -0,0 +1,163 @@ +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#include + +#include +#include +#include +#include + +RawSender::RawSender(const std::string& host, uint16_t port, std::pair batching_opts, + uint64_t interval_ms, std::optional& error) : + m_host{host}, + m_port{port}, + m_batching_opts{batching_opts}, + m_interval_ms{interval_ms} +{ + if (host.empty()) { + error = "No host specified"; + return; + } + + if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) { + if (!netaddr->IsIPv4()) { + error = strprintf("Host %s on unsupported network", m_host); + return; + } + if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast(&m_server.first), &m_server.second)) { + error = strprintf("Cannot get socket address for %s", m_host); + return; + } + } else { + error = strprintf("Unable to lookup host %s", m_host); + return; + } + + SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); + if (hSocket == INVALID_SOCKET) { + error = strprintf("Cannot create socket (socket() returned error %s)", NetworkErrorString(WSAGetLastError())); + return; + } + m_sock = std::make_unique(hSocket); + + if (m_interval_ms == 0) { + LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n"); + } else { + m_interrupt.reset(); + m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); }); + } + + LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port); +} + +RawSender::~RawSender() +{ + // If there is a thread, interrupt and stop it + if (m_thread.joinable()) { + m_interrupt(); + m_thread.join(); + } + // Flush queue of uncommitted messages + QueueFlush(); + + LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n", + m_host, m_port, m_successes, m_failures); +} + +std::optional RawSender::Send(const RawMessage& msg) +{ + // If there is a thread, append to queue + if (m_thread.joinable()) { + QueueAdd(msg); + return std::nullopt; + } + // There isn't a queue, send directly + return SendDirectly(msg); +} + +std::optional RawSender::SendDirectly(const RawMessage& msg) +{ + if (!m_sock) { + m_failures++; + return "Socket not initialized, cannot send message"; + } + + if (::sendto(m_sock->Get(), reinterpret_cast(msg.data()), +#ifdef WIN32 + static_cast(msg.size()), +#else + msg.size(), +#endif // WIN32 + /*flags=*/0, reinterpret_cast(&m_server.first), m_server.second) == SOCKET_ERROR) { + m_failures++; + return strprintf("Unable to send message to %s (sendto() returned error %s)", this->ToStringHostPort(), + NetworkErrorString(WSAGetLastError())); + } + + m_successes++; + return std::nullopt; +} + +std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); } + +void RawSender::QueueAdd(const RawMessage& msg) +{ + AssertLockNotHeld(cs); + LOCK(cs); + + const auto& [batch_size, batch_delim] = m_batching_opts; + // If no batch size has been specified, simply add to queue + if (batch_size == 0) { + m_queue.push_back(msg); + return; + } + + // We can batch, either create a new batch in queue or append to existing batch in queue + if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) { + // Either we don't have a place to batch our message or we exceeded the batch size, make a new batch + m_queue.emplace_back(); + m_queue.back().reserve(batch_size); + } else if (!m_queue.back().empty()) { + // When there is already a batch open we need a delimiter when its not empty + m_queue.back() += batch_delim; + } + + // Add the new message to the batch + m_queue.back() += msg; +} + +void RawSender::QueueFlush() +{ + AssertLockNotHeld(cs); + WITH_LOCK(cs, QueueFlush(m_queue)); +} + +void RawSender::QueueFlush(std::deque& queue) +{ + while (!queue.empty()) { + SendDirectly(queue.front()); + queue.pop_front(); + } +} + +void RawSender::QueueThreadMain() +{ + AssertLockNotHeld(cs); + + while (!m_interrupt) { + // Swap the queues to commit the existing queue of messages + std::deque queue; + WITH_LOCK(cs, m_queue.swap(queue)); + + // Flush the committed queue + QueueFlush(queue); + assert(queue.empty()); + + if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) { + return; + } + } +} diff --git a/src/stats/rawsender.h b/src/stats/rawsender.h new file mode 100644 index 0000000000..d91f6c0def --- /dev/null +++ b/src/stats/rawsender.h @@ -0,0 +1,106 @@ +// Copyright (c) 2017-2023 Vincent Thiery +// Copyright (c) 2024 The Dash Core developers +// Distributed under the MIT software license, see the accompanying +// file COPYING or http://www.opensource.org/licenses/mit-license.php. + +#ifndef BITCOIN_STATS_RAWSENDER_H +#define BITCOIN_STATS_RAWSENDER_H + +#include +#include +#include + +#include +#include +#include +#include +#include + +class Sock; + +struct RawMessage : public std::vector +{ + using parent_type = std::vector; + using parent_type::parent_type; + + explicit RawMessage(const std::string& data) : parent_type{data.begin(), data.end()} {} + + parent_type& operator+=(value_type rhs) { return append(rhs); } + parent_type& operator+=(std::string::value_type rhs) { return append(rhs); } + parent_type& operator+=(const parent_type& rhs) { return append(rhs); } + parent_type& operator+=(const std::string& rhs) { return append(rhs); } + + parent_type& append(value_type rhs) + { + push_back(rhs); + return *this; + } + parent_type& append(std::string::value_type rhs) + { + push_back(static_cast(rhs)); + return *this; + } + parent_type& append(const parent_type& rhs) + { + insert(end(), rhs.begin(), rhs.end()); + return *this; + } + parent_type& append(const std::string& rhs) + { + insert(end(), rhs.begin(), rhs.end()); + return *this; + } +}; + +class RawSender +{ +public: + RawSender(const std::string& host, uint16_t port, std::pair batching_opts, + uint64_t interval_ms, std::optional& error); + ~RawSender(); + + RawSender(const RawSender&) = delete; + RawSender& operator=(const RawSender&) = delete; + RawSender(RawSender&&) = delete; + + std::optional Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); + std::optional SendDirectly(const RawMessage& msg); + + std::string ToStringHostPort() const; + + void QueueAdd(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); + void QueueFlush() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void QueueFlush(std::deque& queue); + void QueueThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); + +private: + /* Socket used to communicate with host */ + std::unique_ptr m_sock{nullptr}; + /* Socket address containing host information */ + std::pair m_server{{}, sizeof(struct sockaddr_storage)}; + + /* Mutex to protect (batches of) messages queue */ + mutable Mutex cs; + /* Interrupt for queue processing thread */ + CThreadInterrupt m_interrupt; + /* Queue of (batches of) messages to be sent */ + std::deque m_queue GUARDED_BY(cs); + /* Thread that processes queue every m_interval_ms */ + std::thread m_thread; + + /* Hostname of server receiving messages */ + const std::string m_host; + /* Port of server receiving messages */ + const uint16_t m_port; + /* Batching parameters */ + const std::pair m_batching_opts{0, 0}; + /* Time between queue thread runs (expressed in milliseconds) */ + const uint64_t m_interval_ms; + + /* Number of messages sent */ + uint64_t m_successes{0}; + /* Number of messages not sent */ + uint64_t m_failures{0}; +}; + +#endif // BITCOIN_STATS_RAWSENDER_H diff --git a/src/statsd_client.cpp b/src/statsd_client.cpp deleted file mode 100644 index d9ef85242b..0000000000 --- a/src/statsd_client.cpp +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright (c) 2014-2017 Statoshi Developers -// Copyright (c) 2020-2024 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -/** -Copyright (c) 2014, Rex -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of the {organization} nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -**/ - -#include - -#include -#include -#include - -#include -#include -#include - -std::unique_ptr g_stats_client; - -namespace statsd { -bool StatsdClient::ShouldSend(float sample_rate) -{ - sample_rate = std::clamp(sample_rate, 0.f, 1.f); - - constexpr float EPSILON{0.0001f}; - /* If sample rate is 1, we should always send */ - if (std::fabs(sample_rate - 1.f) < EPSILON) return true; - /* If sample rate is 0, we should never send */ - if (std::fabs(sample_rate) < EPSILON) return false; - - /* Sample rate is >0 and <1, roll the dice */ - LOCK(cs); - return sample_rate > std::uniform_real_distribution(0.f, 1.f)(insecure_rand); -} - -StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns, - bool enabled) : - m_port{port}, - m_host{host}, - m_nodename{nodename}, - m_ns{ns} -{ - if (!enabled) { - LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n"); - return; - } - - if (auto netaddr = LookupHost(m_host, /*fAllowLookup=*/true); netaddr.has_value()) { - if (!netaddr->IsIPv4()) { - LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host); - return; - } - if (!CService(*netaddr, port).GetSockAddr(reinterpret_cast(&m_server.first), &m_server.second)) { - LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host); - return; - } - } else { - LogPrintf("Unable to lookup host %s, cannot init StatsdClient\n", m_host); - return; - } - - SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); - if (hSocket == INVALID_SOCKET) { - LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n", - NetworkErrorString(WSAGetLastError())); - return; - } - m_sock = std::make_unique(hSocket); - - LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port); -} - -/* will change the original string */ -void StatsdClient::cleanup(std::string& key) -{ - auto pos = key.find_first_of(":|@"); - while (pos != std::string::npos) { - key[pos] = '_'; - pos = key.find_first_of(":|@"); - } -} - -bool StatsdClient::dec(const std::string& key, float sample_rate) { return count(key, -1, sample_rate); } - -bool StatsdClient::inc(const std::string& key, float sample_rate) { return count(key, 1, sample_rate); } - -bool StatsdClient::count(const std::string& key, int64_t value, float sample_rate) -{ - return send(key, value, "c", sample_rate); -} - -bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate) -{ - return send(key, value, "g", sample_rate); -} - -bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate) -{ - return sendDouble(key, value, "g", sample_rate); -} - -bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate) -{ - return send(key, ms, "ms", sample_rate); -} - -bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate) -{ - if (!m_sock) { - return false; - } - - if (!ShouldSend(sample_rate)) { - // Not our turn to send but report that we have - return true; - } - - // partition stats by node name if set - if (!m_nodename.empty()) key = key + "." + m_nodename; - - cleanup(key); - - std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)}; - if (sample_rate < 1.f) { - buf += strprintf("|@%.2f", sample_rate); - } - - return send(buf); -} - -bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate) -{ - if (!m_sock) { - return false; - } - - if (!ShouldSend(sample_rate)) { - // Not our turn to send but report that we have - return true; - } - - // partition stats by node name if set - if (!m_nodename.empty()) key = key + "." + m_nodename; - - cleanup(key); - - std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)}; - if (sample_rate < 1.f) { - buf += strprintf("|@%.2f", sample_rate); - } - - return send(buf); -} - -bool StatsdClient::send(const std::string& message) -{ - assert(m_sock); - - if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0, - reinterpret_cast(&m_server.first), m_server.second) == SOCKET_ERROR) { - LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n", - NetworkErrorString(WSAGetLastError()), m_host, m_port); - return false; - } - - return true; -} -} // namespace statsd diff --git a/src/statsd_client.h b/src/statsd_client.h deleted file mode 100644 index 7568d8cd0e..0000000000 --- a/src/statsd_client.h +++ /dev/null @@ -1,73 +0,0 @@ -// Copyright (c) 2014-2017 Statoshi Developers -// Copyright (c) 2020-2023 The Dash Core developers -// Distributed under the MIT software license, see the accompanying -// file COPYING or http://www.opensource.org/licenses/mit-license.php. - -#ifndef BITCOIN_STATSD_CLIENT_H -#define BITCOIN_STATSD_CLIENT_H - -#include -#include -#include - -#include -#include - -static constexpr bool DEFAULT_STATSD_ENABLE{false}; -static constexpr uint16_t DEFAULT_STATSD_PORT{8125}; -static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"}; -static const std::string DEFAULT_STATSD_HOSTNAME{""}; -static const std::string DEFAULT_STATSD_NAMESPACE{""}; - -// schedule periodic measurements, in seconds: default - 1 minute, min - 5 sec, max - 1h. -static constexpr int DEFAULT_STATSD_PERIOD{60}; -static constexpr int MIN_STATSD_PERIOD{5}; -static constexpr int MAX_STATSD_PERIOD{60 * 60}; - -namespace statsd { -class StatsdClient { - public: - explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, - const std::string& ns, bool enabled); - - public: - bool inc(const std::string& key, float sample_rate = 1.f); - bool dec(const std::string& key, float sample_rate = 1.f); - bool count(const std::string& key, int64_t value, float sample_rate = 1.f); - bool gauge(const std::string& key, int64_t value, float sample_rate = 1.f); - bool gaugeDouble(const std::string& key, double value, float sample_rate = 1.f); - bool timing(const std::string& key, int64_t ms, float sample_rate = 1.f); - - /* (Low Level Api) manually send a message - * type = "c", "g" or "ms" - */ - bool send(std::string key, int64_t value, const std::string& type, float sample_rate); - bool sendDouble(std::string key, double value, const std::string& type, float sample_rate); - - private: - /** - * (Low Level Api) manually send a message - * which might be composed of several lines. - */ - bool send(const std::string& message); - - void cleanup(std::string& key); - bool ShouldSend(float sample_rate); - - private: - mutable Mutex cs; - mutable FastRandomContext insecure_rand GUARDED_BY(cs); - - std::unique_ptr m_sock{nullptr}; - std::pair m_server{{}, sizeof(struct sockaddr_storage)}; - - const uint16_t m_port; - const std::string m_host; - const std::string m_nodename; - const std::string m_ns; -}; -} // namespace statsd - -extern std::unique_ptr g_stats_client; - -#endif // BITCOIN_STATSD_CLIENT_H diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 74770c301f..b8fba9dd06 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -39,7 +39,7 @@ #include #include