refactor: Make bls worker aggregator batch size const (rename it accordingly)

This commit is contained in:
UdjinM6 2024-11-01 15:22:59 +03:00
parent 0cf7d4a65e
commit 38b304c7b0
No known key found for this signature in database
GPG Key ID: 83592BD1400D58D9

View File

@ -124,7 +124,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, Span<CBLSId> ids, BL
// input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
template <typename T>
struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
size_t batchSize{16};
const size_t BATCH_SIZE{16};
std::shared_ptr<std::vector<const T*> > inputVec;
bool parallel;
@ -164,7 +164,7 @@ struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
// If parallel=true, then this will return fast, otherwise this will block until aggregation is done
void Start()
{
size_t batchCount = (inputVec->size() + batchSize - 1) / batchSize;
size_t batchCount = (inputVec->size() + BATCH_SIZE - 1) / BATCH_SIZE;
if (!parallel) {
if (inputVec->size() == 1) {
@ -191,8 +191,8 @@ struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
// increment wait counter as otherwise the first finished async aggregation might signal that we're done
IncWait();
for (size_t i = 0; i < batchCount; i++) {
size_t start = i * batchSize;
size_t count = std::min(batchSize, inputVec->size() - start);
size_t start = i * BATCH_SIZE;
size_t count = std::min(BATCH_SIZE, inputVec->size() - start);
AsyncAggregateAndPushAggQueue(inputVec, start, count, false);
}
// this will decrement the wait counter and in most cases NOT finish, as async work is still in progress
@ -272,24 +272,24 @@ struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
throw;
}
if (++aggQueueSize >= batchSize) {
if (++aggQueueSize >= BATCH_SIZE) {
// we've collected enough intermediate results to form a new batch.
std::shared_ptr<std::vector<const T*> > newBatch;
{
std::unique_lock<std::mutex> l(m);
if (aggQueueSize < batchSize) {
if (aggQueueSize < BATCH_SIZE) {
// some other worker thread grabbed this batch
return;
}
newBatch = std::make_shared<std::vector<const T*> >(batchSize);
newBatch = std::make_shared<std::vector<const T*>>(BATCH_SIZE);
// collect items for new batch
for (size_t i = 0; i < batchSize; i++) {
for (size_t i = 0; i < BATCH_SIZE; i++) {
T* p = nullptr;
bool s = aggQueue.pop(p);
assert(s);
(*newBatch)[i] = p;
}
aggQueueSize -= batchSize;
aggQueueSize -= BATCH_SIZE;
}
// push new batch to work queue. del=true this time as these items are intermediate results and need to be deleted