feat(stats): introduce support for queuing messages

This commit is contained in:
Kittywhiskers Van Gogh 2024-09-12 15:28:37 +00:00
parent fc4a736e2a
commit 38b1643fe6
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
6 changed files with 107 additions and 13 deletions

View File

@ -772,6 +772,7 @@ void SetupServerArgs(ArgsManager& argsman)
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsduration=<ms>", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshost=<ip>", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshostname=<ip>", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsport=<port>", strprintf("Specify statsd port (default: %u)", DEFAULT_STATSD_PORT), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
@ -1540,8 +1541,9 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// regardless of whether transmitting stats are desirable or not and if
// g_stats_client isn't present when that attempt is made, the client will crash.
::g_stats_client = std::make_unique<StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsduration", DEFAULT_STATSD_DURATION),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));

View File

@ -60,7 +60,7 @@ bool StatsdClient::ShouldSend(float sample_rate)
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
}
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename,
const std::string& ns, bool enabled) :
m_nodename{nodename},
m_ns{ns}
@ -71,7 +71,7 @@ StatsdClient::StatsdClient(const std::string& host, const std::string& nodename,
}
std::optional<std::string> error_opt;
m_sender = std::make_unique<RawSender>(host, port, error_opt);
m_sender = std::make_unique<RawSender>(host, port, interval_ms, error_opt);
if (error_opt.has_value()) {
LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value());
m_sender.reset();

View File

@ -21,15 +21,19 @@ static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
static const std::string DEFAULT_STATSD_HOSTNAME{""};
static const std::string DEFAULT_STATSD_NAMESPACE{""};
// schedule periodic measurements, in seconds: default - 1 minute, min - 5 sec, max - 1h.
/** Default number of milliseconds between flushing a queue of messages */
static constexpr int DEFAULT_STATSD_DURATION{1000};
/** Default number of seconds between recording periodic stats */
static constexpr int DEFAULT_STATSD_PERIOD{60};
/** Minimum number of seconds between recording periodic stats */
static constexpr int MIN_STATSD_PERIOD{5};
/** Maximum number of seconds between recording periodic stats */
static constexpr int MAX_STATSD_PERIOD{60 * 60};
class StatsdClient
{
public:
explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
explicit StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename,
const std::string& ns, bool enabled);
~StatsdClient();

View File

@ -8,10 +8,13 @@
#include <netaddress.h>
#include <netbase.h>
#include <util/sock.h>
#include <util/thread.h>
RawSender::RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error) :
RawSender::RawSender(const std::string& host, uint16_t port, uint64_t interval_ms,
std::optional<std::string>& error) :
m_host{host},
m_port{port}
m_port{port},
m_interval_ms{interval_ms}
{
if (host.empty()) {
error = "No host specified";
@ -39,16 +42,42 @@ RawSender::RawSender(const std::string& host, uint16_t port, std::optional<std::
}
m_sock = std::make_unique<Sock>(hSocket);
LogPrintf("Started RawSender sending messages to %s:%d\n", m_host, m_port);
if (m_interval_ms == 0) {
LogPrintf("Send interval is zero, not starting RawSender queueing thread.\n");
} else {
m_interrupt.reset();
m_thread = std::thread(&util::TraceThread, "rawsender", [this] { QueueThreadMain(); });
}
LogPrintf("Started %sRawSender sending messages to %s:%d\n", m_thread.joinable() ? "threaded " : "", m_host, m_port);
}
RawSender::~RawSender()
{
LogPrintf("Stopping RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
// If there is a thread, interrupt and stop it
if (m_thread.joinable()) {
m_interrupt();
m_thread.join();
}
// Flush queue of uncommitted messages
QueueFlush();
LogPrintf("Stopped RawSender instance sending messages to %s:%d. %d successes, %d failures.\n",
m_host, m_port, m_successes, m_failures);
}
std::optional<std::string> RawSender::Send(const RawMessage& msg)
{
// If there is a thread, append to queue
if (m_thread.joinable()) {
QueueAdd(msg);
return std::nullopt;
}
// There isn't a queue, send directly
return SendDirectly(msg);
}
std::optional<std::string> RawSender::SendDirectly(const RawMessage& msg)
{
if (!m_sock) {
m_failures++;
@ -72,3 +101,42 @@ std::optional<std::string> RawSender::Send(const RawMessage& msg)
}
std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_host, m_port); }
void RawSender::QueueAdd(const RawMessage& msg)
{
AssertLockNotHeld(cs);
WITH_LOCK(cs, m_queue.push_back(msg));
}
void RawSender::QueueFlush()
{
AssertLockNotHeld(cs);
WITH_LOCK(cs, QueueFlush(m_queue));
}
void RawSender::QueueFlush(std::deque<RawMessage>& queue)
{
while (!queue.empty()) {
SendDirectly(queue.front());
queue.pop_front();
}
}
void RawSender::QueueThreadMain()
{
AssertLockNotHeld(cs);
while (!m_interrupt) {
// Swap the queues to commit the existing queue of messages
std::deque<RawMessage> queue;
WITH_LOCK(cs, m_queue.swap(queue));
// Flush the committed queue
QueueFlush(queue);
assert(queue.empty());
if (!m_interrupt.sleep_for(std::chrono::milliseconds(m_interval_ms))) {
return;
}
}
}

View File

@ -8,7 +8,9 @@
#include <compat.h>
#include <sync.h>
#include <threadinterrupt.h>
#include <deque>
#include <memory>
#include <optional>
#include <string>
@ -53,27 +55,44 @@ struct RawMessage : public std::vector<uint8_t>
class RawSender
{
public:
RawSender(const std::string& host, uint16_t port, std::optional<std::string>& error);
RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, std::optional<std::string>& error);
~RawSender();
RawSender(const RawSender&) = delete;
RawSender& operator=(const RawSender&) = delete;
RawSender(RawSender&&) = delete;
std::optional<std::string> Send(const RawMessage& msg);
std::optional<std::string> Send(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs);
std::optional<std::string> SendDirectly(const RawMessage& msg);
std::string ToStringHostPort() const;
void QueueAdd(const RawMessage& msg) EXCLUSIVE_LOCKS_REQUIRED(!cs);
void QueueFlush() EXCLUSIVE_LOCKS_REQUIRED(!cs);
void QueueFlush(std::deque<RawMessage>& queue);
void QueueThreadMain() EXCLUSIVE_LOCKS_REQUIRED(!cs);
private:
/* Socket used to communicate with host */
std::unique_ptr<Sock> m_sock{nullptr};
/* Socket address containing host information */
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
/* Mutex to protect messages queue */
mutable Mutex cs;
/* Interrupt for queue processing thread */
CThreadInterrupt m_interrupt;
/* Queue of messages to be sent */
std::deque<RawMessage> m_queue GUARDED_BY(cs);
/* Thread that processes queue every m_interval_ms */
std::thread m_thread;
/* Hostname of server receiving messages */
const std::string m_host;
/* Port of server receiving messages */
const uint16_t m_port;
/* Time between queue thread runs (expressed in milliseconds) */
const uint64_t m_interval_ms;
/* Number of messages sent */
uint64_t m_successes{0};

View File

@ -185,8 +185,9 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve
InitScriptExecutionCache();
::g_stats_client = std::make_unique<StatsdClient>(
m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST),
m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT),
m_node.args->GetArg("-statsduration", DEFAULT_STATSD_DURATION),
m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
m_node.args->GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)
);