From ee6d2f5b0d5379cd8d7b0c1b4191bce671d39603 Mon Sep 17 00:00:00 2001 From: gabriel-bjg Date: Mon, 12 Jul 2021 06:25:27 +0300 Subject: [PATCH] Fix data races in bls_worker and use ctpl_stl queue. (#4240) Change ctpl implementation to use STL queue & mutex. Use ctpl synchronized queue instead of boost lockfree queue in bls worker aggregator. Use smart pointers for memory management of Aggregator and VectorAggregator. With 'delete this;' the objects are prone to data race on the delete operator. Use smart pointers for memory management of ContributionVerifier. Pass shared_ptr by value to other threads via worker pool. --- src/Makefile.am | 2 +- src/bls/bls_worker.cpp | 54 +++++++------- src/bls/bls_worker.h | 2 +- src/{ctpl.h => ctpl_stl.h} | 107 +++++++++++++++------------ src/llmq/quorums_dkgsessionhandler.h | 2 +- src/util/system.cpp | 2 +- test/lint/lint-include-guards.sh | 2 +- test/lint/lint-includes.sh | 1 - 8 files changed, 89 insertions(+), 83 deletions(-) rename src/{ctpl.h => ctpl_stl.h} (75%) diff --git a/src/Makefile.am b/src/Makefile.am index ccd7c9863f..4cb6c7269f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -152,7 +152,7 @@ BITCOIN_CORE_H = \ core_io.h \ core_memusage.h \ cuckoocache.h \ - ctpl.h \ + ctpl_stl.h \ cxxtimer.hpp \ evo/cbtx.h \ evo/deterministicmns.h \ diff --git a/src/bls/bls_worker.cpp b/src/bls/bls_worker.cpp index 5cf8aab173..379a9d2fbb 100644 --- a/src/bls/bls_worker.cpp +++ b/src/bls/bls_worker.cpp @@ -128,7 +128,7 @@ bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& i // The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the // input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator template -struct Aggregator { +struct Aggregator : public std::enable_shared_from_this> { typedef T ElementType; size_t batchSize{16}; @@ -140,7 +140,7 @@ struct Aggregator { std::mutex m; // items in the queue are all intermediate aggregation results of finished batches. // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue) - boost::lockfree::queue aggQueue; + ctpl::detail::Queue aggQueue; std::atomic aggQueueSize{0}; // keeps track of currently queued/in-progress batches. If it reaches 0, we are done @@ -158,7 +158,6 @@ struct Aggregator { DoneCallback _doneCallback) : workerPool(_workerPool), parallel(_parallel), - aggQueue(0), doneCallback(std::move(_doneCallback)) { inputVec = std::make_shared >(count); @@ -182,19 +181,18 @@ struct Aggregator { } else { doneCallback(SyncAggregate(*inputVec, 0, inputVec->size())); } - delete this; return; } if (batchCount == 1) { // just a single batch of work, take a shortcut. - PushWork([this](int threadId) { + auto self(this->shared_from_this()); + PushWork([this, self](int threadId) { if (inputVec->size() == 1) { doneCallback(*(*inputVec)[0]); } else { doneCallback(SyncAggregate(*inputVec, 0, inputVec->size())); } - delete this; }); return; } @@ -250,17 +248,18 @@ struct Aggregator { delete rem[i]; } doneCallback(r); - - delete this; } - void AsyncAggregateAndPushAggQueue(std::shared_ptr >& vec, size_t start, size_t count, bool del) + void AsyncAggregateAndPushAggQueue(const std::shared_ptr>& vec, size_t start, size_t count, bool del) { IncWait(); - PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del)); + auto self(this->shared_from_this()); + PushWork([self, vec, start, count, del](int threadId){ + self->SyncAggregateAndPushAggQueue(vec, start, count, del); + }); } - void SyncAggregateAndPushAggQueue(std::shared_ptr >& vec, size_t start, size_t count, bool del) + void SyncAggregateAndPushAggQueue(const std::shared_ptr>& vec, size_t start, size_t count, bool del) { // aggregate vec and push the intermediate result onto the work queue PushAggQueue(SyncAggregate(*vec, start, count)); @@ -331,7 +330,7 @@ struct Aggregator { // [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4] // Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive) template -struct VectorAggregator { +struct VectorAggregator : public std::enable_shared_from_this> { typedef Aggregator AggregatorType; typedef std::vector VectorType; typedef std::shared_ptr VectorPtrType; @@ -369,19 +368,15 @@ struct VectorAggregator { void Start() { - std::vector aggregators; for (size_t i = 0; i < vecSize; i++) { std::vector tmp(count); for (size_t j = 0; j < count; j++) { tmp[j] = &(*vecs[start + j])[i]; } - auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i)); - // we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop - aggregators.emplace_back(aggregator); - } - for (auto agg : aggregators) { - agg->Start(); + auto self(this->shared_from_this()); + auto aggregator = std::make_shared(std::move(tmp), 0, count, parallel, workerPool, [self, i](const T& agg) {self->CheckDone(agg, i);}); + aggregator->Start(); } } @@ -390,14 +385,13 @@ struct VectorAggregator { (*result)[idx] = agg; if (++doneCount == vecSize) { doneCallback(result); - delete this; } } }; // See comment of AsyncVerifyContributionShares for a description on what this does // Same rules as in Aggregator apply for the inputs -struct ContributionVerifier { +struct ContributionVerifier : public std::enable_shared_from_this { struct BatchState { size_t start; size_t count; @@ -490,7 +484,6 @@ struct ContributionVerifier { } } doneCallback(result); - delete this; } void AsyncAggregate(size_t batchIdx) @@ -498,8 +491,9 @@ struct ContributionVerifier { auto& batchState = batchStates[batchIdx]; // aggregate vvecs and skShares of batch in parallel - auto vvecAgg = new VectorAggregator(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1)); - auto skShareAgg = new Aggregator(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1)); + auto self(this->shared_from_this()); + auto vvecAgg = std::make_shared>(vvecs, batchState.start, batchState.count, parallel, workerPool, [this, self, batchIdx] (const BLSVerificationVectorPtr& vvec) {HandleAggVvecDone(batchIdx, vvec);}); + auto skShareAgg = std::make_shared>(skShares, batchState.start, batchState.count, parallel, workerPool, [this, self, batchIdx] (const CBLSSecretKey& skShare) {HandleAggSkShareDone(batchIdx, skShare);}); vvecAgg->Start(); skShareAgg->Start(); @@ -547,7 +541,8 @@ struct ContributionVerifier { void AsyncAggregatedVerifyBatch(size_t batchIdx) { - auto f = [this, batchIdx](int threadId) { + auto self(this->shared_from_this()); + auto f = [this, self, batchIdx](int threadId) { auto& batchState = batchStates[batchIdx]; bool result = Verify(batchState.vvec, batchState.skShare); if (result) { @@ -567,7 +562,8 @@ struct ContributionVerifier { size_t count = batchStates[batchIdx].count; batchStates[batchIdx].verifyResults.assign(count, 0); for (size_t i = 0; i < count; i++) { - auto f = [this, i, batchIdx](int threadId) { + auto self(this->shared_from_this()); + auto f = [this, self, i, batchIdx](int threadId) { auto& batchState = batchStates[batchIdx]; batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]); HandleVerifyDone(1); @@ -614,7 +610,7 @@ void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector(vvecs, start, count, parallel, workerPool, std::move(doneCallback)); + auto agg = std::make_shared>(vvecs, start, count, parallel, workerPool, std::move(doneCallback)); agg->Start(); } @@ -649,7 +645,7 @@ void AsyncAggregateHelper(ctpl::thread_pool& workerPool, return; } - auto agg = new Aggregator(vec, start, count, parallel, workerPool, std::move(doneCallback)); + auto agg = std::make_shared>(vec, start, count, parallel, workerPool, std::move(doneCallback)); agg->Start(); } @@ -734,7 +730,7 @@ void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::v return; } - auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback)); + auto verifier = std::make_shared(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback)); verifier->Start(); } diff --git a/src/bls/bls_worker.h b/src/bls/bls_worker.h index a6f505bd0f..a2f8fad23a 100644 --- a/src/bls/bls_worker.h +++ b/src/bls/bls_worker.h @@ -7,7 +7,7 @@ #include -#include +#include #include #include diff --git a/src/ctpl.h b/src/ctpl_stl.h similarity index 75% rename from src/ctpl.h rename to src/ctpl_stl.h index 07c9a00be1..a25145e29b 100644 --- a/src/ctpl.h +++ b/src/ctpl_stl.h @@ -1,25 +1,24 @@ - /********************************************************* - * - * Copyright (C) 2014 by Vitaliy Vitsentiy - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - *********************************************************/ +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +* +*********************************************************/ -#ifndef __ctpl_thread_pool_H__ -#define __ctpl_thread_pool_H__ +#ifndef __ctpl_stl_thread_pool_H__ +#define __ctpl_stl_thread_pool_H__ #include #include @@ -29,13 +28,9 @@ #include #include #include -#include +#include -#ifndef _ctplThreadPoolLength_ -#define _ctplThreadPoolLength_ 100 -#endif - // thread pool to run user's functors with signature // ret func(int id, other_params) @@ -45,12 +40,40 @@ namespace ctpl { + namespace detail { + template + class Queue { + public: + bool push(T const & value) { + std::unique_lock lock(this->mutex); + this->q.push(value); + return true; + } + // deletes the retrieved element, do not use for non integral types + bool pop(T & v) { + std::unique_lock lock(this->mutex); + if (this->q.empty()) + return false; + v = this->q.front(); + this->q.pop(); + return true; + } + bool empty() { + std::unique_lock lock(this->mutex); + return this->q.empty(); + } + private: + std::queue q; + std::mutex mutex; + }; + } + class thread_pool { public: - thread_pool() : q(_ctplThreadPoolLength_) { this->init(); } - thread_pool(int nThreads, int queueSize = _ctplThreadPoolLength_) : q(queueSize) { this->init(); this->resize(nThreads); } + thread_pool() { this->init(); } + thread_pool(int nThreads) { this->init(); this->resize(nThreads); } // the destructor waits for all the functions in the queue to be finished ~thread_pool() { @@ -99,24 +122,22 @@ namespace ctpl { void clear_queue() { std::function * _f; while (this->q.pop(_f)) - delete _f; // empty the queue + delete _f; // empty the queue } - // pops a functional wraper to the original function + // pops a functional wrapper to the original function std::function pop() { std::function * _f = nullptr; this->q.pop(_f); - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred - + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred std::function f; if (_f) f = *_f; return f; } - // wait for all computing threads to finish and stop all threads - // may be called asyncronously to not pause the calling thread while waiting + // may be called asynchronously to not pause the calling thread while waiting // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions void stop(bool isWait = false) { if (!isWait) { @@ -151,17 +172,14 @@ namespace ctpl { template auto push(F && f, Rest&&... rest) ->std::future { auto pck = std::make_shared>( - std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) + std::bind(std::forward(f), std::placeholders::_1, std::forward(rest)...) ); - auto _f = new std::function([pck](int id) { (*pck)(id); }); this->q.push(_f); - std::unique_lock lock(this->mutex); this->cv.notify_one(); - return pck->get_future(); } @@ -170,15 +188,12 @@ namespace ctpl { template auto push(F && f) ->std::future { auto pck = std::make_shared>(std::forward(f)); - auto _f = new std::function([pck](int id) { (*pck)(id); }); this->q.push(_f); - std::unique_lock lock(this->mutex); this->cv.notify_one(); - return pck->get_future(); } @@ -192,40 +207,37 @@ namespace ctpl { thread_pool & operator=(thread_pool &&);// = delete; void set_thread(int i) { - std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag + std::shared_ptr> flag(this->flags[i]); // a copy of the shared ptr to the flag auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() { std::atomic & _flag = *flag; std::function * _f; bool isPop = this->q.pop(_f); while (true) { while (isPop) { // if there is anything in the queue - std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred + std::unique_ptr> func(_f); // at return, delete the function even if an exception occurred (*_f)(i); - if (_flag) return; // the thread is wanted to stop, return even if the queue is not empty yet else isPop = this->q.pop(_f); } - // the queue is empty here, wait for the next command std::unique_lock lock(this->mutex); ++this->nWaiting; this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; }); --this->nWaiting; - if (!isPop) return; // if the queue is empty and this->isDone == true or *flag then return } }; - this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() + this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique() } void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; } std::vector> threads; std::vector>> flags; - mutable boost::lockfree::queue *> q; + detail::Queue *> q; std::atomic isDone; std::atomic isStop; std::atomic nWaiting; // how many threads are waiting @@ -236,5 +248,4 @@ namespace ctpl { } -#endif // __ctpl_thread_pool_H__ - +#endif // __ctpl_stl_thread_pool_H__ diff --git a/src/llmq/quorums_dkgsessionhandler.h b/src/llmq/quorums_dkgsessionhandler.h index 6e82fa1991..abbefae5d6 100644 --- a/src/llmq/quorums_dkgsessionhandler.h +++ b/src/llmq/quorums_dkgsessionhandler.h @@ -6,7 +6,7 @@ #define BITCOIN_LLMQ_QUORUMS_DKGSESSIONHANDLER_H -#include +#include #include class CBLSWorker; diff --git a/src/util/system.cpp b/src/util/system.cpp index 1a5c008a2f..c3eb1ba361 100644 --- a/src/util/system.cpp +++ b/src/util/system.cpp @@ -8,7 +8,7 @@ #include #include -#include +#include #include #include #include diff --git a/test/lint/lint-include-guards.sh b/test/lint/lint-include-guards.sh index 6ec396196e..30d88dda03 100755 --- a/test/lint/lint-include-guards.sh +++ b/test/lint/lint-include-guards.sh @@ -10,7 +10,7 @@ export LC_ALL=C HEADER_ID_PREFIX="BITCOIN_" HEADER_ID_SUFFIX="_H" -REGEXP_EXCLUDE_FILES_WITH_PREFIX="src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|bench/nanobench.h|univalue/|ctpl.h|bls/|crypto/sph)" +REGEXP_EXCLUDE_FILES_WITH_PREFIX="src/(crypto/ctaes/|leveldb/|secp256k1/|tinyformat.h|bench/nanobench.h|univalue/|ctpl_stl.h|bls/|crypto/sph)" EXIT_CODE=0 for HEADER_FILE in $(git ls-files -- "*.h" | grep -vE "^${REGEXP_EXCLUDE_FILES_WITH_PREFIX}") diff --git a/test/lint/lint-includes.sh b/test/lint/lint-includes.sh index 81bdb4661a..731d10307f 100755 --- a/test/lint/lint-includes.sh +++ b/test/lint/lint-includes.sh @@ -59,7 +59,6 @@ EXPECTED_BOOST_INCLUDES=( boost/filesystem/fstream.hpp boost/function.hpp boost/lexical_cast.hpp - boost/lockfree/queue.hpp boost/multi_index/hashed_index.hpp boost/multi_index/ordered_index.hpp boost/multi_index/sequenced_index.hpp