Fix data races in bls_worker and use ctpl_stl queue. (#4240)

Change ctpl implementation to use STL queue & mutex.

Use ctpl synchronized queue instead of boost lockfree queue in bls worker aggregator.

Use smart pointers for memory management of Aggregator and VectorAggregator. With 'delete this;', the objects are prone to a data race on deletion.

Use smart pointers for memory management of ContributionVerifier.

Pass shared_ptr by value to other threads via worker pool.
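
The rewrite follows the standard std::enable_shared_from_this idiom: instead of a callback calling 'delete this' while other pool threads may still be touching the object, every task pushed to the worker pool captures a shared_ptr to its owner, so whichever task finishes last drops the final reference. A minimal sketch of the pattern, using hypothetical names (AsyncJob, doneCallback wiring) rather than the actual bls_worker types:

    #include <ctpl_stl.h>
    #include <functional>
    #include <memory>

    // Illustrative only: the owner keeps itself alive until its pool task has run,
    // which is what replaces the old 'delete this' in the callback bodies.
    struct AsyncJob : std::enable_shared_from_this<AsyncJob> {
        ctpl::thread_pool& pool;
        std::function<void()> doneCallback;

        AsyncJob(ctpl::thread_pool& p, std::function<void()> cb) :
            pool(p), doneCallback(std::move(cb)) {}

        void Start()
        {
            auto self = shared_from_this();       // pin the lifetime across the thread hop
            pool.push([self](int threadId) {
                self->doneCallback();             // last copy of 'self' is released when the task returns
            });
        }
    };

    // auto job = std::make_shared<AsyncJob>(workerPool, []{ /* done */ });
    // job->Start();   // safe even if the caller drops 'job' immediately afterwards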
gabriel-bjg 2021-07-12 06:25:27 +03:00 committed by GitHub
parent 0f027caba2
commit ee6d2f5b0d
8 changed files with 89 additions and 83 deletions

View File

@ -152,7 +152,7 @@ BITCOIN_CORE_H = \
core_io.h \
core_memusage.h \
cuckoocache.h \
ctpl.h \
ctpl_stl.h \
cxxtimer.hpp \
evo/cbtx.h \
evo/deterministicmns.h \

View File

@ -128,7 +128,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i
// The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the
// input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator
template <typename T>
struct Aggregator {
struct Aggregator : public std::enable_shared_from_this<Aggregator<T>> {
typedef T ElementType;
size_t batchSize{16};
@ -140,7 +140,7 @@ struct Aggregator {
std::mutex m;
// items in the queue are all intermediate aggregation results of finished batches.
// The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue)
boost::lockfree::queue<T*> aggQueue;
ctpl::detail::Queue<T*> aggQueue;
std::atomic<size_t> aggQueueSize{0};
// keeps track of currently queued/in-progress batches. If it reaches 0, we are done
@ -158,7 +158,6 @@ struct Aggregator {
DoneCallback _doneCallback) :
workerPool(_workerPool),
parallel(_parallel),
aggQueue(0),
doneCallback(std::move(_doneCallback))
{
inputVec = std::make_shared<std::vector<const T*> >(count);
@ -182,19 +181,18 @@ struct Aggregator {
} else {
doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
}
delete this;
return;
}
if (batchCount == 1) {
// just a single batch of work, take a shortcut.
PushWork([this](int threadId) {
auto self(this->shared_from_this());
PushWork([this, self](int threadId) {
if (inputVec->size() == 1) {
doneCallback(*(*inputVec)[0]);
} else {
doneCallback(SyncAggregate(*inputVec, 0, inputVec->size()));
}
delete this;
});
return;
}
@ -250,17 +248,18 @@ struct Aggregator {
delete rem[i];
}
doneCallback(r);
delete this;
}
void AsyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
void AsyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
{
IncWait();
PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del));
auto self(this->shared_from_this());
PushWork([self, vec, start, count, del](int threadId){
self->SyncAggregateAndPushAggQueue(vec, start, count, del);
});
}
void SyncAggregateAndPushAggQueue(std::shared_ptr<std::vector<const T*> >& vec, size_t start, size_t count, bool del)
void SyncAggregateAndPushAggQueue(const std::shared_ptr<std::vector<const T*>>& vec, size_t start, size_t count, bool del)
{
// aggregate vec and push the intermediate result onto the work queue
PushAggQueue(SyncAggregate(*vec, start, count));
@ -331,7 +330,7 @@ struct Aggregator {
// [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4]
// Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive)
template <typename T>
struct VectorAggregator {
struct VectorAggregator : public std::enable_shared_from_this<VectorAggregator<T>> {
typedef Aggregator<T> AggregatorType;
typedef std::vector<T> VectorType;
typedef std::shared_ptr<VectorType> VectorPtrType;
@ -369,19 +368,15 @@ struct VectorAggregator {
void Start()
{
std::vector<AggregatorType*> aggregators;
for (size_t i = 0; i < vecSize; i++) {
std::vector<const T*> tmp(count);
for (size_t j = 0; j < count; j++) {
tmp[j] = &(*vecs[start + j])[i];
}
auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i));
// we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop
aggregators.emplace_back(aggregator);
}
for (auto agg : aggregators) {
agg->Start();
auto self(this->shared_from_this());
auto aggregator = std::make_shared<AggregatorType>(std::move(tmp), 0, count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone(agg, i);});
aggregator->Start();
}
}
@ -390,14 +385,13 @@ struct VectorAggregator {
(*result)[idx] = agg;
if (++doneCount == vecSize) {
doneCallback(result);
delete this;
}
}
};
// See comment of AsyncVerifyContributionShares for a description on what this does
// Same rules as in Aggregator apply for the inputs
struct ContributionVerifier {
struct ContributionVerifier : public std::enable_shared_from_this<ContributionVerifier> {
struct BatchState {
size_t start;
size_t count;
@ -490,7 +484,6 @@ struct ContributionVerifier {
}
}
doneCallback(result);
delete this;
}
void AsyncAggregate(size_t batchIdx)
@ -498,8 +491,9 @@ struct ContributionVerifier {
auto& batchState = batchStates[batchIdx];
// aggregate vvecs and skShares of batch in parallel
auto vvecAgg = new VectorAggregator<CBLSPublicKey>(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1));
auto skShareAgg = new Aggregator<CBLSSecretKey>(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1));
auto self(this->shared_from_this());
auto vvecAgg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, batchState.start, batchState.count, parallel, workerPool, [this, self, batchIdx] (const BLSVerificationVectorPtr& vvec) {HandleAggVvecDone(batchIdx, vvec);});
auto skShareAgg = std::make_shared<Aggregator<CBLSSecretKey>>(skShares, batchState.start, batchState.count, parallel, workerPool, [this, self, batchIdx] (const CBLSSecretKey& skShare) {HandleAggSkShareDone(batchIdx, skShare);});
vvecAgg->Start();
skShareAgg->Start();
@ -547,7 +541,8 @@ struct ContributionVerifier {
void AsyncAggregatedVerifyBatch(size_t batchIdx)
{
auto f = [this, batchIdx](int threadId) {
auto self(this->shared_from_this());
auto f = [this, self, batchIdx](int threadId) {
auto& batchState = batchStates[batchIdx];
bool result = Verify(batchState.vvec, batchState.skShare);
if (result) {
@ -567,7 +562,8 @@ struct ContributionVerifier {
size_t count = batchStates[batchIdx].count;
batchStates[batchIdx].verifyResults.assign(count, 0);
for (size_t i = 0; i < count; i++) {
auto f = [this, i, batchIdx](int threadId) {
auto self(this->shared_from_this());
auto f = [this, self, i, batchIdx](int threadId) {
auto& batchState = batchStates[batchIdx];
batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]);
HandleVerifyDone(1);
@ -614,7 +610,7 @@ void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector<BLSVerific
return;
}
auto agg = new VectorAggregator<CBLSPublicKey>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
auto agg = std::make_shared<VectorAggregator<CBLSPublicKey>>(vvecs, start, count, parallel, workerPool, std::move(doneCallback));
agg->Start();
}
@ -649,7 +645,7 @@ void AsyncAggregateHelper(ctpl::thread_pool& workerPool,
return;
}
auto agg = new Aggregator<T>(vec, start, count, parallel, workerPool, std::move(doneCallback));
auto agg = std::make_shared<Aggregator<T>>(vec, start, count, parallel, workerPool, std::move(doneCallback));
agg->Start();
}
@ -734,7 +730,7 @@ void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::v
return;
}
auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
auto verifier = std::make_shared<ContributionVerifier>(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback));
verifier->Start();
}

View File

@ -7,7 +7,7 @@
#include <bls/bls.h>
#include <ctpl.h>
#include <ctpl_stl.h>
#include <future>
#include <mutex>

View File

@ -1,25 +1,24 @@
/*********************************************************
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
#ifndef __ctpl_thread_pool_H__
#define __ctpl_thread_pool_H__
#ifndef __ctpl_stl_thread_pool_H__
#define __ctpl_stl_thread_pool_H__
#include <functional>
#include <thread>
@ -29,13 +28,9 @@
#include <exception>
#include <future>
#include <mutex>
#include <boost/lockfree/queue.hpp>
#include <queue>
#ifndef _ctplThreadPoolLength_
#define _ctplThreadPoolLength_ 100
#endif
// thread pool to run user's functors with signature
// ret func(int id, other_params)
@ -45,12 +40,40 @@
namespace ctpl {
namespace detail {
template <typename T>
class Queue {
public:
bool push(T const & value) {
std::unique_lock<std::mutex> lock(this->mutex);
this->q.push(value);
return true;
}
// deletes the retrieved element, do not use for non integral types
bool pop(T & v) {
std::unique_lock<std::mutex> lock(this->mutex);
if (this->q.empty())
return false;
v = this->q.front();
this->q.pop();
return true;
}
bool empty() {
std::unique_lock<std::mutex> lock(this->mutex);
return this->q.empty();
}
private:
std::queue<T> q;
std::mutex mutex;
};
}
class thread_pool {
public:
thread_pool() : q(_ctplThreadPoolLength_) { this->init(); }
thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); }
thread_pool() { this->init(); }
thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
// the destructor waits for all the functions in the queue to be finished
~thread_pool() {
@ -99,24 +122,22 @@ namespace ctpl {
void clear_queue() {
std::function<void(int id)> * _f;
while (this->q.pop(_f))
delete _f; // empty the queue
delete _f; // empty the queue
}
// pops a functional wraper to the original function
// pops a functional wrapper to the original function
std::function<void(int)> pop() {
std::function<void(int id)> * _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
if (_f)
f = *_f;
return f;
}
// wait for all computing threads to finish and stop all threads
// may be called asyncronously to not pause the calling thread while waiting
// may be called asynchronously to not pause the calling thread while waiting
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
void stop(bool isWait = false) {
if (!isWait) {
@ -151,17 +172,14 @@ namespace ctpl {
template<typename F, typename... Rest>
auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
@ -170,15 +188,12 @@ namespace ctpl {
template<typename F>
auto push(F && f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
@ -192,40 +207,37 @@ namespace ctpl {
thread_pool & operator=(thread_pool &&);// = delete;
void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool> & _flag = *flag;
std::function<void(int id)> * _f;
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}
// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
mutable boost::lockfree::queue<std::function<void(int id)> *> q;
detail::Queue<std::function<void(int id)> *> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
@ -236,5 +248,4 @@ namespace ctpl {
}
#endif // __ctpl_thread_pool_H__
#endif // __ctpl_stl_thread_pool_H__
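
For reference, a small usage sketch of the reworked pool, assuming only what the header above shows: tasks receive the worker's thread id as their first argument, extra arguments are forwarded, the result comes back as a std::future, and stop(true) drains the queue before joining.

    #include <ctpl_stl.h>
    #include <iostream>

    int main()
    {
        ctpl::thread_pool pool(4);                      // four workers, unbounded mutex-guarded queue
        auto fut = pool.push([](int threadId, int x) {  // queued through ctpl::detail::Queue
            return x * x;
        }, 7);
        std::cout << fut.get() << '\n';                 // prints 49
        pool.stop(true);                                // run any remaining tasks, then join
        return 0;
    }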

View File

@ -6,7 +6,7 @@
#define BITCOIN_LLMQ_QUORUMS_DKGSESSIONHANDLER_H
#include <ctpl.h>
#include <ctpl_stl.h>
#include <net.h>
class CBLSWorker;

View File

@ -8,7 +8,7 @@
#include <support/allocators/secure.h>
#include <chainparamsbase.h>
#include <ctpl.h>
#include <ctpl_stl.h>
#include <random.h>
#include <serialize.h>
#include <stacktraces.h>

View File

@ -10,7 +10,7 @@ export LC_ALL=C
HEADER_ID_PREFIX="BITCOIN_"
HEADER_ID_SUFFIX="_H"
REGEXP_EXCLUDE_FILES_WITH_PREFIX="src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|bench/nanobench.h|univalue/|ctpl.h|bls/|crypto/sph)"
REGEXP_EXCLUDE_FILES_WITH_PREFIX="src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|bench/nanobench.h|univalue/|ctpl_stl.h|bls/|crypto/sph)"
EXIT_CODE=0
for HEADER_FILE in $(git ls-files -- "*.h" | grep -vE "^${REGEXP_EXCLUDE_FILES_WITH_PREFIX}")

View File

@ -59,7 +59,6 @@ EXPECTED_BOOST_INCLUDES=(
boost/filesystem/fstream.hpp
boost/function.hpp
boost/lexical_cast.hpp
boost/lockfree/queue.hpp
boost/multi_index/hashed_index.hpp
boost/multi_index/ordered_index.hpp
boost/multi_index/sequenced_index.hpp