feat(stats): introduce support for batching messages

Co-authored-by: UdjinM6 <UdjinM6@users.noreply.github.com>
This commit is contained in:
Kittywhiskers Van Gogh 2024-09-12 15:24:37 +00:00
parent 38b1643fe6
commit bf44fc3bf6
No known key found for this signature in database
GPG Key ID: 30CD0C065E5C4AAD
6 changed files with 45 additions and 11 deletions

View File

@ -772,6 +772,7 @@ void SetupServerArgs(ArgsManager& argsman)
argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC);
argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsbatchsize=<bytes>", strprintf("Specify the size of each batch of stats messages (default: %d)", DEFAULT_STATSD_BATCH_SIZE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statsduration=<ms>", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statsduration=<ms>", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshost=<ip>", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshost=<ip>", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
argsman.AddArg("-statshostname=<ip>", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshostname=<ip>", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD);
@ -1542,6 +1543,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
// g_stats_client isn't present when that attempt is made, the client will crash. // g_stats_client isn't present when that attempt is made, the client will crash.
::g_stats_client = std::make_unique<StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST), ::g_stats_client = std::make_unique<StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST),
args.GetArg("-statsport", DEFAULT_STATSD_PORT), args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE),
args.GetArg("-statsduration", DEFAULT_STATSD_DURATION), args.GetArg("-statsduration", DEFAULT_STATSD_DURATION),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),

View File

@ -43,6 +43,9 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <cstdio> #include <cstdio>
#include <random> #include <random>
/** Delimiter segmenting two fully formed Statsd messages */
static constexpr char STATSD_MSG_DELIMITER{'\n'};
std::unique_ptr<StatsdClient> g_stats_client; std::unique_ptr<StatsdClient> g_stats_client;
bool StatsdClient::ShouldSend(float sample_rate) bool StatsdClient::ShouldSend(float sample_rate)
@ -60,8 +63,8 @@ bool StatsdClient::ShouldSend(float sample_rate)
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand); return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
} }
StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms,
const std::string& ns, bool enabled) : const std::string& nodename, const std::string& ns, bool enabled) :
m_nodename{nodename}, m_nodename{nodename},
m_ns{ns} m_ns{ns}
{ {
@ -71,7 +74,9 @@ StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t inte
} }
std::optional<std::string> error_opt; std::optional<std::string> error_opt;
m_sender = std::make_unique<RawSender>(host, port, interval_ms, error_opt); m_sender = std::make_unique<RawSender>(host, port,
std::make_pair(batch_size, static_cast<uint8_t>(STATSD_MSG_DELIMITER)),
interval_ms, error_opt);
if (error_opt.has_value()) { if (error_opt.has_value()) {
LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value()); LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value());
m_sender.reset(); m_sender.reset();

View File

@ -25,6 +25,8 @@ static const std::string DEFAULT_STATSD_NAMESPACE{""};
static constexpr int DEFAULT_STATSD_DURATION{1000}; static constexpr int DEFAULT_STATSD_DURATION{1000};
/** Default number of seconds between recording periodic stats */ /** Default number of seconds between recording periodic stats */
static constexpr int DEFAULT_STATSD_PERIOD{60}; static constexpr int DEFAULT_STATSD_PERIOD{60};
/** Default size in bytes of a batch of messages */
static constexpr int DEFAULT_STATSD_BATCH_SIZE{1024};
/** Minimum number of seconds between recording periodic stats */ /** Minimum number of seconds between recording periodic stats */
static constexpr int MIN_STATSD_PERIOD{5}; static constexpr int MIN_STATSD_PERIOD{5};
/** Maximum number of seconds between recording periodic stats */ /** Maximum number of seconds between recording periodic stats */
@ -33,8 +35,8 @@ static constexpr int MAX_STATSD_PERIOD{60 * 60};
class StatsdClient class StatsdClient
{ {
public: public:
explicit StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, explicit StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms,
const std::string& ns, bool enabled); const std::string& nodename, const std::string& ns, bool enabled);
~StatsdClient(); ~StatsdClient();
public: public:

View File

@ -10,10 +10,11 @@
#include <util/sock.h> #include <util/sock.h>
#include <util/thread.h> #include <util/thread.h>
RawSender::RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, RawSender::RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
std::optional<std::string>& error) : uint64_t interval_ms, std::optional<std::string>& error) :
m_host{host}, m_host{host},
m_port{port}, m_port{port},
m_batching_opts{batching_opts},
m_interval_ms{interval_ms} m_interval_ms{interval_ms}
{ {
if (host.empty()) { if (host.empty()) {
@ -105,7 +106,27 @@ std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_ho
/**
 * Queue a message for sending, optionally coalescing it into a batch.
 *
 * Behaviour depends on the (size, delimiter) pair held in m_batching_opts:
 *  - size == 0: msg is pushed onto m_queue as its own entry.
 *  - size  > 0: msg is appended to the last entry of m_queue, preceded by the
 *    delimiter byte when that entry already holds data. A fresh entry is
 *    started instead when the queue is empty or when the last entry's size
 *    plus msg's size would reach or exceed the configured size. A message is
 *    never split, so a msg that is itself >= size ends up alone in an entry
 *    that is larger than the configured size.
 *
 * The caller must not hold cs; the queue is modified under cs.
 *
 * NOTE(review): in this side-by-side diff rendering the removed one-line
 * WITH_LOCK(...) push and the added LOCK(cs) share a single line; presumably
 * only LOCK(cs) exists in the post-change file -- confirm against the file.
 */
void RawSender::QueueAdd(const RawMessage& msg) void RawSender::QueueAdd(const RawMessage& msg)
{ {
AssertLockNotHeld(cs); AssertLockNotHeld(cs);
WITH_LOCK(cs, m_queue.push_back(msg)); LOCK(cs);
const auto& [batch_size, batch_delim] = m_batching_opts;
// If no batch size has been specified, simply add to queue
if (batch_size == 0) {
m_queue.push_back(msg);
return;
}
// We can batch, either create a new batch in queue or append to existing batch in queue
// (the >= comparison presumably leaves room for the one-byte delimiter added below -- confirm intent)
if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) {
// Either we don't have a place to batch our message or we exceeded the batch size, make a new batch
m_queue.emplace_back();
// Pre-allocate capacity for a full batch up front
m_queue.back().reserve(batch_size);
} else if (!m_queue.back().empty()) {
// When there is already a batch open we need a delimiter when it's not empty
m_queue.back() += batch_delim;
}
// Add the new message to the batch
m_queue.back() += msg;
} }
void RawSender::QueueFlush() void RawSender::QueueFlush()

View File

@ -55,7 +55,8 @@ struct RawMessage : public std::vector<uint8_t>
class RawSender class RawSender
{ {
public: public:
RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, std::optional<std::string>& error); RawSender(const std::string& host, uint16_t port, std::pair<uint64_t, uint8_t> batching_opts,
uint64_t interval_ms, std::optional<std::string>& error);
~RawSender(); ~RawSender();
RawSender(const RawSender&) = delete; RawSender(const RawSender&) = delete;
@ -78,11 +79,11 @@ private:
/* Socket address containing host information */ /* Socket address containing host information */
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)}; std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
/* Mutex to protect messages queue */ /* Mutex to protect (batches of) messages queue */
mutable Mutex cs; mutable Mutex cs;
/* Interrupt for queue processing thread */ /* Interrupt for queue processing thread */
CThreadInterrupt m_interrupt; CThreadInterrupt m_interrupt;
/* Queue of messages to be sent */ /* Queue of (batches of) messages to be sent */
std::deque<RawMessage> m_queue GUARDED_BY(cs); std::deque<RawMessage> m_queue GUARDED_BY(cs);
/* Thread that processes queue every m_interval_ms */ /* Thread that processes queue every m_interval_ms */
std::thread m_thread; std::thread m_thread;
@ -91,6 +92,8 @@ private:
const std::string m_host; const std::string m_host;
/* Port of server receiving messages */ /* Port of server receiving messages */
const uint16_t m_port; const uint16_t m_port;
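/* Batching parameters: batch size threshold in bytes (0 disables batching) and the delimiter byte placed between messages within a batch */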
const std::pair</*size=*/uint64_t, /*delimiter=*/uint8_t> m_batching_opts{0, 0};
/* Time between queue thread runs (expressed in milliseconds) */ /* Time between queue thread runs (expressed in milliseconds) */
const uint64_t m_interval_ms; const uint64_t m_interval_ms;

View File

@ -186,6 +186,7 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve
::g_stats_client = std::make_unique<StatsdClient>( ::g_stats_client = std::make_unique<StatsdClient>(
m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST), m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST),
m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT), m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT),
m_node.args->GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE),
m_node.args->GetArg("-statsduration", DEFAULT_STATSD_DURATION), m_node.args->GetArg("-statsduration", DEFAULT_STATSD_DURATION),
m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),