Merge #5167: refactor(stats): modernize statsd::StatsdClient, make global unique_ptr

cc998abec1 fmt: apply `clang-format-diff.py` suggestions (Kittywhiskers Van Gogh)
0401c581eb stats: `const`-ify variables and arguments (Kittywhiskers Van Gogh)
9f96723774 stats: stop using error codes, switch over to `bool` (Kittywhiskers Van Gogh)
1a81979c1e stats: initialize socket after we have a valid socket address (Kittywhiskers Van Gogh)
dbbfc8d766 stats: use `Socks` wrapper, use `CService` to generate our `sockaddr` (Kittywhiskers Van Gogh)
2def905044 stats: move init logic into constructor (Kittywhiskers Van Gogh)
4bc727cd6c stats: clean up randomization code, move `FastRandomContext` inward (Kittywhiskers Van Gogh)
840241eefd stats: cleanup error logging, improve code sanity (Kittywhiskers Van Gogh)
85890ddb13 docs: add copyright notice to source file, update notice in header (Kittywhiskers Van Gogh)
a9d1b1494d stats: move `_StatsdClientData` variables into `StatsdClient` (Kittywhiskers Van Gogh)
30c30c1397 stats: fetch all arguments needed when constructing `g_stats_client` (Kittywhiskers Van Gogh)
5133d88415 stats: s/statsClient/g_stats_client/g (Kittywhiskers Van Gogh)
f81951dd00 stats: make `statsClient` a `std::unique_ptr`, denote as global variable (Kittywhiskers Van Gogh)

Pull request description:

  ## Additional Information

  Support for transmitting stats to a Statsd server has been courtesy of Statoshi ([repo](https://github.com/jlopp/statoshi)), implemented Dec, 2020 by [dash#2515](https://github.com/dashpay/dash/pull/2515) but since then, it hasn't gotten much attention aside from benefiting from codebase-wide changes and the occasional compiler appeasement. This pull request aims to give our statistics code some TLC.

  Changes include:

  * Limiting initialization to solely during construction and moving the responsibility of fetching arguments outside of `statsd::StatsdClient`.
  * Using the RAII `Socks` wrapper as early as possible (we still need to construct a raw socket ourselves but this is done in the initializer and control is moved to the wrapper and everywhere else, the wrapper is used)
  * Utilizing existing networking code to generate the socket address
    * This lets us trivially allow IPv6 connections as the responsibility to construct it safely is moved to `CService`.
  * Using `std::string` and our string manipulation capabilities (replacing `snprintf` with `strprintf`), replacing platform-specific types (replacing `short` with `uint16_t`).

  ## Breaking Changes

  None observed.

  ## Checklist:

  - [x] I have performed a self-review of my own code
  - [x] I have commented my code, particularly in hard-to-understand areas **(note: N/A)**
  - [x] I have added or updated relevant unit/integration/functional/e2e tests **(note: N/A)**
  - [x] I have made corresponding changes to the documentation **(note: N/A)**
  - [x] I have assigned this pull request to a milestone

ACKs for top commit:
  PastaPastaPasta:
    utACK [cc998ab](cc998abec1)
  UdjinM6:
    utACK cc998abec1

Tree-SHA512: 433c92160d6ac7ebb8582ada3cbb65ead7913618266b773619a528c90dfe0e286aafa46dc3b0bca62f246938e5948a732080e2cddba942d3627f007ca6efcc1f
This commit is contained in:
pasta 2024-09-11 11:11:59 -05:00
commit 96685be685
No known key found for this signature in database
GPG Key ID: 52527BEDABE87984
10 changed files with 222 additions and 259 deletions

View File

@ -289,6 +289,7 @@ void PrepareShutdown(NodeContext& node)
node.banman.reset();
node.addrman.reset();
node.netgroupman.reset();
::g_stats_client.reset();
if (node.mempool && node.mempool->IsLoaded() && node.args->GetBoolArg("-persistmempool", DEFAULT_PERSIST_MEMPOOL)) {
DumpMempool(*node.mempool);
@ -839,12 +840,12 @@ static void PeriodicStats(ArgsManager& args, ChainstateManager& chainman, const
CCoinsStats stats{CoinStatsHashType::NONE};
chainman.ActiveChainstate().ForceFlushStateToDisk();
if (WITH_LOCK(cs_main, return GetUTXOStats(&chainman.ActiveChainstate().CoinsDB(), std::ref(chainman.m_blockman), stats, RpcInterruptionPoint, chainman.ActiveChain().Tip()))) {
statsClient.gauge("utxoset.tx", stats.nTransactions, 1.0f);
statsClient.gauge("utxoset.txOutputs", stats.nTransactionOutputs, 1.0f);
statsClient.gauge("utxoset.dbSizeBytes", stats.nDiskSize, 1.0f);
statsClient.gauge("utxoset.blockHeight", stats.nHeight, 1.0f);
::g_stats_client->gauge("utxoset.tx", stats.nTransactions, 1.0f);
::g_stats_client->gauge("utxoset.txOutputs", stats.nTransactionOutputs, 1.0f);
::g_stats_client->gauge("utxoset.dbSizeBytes", stats.nDiskSize, 1.0f);
::g_stats_client->gauge("utxoset.blockHeight", stats.nHeight, 1.0f);
if (stats.total_amount.has_value()) {
statsClient.gauge("utxoset.totalAmount", (double)stats.total_amount.value() / (double)COIN, 1.0f);
::g_stats_client->gauge("utxoset.totalAmount", (double)stats.total_amount.value() / (double)COIN, 1.0f);
}
} else {
// something went wrong
@ -866,22 +867,22 @@ static void PeriodicStats(ArgsManager& args, ChainstateManager& chainman, const
int64_t timeDiff = maxTime - minTime;
double nNetworkHashPS = workDiff.getdouble() / timeDiff;
statsClient.gaugeDouble("network.hashesPerSecond", nNetworkHashPS);
statsClient.gaugeDouble("network.terahashesPerSecond", nNetworkHashPS / 1e12);
statsClient.gaugeDouble("network.petahashesPerSecond", nNetworkHashPS / 1e15);
statsClient.gaugeDouble("network.exahashesPerSecond", nNetworkHashPS / 1e18);
::g_stats_client->gaugeDouble("network.hashesPerSecond", nNetworkHashPS);
::g_stats_client->gaugeDouble("network.terahashesPerSecond", nNetworkHashPS / 1e12);
::g_stats_client->gaugeDouble("network.petahashesPerSecond", nNetworkHashPS / 1e15);
::g_stats_client->gaugeDouble("network.exahashesPerSecond", nNetworkHashPS / 1e18);
// No need for cs_main, we never use null tip here
statsClient.gaugeDouble("network.difficulty", (double)GetDifficulty(tip));
::g_stats_client->gaugeDouble("network.difficulty", (double)GetDifficulty(tip));
statsClient.gauge("transactions.txCacheSize", WITH_LOCK(cs_main, return chainman.ActiveChainstate().CoinsTip().GetCacheSize()), 1.0f);
statsClient.gauge("transactions.totalTransactions", tip->nChainTx, 1.0f);
::g_stats_client->gauge("transactions.txCacheSize", WITH_LOCK(cs_main, return chainman.ActiveChainstate().CoinsTip().GetCacheSize()), 1.0f);
::g_stats_client->gauge("transactions.totalTransactions", tip->nChainTx, 1.0f);
{
LOCK(mempool.cs);
statsClient.gauge("transactions.mempool.totalTransactions", mempool.size(), 1.0f);
statsClient.gauge("transactions.mempool.totalTxBytes", (int64_t) mempool.GetTotalTxSize(), 1.0f);
statsClient.gauge("transactions.mempool.memoryUsageBytes", (int64_t) mempool.DynamicMemoryUsage(), 1.0f);
statsClient.gauge("transactions.mempool.minFeePerKb", mempool.GetMinFee(args.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(), 1.0f);
::g_stats_client->gauge("transactions.mempool.totalTransactions", mempool.size(), 1.0f);
::g_stats_client->gauge("transactions.mempool.totalTxBytes", (int64_t) mempool.GetTotalTxSize(), 1.0f);
::g_stats_client->gauge("transactions.mempool.memoryUsageBytes", (int64_t) mempool.DynamicMemoryUsage(), 1.0f);
::g_stats_client->gauge("transactions.mempool.minFeePerKb", mempool.GetMinFee(args.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(), 1.0f);
}
}
@ -1524,6 +1525,15 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
fDiscover = args.GetBoolArg("-discover", true);
const bool ignores_incoming_txs{args.GetBoolArg("-blocksonly", DEFAULT_BLOCKSONLY)};
// We need to initialize g_stats_client early as currently, g_stats_client is called
// regardless of whether transmitting stats are desirable or not and if
// g_stats_client isn't present when that attempt is made, the client will crash.
::g_stats_client = std::make_unique<statsd::StatsdClient>(args.GetArg("-statshost", DEFAULT_STATSD_HOST),
args.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
args.GetArg("-statsport", DEFAULT_STATSD_PORT),
args.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
args.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE));
{
// Read asmap file if configured

View File

@ -623,7 +623,7 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
/*inbound_onion=*/false,
std::move(i2p_transient_session));
pnode->AddRef();
statsClient.inc("peers.connect", 1.0f);
::g_stats_client->inc("peers.connect", 1.0f);
// We're making a new connection, harvest entropy from the time (and our peer count)
RandAddEvent((uint32_t)id);
@ -666,7 +666,7 @@ void CNode::CloseSocketDisconnect(CConnman* connman)
m_sock.reset();
m_i2p_sam_session.reset();
statsClient.inc("peers.disconnect", 1.0f);
::g_stats_client->inc("peers.disconnect", 1.0f);
}
void CConnman::AddWhitelistPermissionFlags(NetPermissionFlags& flags, const CNetAddr &addr) const {
@ -817,7 +817,7 @@ bool CNode::ReceiveMsgBytes(Span<const uint8_t> msg_bytes, bool& complete)
}
assert(i != mapRecvBytesPerMsgType.end());
i->second += msg.m_raw_message_size;
statsClient.count("bandwidth.message." + std::string(msg.m_type) + ".bytesReceived", msg.m_raw_message_size, 1.0f);
::g_stats_client->count("bandwidth.message." + std::string(msg.m_type) + ".bytesReceived", msg.m_raw_message_size, 1.0f);
// push the message to the process queue,
vRecvMsg.push_back(std::move(msg));
@ -1741,20 +1741,20 @@ void CConnman::CalculateNumConnectionsChangedStats()
torNodes++;
const auto last_ping_time = count_microseconds(pnode->m_last_ping_time);
if (last_ping_time > 0)
statsClient.timing("peers.ping_us", last_ping_time, 1.0f);
::g_stats_client->timing("peers.ping_us", last_ping_time, 1.0f);
}
for (const std::string &msg : getAllNetMessageTypes()) {
statsClient.gauge("bandwidth.message." + msg + ".totalBytesReceived", mapRecvBytesMsgStats[msg], 1.0f);
statsClient.gauge("bandwidth.message." + msg + ".totalBytesSent", mapSentBytesMsgStats[msg], 1.0f);
::g_stats_client->gauge("bandwidth.message." + msg + ".totalBytesReceived", mapRecvBytesMsgStats[msg], 1.0f);
::g_stats_client->gauge("bandwidth.message." + msg + ".totalBytesSent", mapSentBytesMsgStats[msg], 1.0f);
}
statsClient.gauge("peers.totalConnections", nPrevNodeCount, 1.0f);
statsClient.gauge("peers.spvNodeConnections", spvNodes, 1.0f);
statsClient.gauge("peers.fullNodeConnections", fullNodes, 1.0f);
statsClient.gauge("peers.inboundConnections", inboundNodes, 1.0f);
statsClient.gauge("peers.outboundConnections", outboundNodes, 1.0f);
statsClient.gauge("peers.ipv4Connections", ipv4Nodes, 1.0f);
statsClient.gauge("peers.ipv6Connections", ipv6Nodes, 1.0f);
statsClient.gauge("peers.torConnections", torNodes, 1.0f);
::g_stats_client->gauge("peers.totalConnections", nPrevNodeCount, 1.0f);
::g_stats_client->gauge("peers.spvNodeConnections", spvNodes, 1.0f);
::g_stats_client->gauge("peers.fullNodeConnections", fullNodes, 1.0f);
::g_stats_client->gauge("peers.inboundConnections", inboundNodes, 1.0f);
::g_stats_client->gauge("peers.outboundConnections", outboundNodes, 1.0f);
::g_stats_client->gauge("peers.ipv4Connections", ipv4Nodes, 1.0f);
::g_stats_client->gauge("peers.ipv6Connections", ipv6Nodes, 1.0f);
::g_stats_client->gauge("peers.torConnections", torNodes, 1.0f);
}
bool CConnman::ShouldRunInactivityChecks(const CNode& node, std::chrono::seconds now) const
@ -4126,8 +4126,8 @@ bool CConnman::DisconnectNode(NodeId id)
void CConnman::RecordBytesRecv(uint64_t bytes)
{
nTotalBytesRecv += bytes;
statsClient.count("bandwidth.bytesReceived", bytes, 0.1f);
statsClient.gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f);
::g_stats_client->count("bandwidth.bytesReceived", bytes, 0.1f);
::g_stats_client->gauge("bandwidth.totalBytesReceived", nTotalBytesRecv, 0.01f);
}
void CConnman::RecordBytesSent(uint64_t bytes)
@ -4136,8 +4136,8 @@ void CConnman::RecordBytesSent(uint64_t bytes)
LOCK(m_total_bytes_sent_mutex);
nTotalBytesSent += bytes;
statsClient.count("bandwidth.bytesSent", bytes, 0.01f);
statsClient.gauge("bandwidth.totalBytesSent", nTotalBytesSent, 0.01f);
::g_stats_client->count("bandwidth.bytesSent", bytes, 0.01f);
::g_stats_client->gauge("bandwidth.totalBytesSent", nTotalBytesSent, 0.01f);
const auto now = GetTime<std::chrono::seconds>();
if (nMaxOutboundCycleStartTime + MAX_UPLOAD_TIMEFRAME < now)
@ -4293,8 +4293,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
msg.data.data()
);
statsClient.count(strprintf("bandwidth.message.%s.bytesSent", msg.m_type), nMessageSize, 1.0f);
statsClient.inc(strprintf("message.sent.%s", msg.m_type), 1.0f);
::g_stats_client->count(strprintf("bandwidth.message.%s.bytesSent", msg.m_type), nMessageSize, 1.0f);
::g_stats_client->inc(strprintf("message.sent.%s", msg.m_type), 1.0f);
{
LOCK(pnode->cs_vSend);

View File

@ -1699,9 +1699,9 @@ void PeerManagerImpl::Misbehaving(const NodeId pnode, const int howmuch, const s
if (score_now >= DISCOURAGEMENT_THRESHOLD && score_before < DISCOURAGEMENT_THRESHOLD) {
warning = " DISCOURAGE THRESHOLD EXCEEDED";
peer->m_should_discourage = true;
statsClient.inc("misbehavior.banned", 1.0f);
::g_stats_client->inc("misbehavior.banned", 1.0f);
} else {
statsClient.count("misbehavior.amount", howmuch, 1.0);
::g_stats_client->count("misbehavior.amount", howmuch, 1.0);
}
LogPrint(BCLog::NET, "Misbehaving: peer=%d (%d -> %d)%s%s\n",
@ -3290,7 +3290,7 @@ void PeerManagerImpl::ProcessMessage(
AssertLockHeld(g_msgproc_mutex);
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId());
statsClient.inc("message.received." + SanitizeString(msg_type), 1.0f);
::g_stats_client->inc("message.received." + SanitizeString(msg_type), 1.0f);
const bool is_masternode = m_mn_activeman != nullptr;
@ -3789,7 +3789,7 @@ void PeerManagerImpl::ProcessMessage(
if (inv.IsMsgBlk()) {
const bool fAlreadyHave = AlreadyHaveBlock(inv.hash);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
UpdateBlockAvailability(pfrom.GetId(), inv.hash);
if (!fAlreadyHave && !fImporting && !fReindex && !IsBlockRequested(inv.hash)) {
@ -3804,7 +3804,7 @@ void PeerManagerImpl::ProcessMessage(
} else {
const bool fAlreadyHave = AlreadyHave(inv);
LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId());
statsClient.inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
::g_stats_client->inc(strprintf("message.received.inv_%s", inv.GetCommand()), 1.0f);
static std::set<int> allowWhileInIBDObjs = {
MSG_SPORK

View File

@ -207,10 +207,6 @@ public:
return rand32() % nMax;
}
uint32_t operator()(uint32_t nMax) {
return rand32(nMax);
}
/** Generate random bytes. */
template <typename B = unsigned char>
std::vector<B> randbytes(size_t len);

View File

@ -1,3 +1,8 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2020-2024 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
/**
Copyright (c) 2014, Rex
All rights reserved.
@ -32,217 +37,160 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <compat.h>
#include <netbase.h>
#include <random.h>
#include <util/system.h>
#include <cmath>
#include <cstdio>
#include <random>
statsd::StatsdClient statsClient;
std::unique_ptr<statsd::StatsdClient> g_stats_client;
namespace statsd {
inline bool fequal(float a, float b)
bool StatsdClient::ShouldSend(float sample_rate)
{
const float epsilon = 0.0001;
return ( fabs(a - b) < epsilon );
sample_rate = std::clamp(sample_rate, 0.f, 1.f);
constexpr float EPSILON{0.0001f};
/* If sample rate is 1, we should always send */
if (std::fabs(sample_rate - 1.f) < EPSILON) return true;
/* If sample rate is 0, we should never send */
if (std::fabs(sample_rate) < EPSILON) return false;
/* Sample rate is >0 and <1, roll the dice */
LOCK(cs);
return sample_rate > std::uniform_real_distribution<float>(0.f, 1.f)(insecure_rand);
}
thread_local FastRandomContext insecure_rand;
inline bool should_send(float sample_rate)
StatsdClient::StatsdClient(const std::string& host, const std::string& nodename, uint16_t port, const std::string& ns,
bool enabled) :
m_port{port},
m_host{host},
m_nodename{nodename},
m_ns{ns}
{
if ( fequal(sample_rate, 1.0) )
{
return true;
if (!enabled) {
LogPrintf("Transmitting stats are disabled, will not init StatsdClient\n");
return;
}
float p = float(insecure_rand(std::numeric_limits<uint32_t>::max())) / float(std::numeric_limits<uint32_t>::max());
return sample_rate > p;
CNetAddr netaddr;
if (!LookupHost(m_host, netaddr, /*fAllowLookup=*/true)) {
LogPrintf("ERROR: Unable to lookup host %s, cannot init StatsdClient\n", m_host);
return;
}
if (!netaddr.IsIPv4()) {
LogPrintf("ERROR: Host %s on unsupported network, cannot init StatsdClient\n", m_host);
return;
}
if (!CService(netaddr, port).GetSockAddr(reinterpret_cast<struct sockaddr*>(&m_server.first), &m_server.second)) {
LogPrintf("ERROR: Cannot get socket address for %s, cannot init StatsdClient\n", m_host);
return;
}
struct _StatsdClientData {
SOCKET sock;
struct sockaddr_in server;
std::string ns;
std::string host;
std::string nodename;
short port;
bool init;
char errmsg[1024];
};
StatsdClient::StatsdClient(const std::string& host, int port, const std::string& ns) :
d(std::make_unique<_StatsdClientData>())
{
d->sock = INVALID_SOCKET;
config(host, port, ns);
SOCKET hSocket = ::socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (hSocket == INVALID_SOCKET) {
LogPrintf("ERROR: Cannot create socket (socket() returned error %s), cannot init StatsdClient\n",
NetworkErrorString(WSAGetLastError()));
return;
}
m_sock = std::make_unique<Sock>(hSocket);
StatsdClient::~StatsdClient()
{
// close socket
CloseSocket(d->sock);
}
void StatsdClient::config(const std::string& host, int port, const std::string& ns)
{
d->ns = ns;
d->host = host;
d->port = port;
d->init = false;
CloseSocket(d->sock);
}
int StatsdClient::init()
{
static bool fEnabled = gArgs.GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE);
if (!fEnabled) return -3;
if ( d->init ) return 0;
config(gArgs.GetArg("-statshost", DEFAULT_STATSD_HOST), gArgs.GetArg("-statsport", DEFAULT_STATSD_PORT), gArgs.GetArg("-statsns", DEFAULT_STATSD_NAMESPACE));
d->sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if ( d->sock == INVALID_SOCKET ) {
snprintf(d->errmsg, sizeof(d->errmsg), "could not create socket, err=%m");
return -1;
}
memset(&d->server, 0, sizeof(d->server));
d->server.sin_family = AF_INET;
d->server.sin_port = htons(d->port);
CNetAddr netaddr(d->server.sin_addr);
if (!LookupHost(d->host, netaddr, true) || !netaddr.GetInAddr(&d->server.sin_addr)) {
snprintf(d->errmsg, sizeof(d->errmsg), "LookupHost or GetInAddr failed");
return -2;
}
if (gArgs.IsArgSet("-statshostname")) {
d->nodename = gArgs.GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME);
}
d->init = true;
return 0;
LogPrintf("StatsdClient initialized to transmit stats to %s:%d\n", m_host, m_port);
}
/* will change the original string */
void StatsdClient::cleanup(std::string& key)
{
size_t pos = key.find_first_of(":|@");
while ( pos != std::string::npos )
{
auto pos = key.find_first_of(":|@");
while (pos != std::string::npos) {
key[pos] = '_';
pos = key.find_first_of(":|@");
}
}
int StatsdClient::dec(const std::string& key, float sample_rate)
{
return count(key, -1, sample_rate);
}
bool StatsdClient::dec(const std::string& key, float sample_rate) { return count(key, -1, sample_rate); }
int StatsdClient::inc(const std::string& key, float sample_rate)
{
return count(key, 1, sample_rate);
}
bool StatsdClient::inc(const std::string& key, float sample_rate) { return count(key, 1, sample_rate); }
int StatsdClient::count(const std::string& key, size_t value, float sample_rate)
bool StatsdClient::count(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, "c", sample_rate);
}
int StatsdClient::gauge(const std::string& key, size_t value, float sample_rate)
bool StatsdClient::gauge(const std::string& key, int64_t value, float sample_rate)
{
return send(key, value, "g", sample_rate);
}
int StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate)
bool StatsdClient::gaugeDouble(const std::string& key, double value, float sample_rate)
{
return sendDouble(key, value, "g", sample_rate);
}
int StatsdClient::timing(const std::string& key, size_t ms, float sample_rate)
bool StatsdClient::timing(const std::string& key, int64_t ms, float sample_rate)
{
return send(key, ms, "ms", sample_rate);
}
int StatsdClient::send(std::string key, size_t value, const std::string& type, float sample_rate)
bool StatsdClient::send(std::string key, int64_t value, const std::string& type, float sample_rate)
{
if (!should_send(sample_rate)) {
return 0;
if (!m_sock) {
return false;
}
if (!ShouldSend(sample_rate)) {
// Not our turn to send but report that we have
return true;
}
// partition stats by node name if set
if (!d->nodename.empty())
key = key + "." + d->nodename;
if (!m_nodename.empty()) key = key + "." + m_nodename;
cleanup(key);
char buf[256];
if ( fequal( sample_rate, 1.0 ) )
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s",
d->ns.c_str(), key.c_str(), (ssize_t) value, type.c_str());
}
else
{
snprintf(buf, sizeof(buf), "%s%s:%zd|%s|@%.2f",
d->ns.c_str(), key.c_str(), (ssize_t) value, type.c_str(), sample_rate);
std::string buf{strprintf("%s%s:%d|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate);
}
return send(buf);
}
int StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate)
bool StatsdClient::sendDouble(std::string key, double value, const std::string& type, float sample_rate)
{
if (!should_send(sample_rate)) {
return 0;
if (!m_sock) {
return false;
}
if (!ShouldSend(sample_rate)) {
// Not our turn to send but report that we have
return true;
}
// partition stats by node name if set
if (!d->nodename.empty())
key = key + "." + d->nodename;
if (!m_nodename.empty()) key = key + "." + m_nodename;
cleanup(key);
char buf[256];
if ( fequal( sample_rate, 1.0 ) )
{
snprintf(buf, sizeof(buf), "%s%s:%f|%s",
d->ns.c_str(), key.c_str(), value, type.c_str());
}
else
{
snprintf(buf, sizeof(buf), "%s%s:%f|%s|@%.2f",
d->ns.c_str(), key.c_str(), value, type.c_str(), sample_rate);
std::string buf{strprintf("%s%s:%f|%s", m_ns, key, value, type)};
if (sample_rate < 1.f) {
buf += strprintf("|@%.2f", sample_rate);
}
return send(buf);
}
int StatsdClient::send(const std::string& message)
bool StatsdClient::send(const std::string& message)
{
int ret = init();
if ( ret )
{
return ret;
}
ret = sendto(d->sock, message.data(), message.size(), 0, reinterpret_cast<const sockaddr*>(&d->server), sizeof(d->server));
if ( ret == -1) {
snprintf(d->errmsg, sizeof(d->errmsg),
"sendto server fail, host=%s:%d, err=%m", d->host.c_str(), d->port);
return -1;
}
return 0;
assert(m_sock);
if (::sendto(m_sock->Get(), message.data(), message.size(), /*flags=*/0,
reinterpret_cast<struct sockaddr*>(&m_server.first), m_server.second) == SOCKET_ERROR) {
LogPrintf("ERROR: Unable to send message (sendto() returned error %s), host=%s:%d\n",
NetworkErrorString(WSAGetLastError()), m_host, m_port);
return false;
}
const char* StatsdClient::errmsg()
{
return d->errmsg;
return true;
}
} // namespace statsd

View File

@ -1,3 +1,4 @@
// Copyright (c) 2014-2017 Statoshi Developers
// Copyright (c) 2020-2023 The Dash Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
@ -5,67 +6,68 @@
#ifndef BITCOIN_STATSD_CLIENT_H
#define BITCOIN_STATSD_CLIENT_H
#include <random.h>
#include <threadsafety.h>
#include <util/sock.h>
#include <string>
#include <memory>
static const bool DEFAULT_STATSD_ENABLE = false;
static const int DEFAULT_STATSD_PORT = 8125;
static const std::string DEFAULT_STATSD_HOST = "127.0.0.1";
static const std::string DEFAULT_STATSD_HOSTNAME = "";
static const std::string DEFAULT_STATSD_NAMESPACE = "";
static constexpr bool DEFAULT_STATSD_ENABLE{false};
static constexpr uint16_t DEFAULT_STATSD_PORT{8125};
static const std::string DEFAULT_STATSD_HOST{"127.0.0.1"};
static const std::string DEFAULT_STATSD_HOSTNAME{""};
static const std::string DEFAULT_STATSD_NAMESPACE{""};
// schedule periodic measurements, in seconds: default - 1 minute, min - 5 sec, max - 1h.
static const int DEFAULT_STATSD_PERIOD = 60;
static const int MIN_STATSD_PERIOD = 5;
static const int MAX_STATSD_PERIOD = 60 * 60;
static constexpr int DEFAULT_STATSD_PERIOD{60};
static constexpr int MIN_STATSD_PERIOD{5};
static constexpr int MAX_STATSD_PERIOD{60 * 60};
namespace statsd {
struct _StatsdClientData;
class StatsdClient {
public:
explicit StatsdClient(const std::string& host = DEFAULT_STATSD_HOST, int port = DEFAULT_STATSD_PORT, const std::string& ns = DEFAULT_STATSD_NAMESPACE);
~StatsdClient();
explicit StatsdClient(const std::string& host, const std::string& nodename, uint16_t port,
const std::string& ns, bool enabled);
public:
// you can config at anytime; client will use new address (useful for Singleton)
void config(const std::string& host, int port, const std::string& ns = DEFAULT_STATSD_NAMESPACE);
const char* errmsg();
public:
int inc(const std::string& key, float sample_rate = 1.0);
int dec(const std::string& key, float sample_rate = 1.0);
int count(const std::string& key, size_t value, float sample_rate = 1.0);
int gauge(const std::string& key, size_t value, float sample_rate = 1.0);
int gaugeDouble(const std::string& key, double value, float sample_rate = 1.0);
int timing(const std::string& key, size_t ms, float sample_rate = 1.0);
public:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
int send(const std::string& message);
bool inc(const std::string& key, float sample_rate = 1.f);
bool dec(const std::string& key, float sample_rate = 1.f);
bool count(const std::string& key, int64_t value, float sample_rate = 1.f);
bool gauge(const std::string& key, int64_t value, float sample_rate = 1.f);
bool gaugeDouble(const std::string& key, double value, float sample_rate = 1.f);
bool timing(const std::string& key, int64_t ms, float sample_rate = 1.f);
/* (Low Level Api) manually send a message
* type = "c", "g" or "ms"
*/
int send(std::string key, size_t value,
const std::string& type, float sample_rate);
int sendDouble(std::string key, double value,
const std::string& type, float sample_rate);
bool send(std::string key, int64_t value, const std::string& type, float sample_rate);
bool sendDouble(std::string key, double value, const std::string& type, float sample_rate);
protected:
int init();
static void cleanup(std::string& key);
private:
/**
* (Low Level Api) manually send a message
* which might be composed of several lines.
*/
bool send(const std::string& message);
protected:
const std::unique_ptr<struct _StatsdClientData> d;
void cleanup(std::string& key);
bool ShouldSend(float sample_rate);
private:
mutable Mutex cs;
mutable FastRandomContext insecure_rand GUARDED_BY(cs);
std::unique_ptr<Sock> m_sock{nullptr};
std::pair<struct sockaddr_storage, socklen_t> m_server{{}, sizeof(struct sockaddr_storage)};
const uint16_t m_port;
const std::string m_host;
const std::string m_nodename;
const std::string m_ns;
};
} // namespace statsd
extern statsd::StatsdClient statsClient;
extern std::unique_ptr<statsd::StatsdClient> g_stats_client;
#endif // BITCOIN_STATSD_CLIENT_H

View File

@ -39,6 +39,7 @@
#include <scheduler.h>
#include <script/sigcache.h>
#include <spork.h>
#include <statsd_client.h>
#include <streams.h>
#include <test/util/index.h>
#include <txdb.h>
@ -182,6 +183,13 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve
SetupNetworking();
InitSignatureCache();
InitScriptExecutionCache();
::g_stats_client = std::make_unique<statsd::StatsdClient>(
m_node.args->GetArg("-statshost", DEFAULT_STATSD_HOST),
m_node.args->GetArg("-statshostname", DEFAULT_STATSD_HOSTNAME),
m_node.args->GetArg("-statsport", DEFAULT_STATSD_PORT),
m_node.args->GetArg("-statsns", DEFAULT_STATSD_NAMESPACE),
m_node.args->GetBoolArg("-statsenabled", DEFAULT_STATSD_ENABLE)
);
m_node.chain = interfaces::MakeChain(m_node);
m_node.netgroupman = std::make_unique<NetGroupManager>(/*asmap=*/std::vector<bool>());
@ -217,6 +225,7 @@ BasicTestingSetup::~BasicTestingSetup()
m_node.connman.reset();
m_node.addrman.reset();
m_node.netgroupman.reset();
::g_stats_client.reset();
LogInstance().DisconnectTestLogger();
fs::remove_all(m_path_root);

View File

@ -51,8 +51,8 @@ bool TxOrphanage::AddTx(const CTransactionRef& tx, NodeId peer)
LogPrint(BCLog::MEMPOOL, "stored orphan tx %s (mapsz %u outsz %u)\n", hash.ToString(),
m_orphans.size(), m_outpoint_to_orphan_it.size());
statsClient.inc("transactions.orphans.add", 1.0f);
statsClient.gauge("transactions.orphans", m_orphans.size());
::g_stats_client->inc("transactions.orphans.add", 1.0f);
::g_stats_client->gauge("transactions.orphans", m_orphans.size());
return true;
}
@ -87,8 +87,8 @@ int TxOrphanage::EraseTx(const uint256& txid)
assert(m_orphan_tx_size >= it->second.nTxSize);
m_orphan_tx_size -= it->second.nTxSize;
m_orphans.erase(it);
statsClient.inc("transactions.orphans.remove", 1.0f);
statsClient.gauge("transactions.orphans", m_orphans.size());
::g_stats_client->inc("transactions.orphans.remove", 1.0f);
::g_stats_client->gauge("transactions.orphans", m_orphans.size());
return 1;
}

View File

@ -612,7 +612,7 @@ bool MemPoolAccept::PreChecks(ATMPArgs& args, Workspace& ws)
// is it already in the memory pool?
if (m_pool.exists(hash)) {
statsClient.inc("transactions.duplicate", 1.0f);
::g_stats_client->inc("transactions.duplicate", 1.0f);
return state.Invalid(TxValidationResult::TX_CONFLICT, "txn-already-in-mempool");
}
@ -838,11 +838,11 @@ bool MemPoolAccept::Finalize(const ATMPArgs& args, Workspace& ws)
CAmount nValueOut = tx.GetValueOut();
unsigned int nSigOps = GetTransactionSigOpCount(tx, m_view, STANDARD_SCRIPT_VERIFY_FLAGS);
statsClient.count("transactions.sizeBytes", entry->GetTxSize(), 1.0f);
statsClient.count("transactions.fees", nModifiedFees, 1.0f);
statsClient.count("transactions.inputValue", nValueOut - nModifiedFees, 1.0f);
statsClient.count("transactions.outputValue", nValueOut, 1.0f);
statsClient.count("transactions.sigOps", nSigOps, 1.0f);
::g_stats_client->count("transactions.sizeBytes", entry->GetTxSize(), 1.0f);
::g_stats_client->count("transactions.fees", nModifiedFees, 1.0f);
::g_stats_client->count("transactions.inputValue", nValueOut - nModifiedFees, 1.0f);
::g_stats_client->count("transactions.outputValue", nValueOut, 1.0f);
::g_stats_client->count("transactions.sigOps", nSigOps, 1.0f);
// Add memory address index
if (fAddressIndex) {
@ -895,10 +895,10 @@ MempoolAcceptResult MemPoolAccept::AcceptSingleTransaction(const CTransactionRef
const CTransaction& tx = *ptx;
auto finish = Now<SteadyMilliseconds>();
auto diff = finish - start;
statsClient.timing("AcceptToMemoryPool_ms", count_milliseconds(diff), 1.0f);
statsClient.inc("transactions.accepted", 1.0f);
statsClient.count("transactions.inputs", tx.vin.size(), 1.0f);
statsClient.count("transactions.outputs", tx.vout.size(), 1.0f);
::g_stats_client->timing("AcceptToMemoryPool_ms", count_milliseconds(diff), 1.0f);
::g_stats_client->inc("transactions.accepted", 1.0f);
::g_stats_client->count("transactions.inputs", tx.vin.size(), 1.0f);
::g_stats_client->count("transactions.outputs", tx.vout.size(), 1.0f);
return MempoolAcceptResult::Success(ws.m_base_fees);
}
@ -1319,7 +1319,7 @@ void CChainState::InvalidChainFound(CBlockIndex* pindexNew)
{
AssertLockHeld(cs_main);
statsClient.inc("warnings.InvalidChainFound", 1.0f);
::g_stats_client->inc("warnings.InvalidChainFound", 1.0f);
if (!m_chainman.m_best_invalid || pindexNew->nChainWork > m_chainman.m_best_invalid->nChainWork) {
m_chainman.m_best_invalid = pindexNew;
@ -1343,7 +1343,7 @@ void CChainState::ConflictingChainFound(CBlockIndex* pindexNew)
{
AssertLockHeld(cs_main);
statsClient.inc("warnings.ConflictingChainFound", 1.0f);
::g_stats_client->inc("warnings.ConflictingChainFound", 1.0f);
LogPrintf("%s: conflicting block=%s height=%d log2_work=%f date=%s\n", __func__,
pindexNew->GetBlockHash().ToString(), pindexNew->nHeight,
@ -1362,7 +1362,7 @@ void CChainState::InvalidBlockFound(CBlockIndex *pindex, const BlockValidationSt
{
AssertLockHeld(cs_main);
statsClient.inc("warnings.InvalidBlockFound", 1.0f);
::g_stats_client->inc("warnings.InvalidBlockFound", 1.0f);
if (state.GetResult() != BlockValidationResult::BLOCK_MUTATED) {
pindex->nStatus |= BLOCK_FAILED_VALID;
m_chainman.m_failed_blocks.insert(pindex);
@ -1526,7 +1526,7 @@ bool CheckInputScripts(const CTransaction& tx, TxValidationState &state, const C
auto finish = Now<SteadyMilliseconds>();
auto diff = finish - start;
statsClient.timing("CheckInputScripts_ms", count_milliseconds(diff), 1.0f);
::g_stats_client->timing("CheckInputScripts_ms", count_milliseconds(diff), 1.0f);
return true;
}
@ -1729,7 +1729,7 @@ DisconnectResult CChainState::DisconnectBlock(const CBlock& block, const CBlockI
auto finish = Now<SteadyMilliseconds>();
auto diff = finish - start;
statsClient.timing("DisconnectBlock_ms", count_milliseconds(diff), 1.0f);
::g_stats_client->timing("DisconnectBlock_ms", count_milliseconds(diff), 1.0f);
return fClean ? DISCONNECT_OK : DISCONNECT_UNCLEAN;
}
@ -2296,12 +2296,12 @@ bool CChainState::ConnectBlock(const CBlock& block, BlockValidationState& state,
int64_t nTime8 = GetTimeMicros(); nTimeCallbacks += nTime8 - nTime5;
LogPrint(BCLog::BENCHMARK, " - Callbacks: %.2fms [%.2fs (%.2fms/blk)]\n", MILLI * (nTime8 - nTime5), nTimeCallbacks * MICRO, nTimeCallbacks * MILLI / nBlocksTotal);
statsClient.timing("ConnectBlock_ms", (nTime8 - nTimeStart) / 1000, 1.0f);
statsClient.gauge("blocks.tip.SizeBytes", ::GetSerializeSize(block, PROTOCOL_VERSION), 1.0f);
statsClient.gauge("blocks.tip.Height", m_chain.Height(), 1.0f);
statsClient.gauge("blocks.tip.Version", block.nVersion, 1.0f);
statsClient.gauge("blocks.tip.NumTransactions", block.vtx.size(), 1.0f);
statsClient.gauge("blocks.tip.SigOps", nSigOps, 1.0f);
::g_stats_client->timing("ConnectBlock_ms", (nTime8 - nTimeStart) / 1000, 1.0f);
::g_stats_client->gauge("blocks.tip.SizeBytes", ::GetSerializeSize(block, PROTOCOL_VERSION), 1.0f);
::g_stats_client->gauge("blocks.tip.Height", m_chain.Height(), 1.0f);
::g_stats_client->gauge("blocks.tip.Version", block.nVersion, 1.0f);
::g_stats_client->gauge("blocks.tip.NumTransactions", block.vtx.size(), 1.0f);
::g_stats_client->gauge("blocks.tip.SigOps", nSigOps, 1.0f);
TRACE6(validation, block_connected,
block.GetHash().data(),
@ -2772,7 +2772,7 @@ bool CChainState::ConnectTip(BlockValidationState& state, CBlockIndex* pindexNew
LogPrint(BCLog::BENCHMARK, " - Connect postprocess: %.2fms [%.2fs (%.2fms/blk)]\n", (nTime6 - nTime5) * MILLI, nTimePostConnect * MICRO, nTimePostConnect * MILLI / nBlocksTotal);
LogPrint(BCLog::BENCHMARK, "- Connect block: %.2fms [%.2fs (%.2fms/blk)]\n", (nTime6 - nTime1) * MILLI, nTimeTotal * MICRO, nTimeTotal * MILLI / nBlocksTotal);
statsClient.timing("ConnectTip_ms", (nTime6 - nTime1) / 1000, 1.0f);
::g_stats_client->timing("ConnectTip_ms", (nTime6 - nTime1) / 1000, 1.0f);
connectTrace.BlockConnected(pindexNew, std::move(pthisBlock));
return true;
@ -3084,7 +3084,7 @@ bool CChainState::ActivateBestChain(BlockValidationState& state, std::shared_ptr
auto finish = Now<SteadyMilliseconds>();
auto diff = finish - start;
statsClient.timing("ActivateBestChain_ms", count_milliseconds(diff), 1.0f);
::g_stats_client->timing("ActivateBestChain_ms", count_milliseconds(diff), 1.0f);
// Write changes periodically to disk, after relay.
if (!FlushStateToDisk(state, FlushStateMode::PERIODIC)) {
@ -3572,7 +3572,7 @@ bool CheckBlock(const CBlock& block, BlockValidationState& state, const Consensu
auto finish = Now<SteadyMicroseconds>();
auto diff = finish - start;
statsClient.timing("CheckBlock_us", count_microseconds(diff), 1.0f);
::g_stats_client->timing("CheckBlock_us", count_microseconds(diff), 1.0f);
return true;
}
@ -3943,7 +3943,7 @@ bool CChainState::AcceptBlock(const std::shared_ptr<const CBlock>& pblock, Block
auto finish = Now<SteadyMicroseconds>();
auto diff = finish - start;
statsClient.timing("AcceptBlock_us", count_microseconds(diff), 1.0f);
::g_stats_client->timing("AcceptBlock_us", count_microseconds(diff), 1.0f);
return true;
}

View File

@ -24,8 +24,6 @@ FALSE_POSITIVES = [
("src/qt/networkstyle.cpp", "strprintf(titleAddText, gArgs.GetDevNetName())"),
("src/rpc/evo.cpp", "strprintf(it->second, nParamNum)"),
("src/stacktraces.cpp", "strprintf(fmtStr, i, si.pc, lstr, fstr)"),
("src/statsd_client.cpp", "snprintf(d->errmsg, sizeof(d->errmsg), \"could not create socket, err=%m\")"),
("src/statsd_client.cpp", "snprintf(d->errmsg, sizeof(d->errmsg), \"sendto server fail, host=%s:%d, err=%m\", d->host.c_str(), d->port)"),
("src/util/system.cpp", "strprintf(_(COPYRIGHT_HOLDERS).translated, COPYRIGHT_HOLDERS_SUBSTITUTION)"),
("src/validationinterface.cpp", "LogPrint(BCLog::VALIDATION, fmt \"\\n\", __VA_ARGS__)"),
("src/wallet/wallet.h", "WalletLogPrintf(std::string fmt, Params... parameters)"),