From 38b1643fe6de4b84465dd8331db4c73267feddcc Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 12 Sep 2024 15:28:37 +0000 Subject: [PATCH] feat(stats): introduce support for queuing messages --- src/init.cpp | 4 +- src/stats/client.cpp | 4 +- src/stats/client.h | 10 +++-- src/stats/rawsender.cpp | 76 ++++++++++++++++++++++++++++++++-- src/stats/rawsender.h | 23 +++++++++- src/test/util/setup_common.cpp | 3 +- 6 files changed, 107 insertions(+), 13 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 1afff5b6de..c847b4bfb6 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -772,6 +772,7 @@ void SetupServerArgs(ArgsManager& argsman) argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + argsman.AddArg("-statsduration=", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshost=", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshostname=", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statsport=", strprintf("Specify statsd port (default: %u)", DEFAULT_STATSD_PORT), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); @@ -1540,8 +1541,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // regardless of whether transmitting stats are desirable or not and if // g_stats_client isn't present when that attempt is made, the client will crash. ::g_stats_client = std::make_unique(args.GetArg("-statshost", DEFAULT_STATSD_HOST), - args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), args.GetArg("-statsport", DEFAULT_STATSD_PORT), + args.GetArg("-statsduration", DEFAULT_STATSD_DURATION), + args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)); diff --git a/src/stats/client.cpp b/src/stats/client.cpp index f89b283eab..ba9fa60f7c 100644 --- a/src/stats/client.cpp +++ b/src/stats/client.cpp @@ -60,7 +60,7 @@ bool StatsdClient::ShouldSend(float sample_rate) return sample_rate > std::uniform_real_distribution(0.f, 1.f)(insecure_rand); } -StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, +StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, const std::string& ns, bool enabled) : m_nodename{nodename}, m_ns{ns} @@ -71,7 +71,7 @@ StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, } std::optional error_opt; - m_sender = std::make_unique(host, port, error_opt); + m_sender = std::make_unique(host, port, interval_ms, error_opt); if (error_opt.has_value()) { LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value()); m_sender.reset(); diff --git a/src/stats/client.h b/src/stats/client.h index 32780de7fc..0cad8bc600 100644 --- a/src/stats/client.h +++ b/src/stats/client.h @@ -21,16 +21,20 @@ static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"}; static const std::string DEFAULT_STATSD_HOSTNAME{""}; static const std::string DEFAULT_STATSD_NAMESPACE{""}; -// schedule periodic measurements, in seconds: default - 1 minute, min - 5 sec, max - 1h. +/** Default number of milliseconds between flushing a queue of messages */ +static constexpr int DEFAULT_STATSD_DURATION{1000}; +/** Default number of seconds between recording periodic stats */ static constexpr int DEFAULT_STATSD_PERIOD{60}; +/** Minimum number of seconds between recording periodic stats */ static constexpr int MIN_STATSD_PERIOD{5}; +/** Maximum number of seconds between recording periodic stats */ static constexpr int MAX_STATSD_PERIOD{60 * 60}; class StatsdClient { public: - explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, - const std::string& ns, bool enabled); + explicit StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, + const std::string& ns, bool enabled); ~StatsdClient(); public: diff --git a/src/stats/rawsender.cpp b/src/stats/rawsender.cpp index 98b42464b1..0c2f10ffa6 100644 --- a/src/stats/rawsender.cpp +++ b/src/stats/rawsender.cpp @@ -8,10 +8,13 @@ #include #include #include +#include -RawSender::RawSender(const std::string& host, uint16_t port, std::optional& error) : +RawSender::RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, + std::optional& error) : m_host{host}, - m_port{port} + m_port{port}, + m_interval_ms{interval_ms} { if (host.empty()) { error = "No host specified"; @@ -39,16 +42,42 @@ RawSender::RawSender(const std::string& host, uint16_t port, std::optional(hSocket); - LogPrintf("Started RawSender sending messages to %s:%d\n", m_host, m_port); + if (m_interval_ms == 0) { + LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n"); + } else { + m_interrupt.reset(); + m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); }); + } + + LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port); } RawSender::~RawSender() { - LogPrintf("Stopping RawSender instance sending messages to %s:%d. %d successes, %d failures.\n", + // If there is a thread, interrupt and stop it + if (m_thread.joinable()) { + m_interrupt(); + m_thread.join(); + } + // Flush queue of uncommitted messages + QueueFlush(); + + LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n", m_host, m_port, m_successes, m_failures); } std::optional RawSender::Send(const RawMessage& msg) +{ + // If there is a thread, append to queue + if (m_thread.joinable()) { + QueueAdd(msg); + return std::nullopt; + } + // There isn't a queue, send directly + return SendDirectly(msg); +} + +std::optional RawSender::SendDirectly(const RawMessage& msg) { if (!m_sock) { m_failures++; @@ -72,3 +101,42 @@ std::optional RawSender::Send(const RawMessage& msg) } std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); } + +void RawSender::QueueAdd(const RawMessage& msg) +{ + AssertLockNotHeld(cs); + WITH_LOCK(cs, m_queue.push_back(msg)); +} + +void RawSender::QueueFlush() +{ + AssertLockNotHeld(cs); + WITH_LOCK(cs, QueueFlush(m_queue)); +} + +void RawSender::QueueFlush(std::deque& queue) +{ + while (!queue.empty()) { + SendDirectly(queue.front()); + queue.pop_front(); + } +} + +void RawSender::QueueThreadMain() +{ + AssertLockNotHeld(cs); + + while (!m_interrupt) { + // Swap the queues to commit the existing queue of messages + std::deque queue; + WITH_LOCK(cs, m_queue.swap(queue)); + + // Flush the committed queue + QueueFlush(queue); + assert(queue.empty()); + + if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) { + return; + } + } +} diff --git a/src/stats/rawsender.h b/src/stats/rawsender.h index dd12f14635..24dadcde94 100644 --- a/src/stats/rawsender.h +++ b/src/stats/rawsender.h @@ -8,7 +8,9 @@ #include #include +#include +#include #include #include #include @@ -53,27 +55,44 @@ struct RawMessage : public std::vector class RawSender { public: - RawSender(const std::string& host, uint16_t port, std::optional& error); + RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, std::optional& error); ~RawSender(); RawSender(const RawSender&) = delete; RawSender& operator=(const RawSender&) = delete; RawSender(RawSender&&) = delete; - std::optional Send(const RawMessage& msg); + std::optional Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); + std::optional SendDirectly(const RawMessage& msg); std::string ToStringHostPort() const; + void QueueAdd(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs); + void QueueFlush() EXCLUSIVE_LOCKS_REQUIRED(!cs); + void QueueFlush(std::deque& queue); + void QueueThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs); + private: /* Socket used to communicate with host */ std::unique_ptr m_sock{nullptr}; /* Socket address containing host information */ std::pair m_server{{}, sizeof(struct sockaddr_storage)}; + /* Mutex to protect messages queue */ + mutable Mutex cs; + /* Interrupt for queue processing thread */ + CThreadInterrupt m_interrupt; + /* Queue of messages to be sent */ + std::deque m_queue GUARDED_BY(cs); + /* Thread that processes queue every m_interval_ms */ + std::thread m_thread; + /* Hostname of server receiving messages */ const std::string m_host; /* Port of server receiving messages */ const uint16_t m_port; + /* Time between queue thread runs (expressed in milliseconds) */ + const uint64_t m_interval_ms; /* Number of messages sent */ uint64_t m_successes{0}; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 04bcfe43bb..67ce533d8b 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -185,8 +185,9 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve InitScriptExecutionCache(); ::g_stats_client = std::make_unique( m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST), - m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT), + m_node.args->GetArg("-statsduration", DEFAULT_STATSD_DURATION), + m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), m_node.args->GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE) );