diff --git a/src/init.cpp b/src/init.cpp index c847b4bfb6..639a6f2395 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -772,6 +772,7 @@ void SetupServerArgs(ArgsManager& argsman) argsman.AddArg("-server", "Accept command line and JSON-RPC commands", ArgsManager::ALLOW_ANY, OptionsCategory::RPC); argsman.AddArg("-statsenabled", strprintf("Publish internal stats to statsd (default: %u)", DEFAULT_STATSD_ENABLE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); + argsman.AddArg("-statsbatchsize=", strprintf("Specify the size of each batch of stats messages (default: %d)", DEFAULT_STATSD_BATCH_SIZE), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statsduration=", strprintf("Specify the number of milliseconds between stats messages (default: %d)", DEFAULT_STATSD_DURATION), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshost=", strprintf("Specify statsd host (default: %s)", DEFAULT_STATSD_HOST), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); argsman.AddArg("-statshostname=", strprintf("Specify statsd host name (default: %s)", DEFAULT_STATSD_HOSTNAME), ArgsManager::ALLOW_ANY, OptionsCategory::STATSD); @@ -1542,6 +1543,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) // g_stats_client isn't present when that attempt is made, the client will crash. ::g_stats_client = std::make_unique(args.GetArg("-statshost", DEFAULT_STATSD_HOST), args.GetArg("-statsport", DEFAULT_STATSD_PORT), + args.GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE), args.GetArg("-statsduration", DEFAULT_STATSD_DURATION), args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE), diff --git a/src/stats/client.cpp b/src/stats/client.cpp index ba9fa60f7c..ddd5b0a738 100644 --- a/src/stats/client.cpp +++ b/src/stats/client.cpp @@ -43,6 +43,9 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include #include +/** Delimiter segmenting two fully formed Statsd messages */ +static constexpr char STATSD_MSG_DELIMITER{'\n'}; + std::unique_ptr g_stats_client; bool StatsdClient::ShouldSend(float sample_rate) @@ -60,8 +63,8 @@ bool StatsdClient::ShouldSend(float sample_rate) return sample_rate > std::uniform_real_distribution(0.f, 1.f)(insecure_rand); } -StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, - const std::string& ns, bool enabled) : +StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms, + const std::string& nodename, const std::string& ns, bool enabled) : m_nodename{nodename}, m_ns{ns} { @@ -71,7 +74,9 @@ StatsdClient::StatsdClient(const std::string& host, uint16_t port, uint64_t inte } std::optional error_opt; - m_sender = std::make_unique(host, port, interval_ms, error_opt); + m_sender = std::make_unique(host, port, + std::make_pair(batch_size, static_cast(STATSD_MSG_DELIMITER)), + interval_ms, error_opt); if (error_opt.has_value()) { LogPrintf("ERROR: %s, cannot initialize StatsdClient.\n", error_opt.value()); m_sender.reset(); diff --git a/src/stats/client.h b/src/stats/client.h index 0cad8bc600..40d74c2505 100644 --- a/src/stats/client.h +++ b/src/stats/client.h @@ -25,6 +25,8 @@ static const std::string DEFAULT_STATSD_NAMESPACE{""}; static constexpr int DEFAULT_STATSD_DURATION{1000}; /** Default number of seconds between recording periodic stats */ static constexpr int DEFAULT_STATSD_PERIOD{60}; +/** Default size in bytes of a batch of messages */ +static constexpr int DEFAULT_STATSD_BATCH_SIZE{1024}; /** Minimum number of seconds between recording periodic stats */ static constexpr int MIN_STATSD_PERIOD{5}; /** Maximum number of seconds between recording periodic stats */ @@ -33,8 +35,8 @@ static constexpr int MAX_STATSD_PERIOD{60 * 60}; class StatsdClient { public: - explicit StatsdClient(const std::string& host, uint16_t port, uint64_t interval_ms, const std::string& nodename, - const std::string& ns, bool enabled); + explicit StatsdClient(const std::string& host, uint16_t port, uint64_t batch_size, uint64_t interval_ms, + const std::string& nodename, const std::string& ns, bool enabled); ~StatsdClient(); public: diff --git a/src/stats/rawsender.cpp b/src/stats/rawsender.cpp index 0c2f10ffa6..eb423e37d5 100644 --- a/src/stats/rawsender.cpp +++ b/src/stats/rawsender.cpp @@ -10,10 +10,11 @@ #include #include -RawSender::RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, - std::optional& error) : +RawSender::RawSender(const std::string& host, uint16_t port, std::pair batching_opts, + uint64_t interval_ms, std::optional& error) : m_host{host}, m_port{port}, + m_batching_opts{batching_opts}, m_interval_ms{interval_ms} { if (host.empty()) { @@ -105,7 +106,27 @@ std::string RawSender::ToStringHostPort() const { return strprintf("%s:%d", m_ho void RawSender::QueueAdd(const RawMessage& msg) { AssertLockNotHeld(cs); - WITH_LOCK(cs, m_queue.push_back(msg)); + LOCK(cs); + + const auto& [batch_size, batch_delim] = m_batching_opts; + // If no batch size has been specified, simply add to queue + if (batch_size == 0) { + m_queue.push_back(msg); + return; + } + + // We can batch, either create a new batch in queue or append to existing batch in queue + if (m_queue.empty() || m_queue.back().size() + msg.size() >= batch_size) { + // Either we don't have a place to batch our message or we exceeded the batch size, make a new batch + m_queue.emplace_back(); + m_queue.back().reserve(batch_size); + } else if (!m_queue.back().empty()) { + // When there is already a batch open we need a delimiter when its not empty + m_queue.back() += batch_delim; + } + + // Add the new message to the batch + m_queue.back() += msg; } void RawSender::QueueFlush() diff --git a/src/stats/rawsender.h b/src/stats/rawsender.h index 24dadcde94..d91f6c0def 100644 --- a/src/stats/rawsender.h +++ b/src/stats/rawsender.h @@ -55,7 +55,8 @@ struct RawMessage : public std::vector class RawSender { public: - RawSender(const std::string& host, uint16_t port, uint64_t interval_ms, std::optional& error); + RawSender(const std::string& host, uint16_t port, std::pair batching_opts, + uint64_t interval_ms, std::optional& error); ~RawSender(); RawSender(const RawSender&) = delete; @@ -78,11 +79,11 @@ private: /* Socket address containing host information */ std::pair m_server{{}, sizeof(struct sockaddr_storage)}; - /* Mutex to protect messages queue */ + /* Mutex to protect (batches of) messages queue */ mutable Mutex cs; /* Interrupt for queue processing thread */ CThreadInterrupt m_interrupt; - /* Queue of messages to be sent */ + /* Queue of (batches of) messages to be sent */ std::deque m_queue GUARDED_BY(cs); /* Thread that processes queue every m_interval_ms */ std::thread m_thread; @@ -91,6 +92,8 @@ private: const std::string m_host; /* Port of server receiving messages */ const uint16_t m_port; + /* Batching parameters */ + const std::pair m_batching_opts{0, 0}; /* Time between queue thread runs (expressed in milliseconds) */ const uint64_t m_interval_ms; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 67ce533d8b..7d7c898d58 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -186,6 +186,7 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve ::g_stats_client = std::make_unique( m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST), m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT), + m_node.args->GetArg("-statsbatchsize", DEFAULT_STATSD_BATCH_SIZE), m_node.args->GetArg("-statsduration", DEFAULT_STATSD_DURATION), m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME), m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),