// Copyright (c) 2018-2021 The Dash Core developers // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. #include #include #include #include #include #include template bool VerifyVectorHelper(const std::vector& vec, size_t start, size_t count) { if (start == 0 && count == 0) { count = vec.size(); } std::set set; for (size_t i = start; i < start + count; i++) { if (!vec[i].IsValid()) return false; // check duplicates if (!set.emplace(vec[i].GetHash()).second) { return false; } } return true; } // Creates a doneCallback and a future. The doneCallback simply finishes the future template std::pair, std::future > BuildFutureDoneCallback() { auto p = std::make_shared >(); std::function f = [p](const T& v) { p->set_value(v); }; return std::make_pair(std::move(f), p->get_future()); } template std::pair, std::future > BuildFutureDoneCallback2() { auto p = std::make_shared >(); std::function f = [p](T v) { p->set_value(v); }; return std::make_pair(std::move(f), p->get_future()); } ///// CBLSWorker::CBLSWorker() { } CBLSWorker::~CBLSWorker() { Stop(); } void CBLSWorker::Start() { int workerCount = std::thread::hardware_concurrency() / 2; workerCount = std::max(std::min(1, workerCount), 4); workerPool.resize(workerCount); RenameThreadPool(workerPool, "dash-bls-work"); } void CBLSWorker::Stop() { workerPool.clear_queue(); workerPool.stop(true); } bool CBLSWorker::GenerateContributions(int quorumThreshold, const BLSIdVector& ids, BLSVerificationVectorPtr& vvecRet, BLSSecretKeyVector& skSharesRet) { BLSSecretKeyVectorPtr svec = std::make_shared((size_t)quorumThreshold); vvecRet = std::make_shared((size_t)quorumThreshold); skSharesRet.resize(ids.size()); for (int i = 0; i < quorumThreshold; i++) { (*svec)[i].MakeNewKey(); } std::list > futures; size_t batchSize = 8; for (size_t i = 0; i < quorumThreshold; i += batchSize) { size_t start = i; size_t count = std::min(batchSize, quorumThreshold - start); auto f = [&, start, count](int threadId) { for (size_t j = start; j < start + count; j++) { (*vvecRet)[j] = (*svec)[j].GetPublicKey(); } return true; }; futures.emplace_back(workerPool.push(f)); } for (size_t i = 0; i < ids.size(); i += batchSize) { size_t start = i; size_t count = std::min(batchSize, ids.size() - start); auto f = [&, start, count](int threadId) { for (size_t j = start; j < start + count; j++) { if (!skSharesRet[j].SecretKeyShare(*svec, ids[j])) { return false; } } return true; }; futures.emplace_back(workerPool.push(f)); } bool success = true; for (auto& f : futures) { if (!f.get()) { success = false; } } return success; } // aggregates a single vector of BLS objects in parallel // the input vector is split into batches and each batch is aggregated in parallel // when enough batches are finished to form a new batch, the new batch is queued for further parallel aggregation // when no more batches can be created from finished batch results, the final aggregated is created and the doneCallback // called. // The Aggregator object needs to be created on the heap and it will delete itself after calling the doneCallback // The input vector is not copied into the Aggregator but instead a vector of pointers to the original entries from the // input vector is stored. This means that the input vector must stay alive for the whole lifetime of the Aggregator template struct Aggregator { typedef T ElementType; size_t batchSize{16}; std::shared_ptr > inputVec; bool parallel; ctpl::thread_pool& workerPool; std::mutex m; // items in the queue are all intermediate aggregation results of finished batches. // The intermediate results must be deleted by us again (which we do in SyncAggregateAndPushAggQueue) boost::lockfree::queue aggQueue; std::atomic aggQueueSize{0}; // keeps track of currently queued/in-progress batches. If it reaches 0, we are done std::atomic waitCount{0}; typedef std::function DoneCallback; DoneCallback doneCallback; // TP can either be a pointer or a reference template Aggregator(const std::vector& _inputVec, size_t start, size_t count, bool _parallel, ctpl::thread_pool& _workerPool, DoneCallback _doneCallback) : workerPool(_workerPool), parallel(_parallel), aggQueue(0), doneCallback(std::move(_doneCallback)) { inputVec = std::make_shared >(count); for (size_t i = 0; i < count; i++) { (*inputVec)[i] = pointer(_inputVec[start + i]); } } const T* pointer(const T& v) { return &v; } const T* pointer(const T* v) { return v; } // Starts aggregation. // If parallel=true, then this will return fast, otherwise this will block until aggregation is done void Start() { size_t batchCount = (inputVec->size() + batchSize - 1) / batchSize; if (!parallel) { if (inputVec->size() == 1) { doneCallback(*(*inputVec)[0]); } else { doneCallback(SyncAggregate(*inputVec, 0, inputVec->size())); } delete this; return; } if (batchCount == 1) { // just a single batch of work, take a shortcut. PushWork([this](int threadId) { if (inputVec->size() == 1) { doneCallback(*(*inputVec)[0]); } else { doneCallback(SyncAggregate(*inputVec, 0, inputVec->size())); } delete this; }); return; } // increment wait counter as otherwise the first finished async aggregation might signal that we're done IncWait(); for (size_t i = 0; i < batchCount; i++) { size_t start = i * batchSize; size_t count = std::min(batchSize, inputVec->size() - start); AsyncAggregateAndPushAggQueue(inputVec, start, count, false); } // this will decrement the wait counter and in most cases NOT finish, as async work is still in progress CheckDone(); } void IncWait() { ++waitCount; } void CheckDone() { if (--waitCount == 0) { Finish(); } } void Finish() { // All async work is done, but we might have items in the aggQueue which are the results of the async // work. This is the case when these did not add up to a new batch. In this case, we have to aggregate // the items into the final result std::vector rem(aggQueueSize); for (size_t i = 0; i < rem.size(); i++) { T* p = nullptr; bool s = aggQueue.pop(p); assert(s); rem[i] = p; } T r; if (rem.size() == 1) { // just one intermediate result, which is actually the final result r = *rem[0]; } else { // multiple intermediate results left which did not add up to a new batch. aggregate them now r = SyncAggregate(rem, 0, rem.size()); } // all items which are left in the queue are intermediate results, so we must delete them for (size_t i = 0; i < rem.size(); i++) { delete rem[i]; } doneCallback(r); delete this; } void AsyncAggregateAndPushAggQueue(std::shared_ptr >& vec, size_t start, size_t count, bool del) { IncWait(); PushWork(std::bind(&Aggregator::SyncAggregateAndPushAggQueue, this, vec, start, count, del)); } void SyncAggregateAndPushAggQueue(std::shared_ptr >& vec, size_t start, size_t count, bool del) { // aggregate vec and push the intermediate result onto the work queue PushAggQueue(SyncAggregate(*vec, start, count)); if (del) { for (size_t i = 0; i < count; i++) { delete (*vec)[start + i]; } } CheckDone(); } void PushAggQueue(const T& v) { aggQueue.push(new T(v)); if (++aggQueueSize >= batchSize) { // we've collected enough intermediate results to form a new batch. std::shared_ptr > newBatch; { std::unique_lock l(m); if (aggQueueSize < batchSize) { // some other worker thread grabbed this batch return; } newBatch = std::make_shared >(batchSize); // collect items for new batch for (size_t i = 0; i < batchSize; i++) { T* p = nullptr; bool s = aggQueue.pop(p); assert(s); (*newBatch)[i] = p; } aggQueueSize -= batchSize; } // push new batch to work queue. del=true this time as these items are intermediate results and need to be deleted // after aggregation is done AsyncAggregateAndPushAggQueue(newBatch, 0, newBatch->size(), true); } } template T SyncAggregate(const std::vector& vec, size_t start, size_t count) { T result = *vec[start]; for (size_t j = 1; j < count; j++) { result.AggregateInsecure(*vec[start + j]); } return result; } template void PushWork(Callable&& f) { workerPool.push(f); } }; // Aggregates multiple input vectors into a single output vector // Inputs are in the following form: // [ // [a1, b1, c1, d1], // [a2, b2, c2, d2], // [a3, b3, c3, d3], // [a4, b4, c4, d4], // ] // The result is in the following form: // [ a1+a2+a3+a4, b1+b2+b3+b4, c1+c2+c3+c4, d1+d2+d3+d4] // Same rules for the input vectors apply to the VectorAggregator as for the Aggregator (they must stay alive) template struct VectorAggregator { typedef Aggregator AggregatorType; typedef std::vector VectorType; typedef std::shared_ptr VectorPtrType; typedef std::vector VectorVectorType; typedef std::function DoneCallback; DoneCallback doneCallback; const VectorVectorType& vecs; size_t start; size_t count; bool parallel; ctpl::thread_pool& workerPool; std::atomic doneCount; VectorPtrType result; size_t vecSize; VectorAggregator(const VectorVectorType& _vecs, size_t _start, size_t _count, bool _parallel, ctpl::thread_pool& _workerPool, DoneCallback _doneCallback) : vecs(_vecs), parallel(_parallel), start(_start), count(_count), workerPool(_workerPool), doneCallback(std::move(_doneCallback)) { assert(!vecs.empty()); vecSize = vecs[0]->size(); result = std::make_shared(vecSize); doneCount = 0; } void Start() { std::vector aggregators; for (size_t i = 0; i < vecSize; i++) { std::vector tmp(count); for (size_t j = 0; j < count; j++) { tmp[j] = &(*vecs[start + j])[i]; } auto aggregator = new AggregatorType(std::move(tmp), 0, count, parallel, workerPool, std::bind(&VectorAggregator::CheckDone, this, std::placeholders::_1, i)); // we can't directly start the aggregator here as it might be so fast that it deletes "this" while we are still in this loop aggregators.emplace_back(aggregator); } for (auto agg : aggregators) { agg->Start(); } } void CheckDone(const T& agg, size_t idx) { (*result)[idx] = agg; if (++doneCount == vecSize) { doneCallback(result); delete this; } } }; // See comment of AsyncVerifyContributionShares for a description on what this does // Same rules as in Aggregator apply for the inputs struct ContributionVerifier { struct BatchState { size_t start; size_t count; BLSVerificationVectorPtr vvec; CBLSSecretKey skShare; // starts with 0 and is incremented if either vvec or skShare aggregation finishes. If it reaches 2, we know // that aggregation for this batch is fully done. We can then start verification. std::unique_ptr > aggDone; // we can't directly update a vector in parallel // as vector is not thread safe (uses bitsets internally) // so we must use vector temporarily and concatenate/convert // each batch result into a final vector std::vector verifyResults; }; CBLSId forId; const std::vector& vvecs; const BLSSecretKeyVector& skShares; size_t batchSize; bool parallel; bool aggregated; ctpl::thread_pool& workerPool; size_t batchCount; size_t verifyCount; std::vector batchStates; std::atomic verifyDoneCount{0}; std::function&)> doneCallback; ContributionVerifier(CBLSId _forId, const std::vector& _vvecs, const BLSSecretKeyVector& _skShares, size_t _batchSize, bool _parallel, bool _aggregated, ctpl::thread_pool& _workerPool, std::function&)> _doneCallback) : forId(std::move(_forId)), vvecs(_vvecs), skShares(_skShares), batchSize(_batchSize), parallel(_parallel), aggregated(_aggregated), workerPool(_workerPool), doneCallback(std::move(_doneCallback)) { } void Start() { if (!aggregated) { // treat all inputs as one large batch batchSize = vvecs.size(); batchCount = 1; } else { batchCount = (vvecs.size() + batchSize - 1) / batchSize; } verifyCount = vvecs.size(); batchStates.resize(batchCount); for (size_t i = 0; i < batchCount; i++) { auto& batchState = batchStates[i]; batchState.aggDone = std::make_unique>(0); batchState.start = i * batchSize; batchState.count = std::min(batchSize, vvecs.size() - batchState.start); batchState.verifyResults.assign(batchState.count, 0); } if (aggregated) { size_t batchCount2 = batchCount; // 'this' might get deleted while we're still looping for (size_t i = 0; i < batchCount2; i++) { AsyncAggregate(i); } } else { // treat all inputs as a single batch and verify one-by-one AsyncVerifyBatchOneByOne(0); } } void Finish() { size_t batchIdx = 0; std::vector result(vvecs.size()); for (size_t i = 0; i < vvecs.size(); i += batchSize) { auto& batchState = batchStates[batchIdx++]; for (size_t j = 0; j < batchState.count; j++) { result[batchState.start + j] = batchState.verifyResults[j] != 0; } } doneCallback(result); delete this; } void AsyncAggregate(size_t batchIdx) { auto& batchState = batchStates[batchIdx]; // aggregate vvecs and skShares of batch in parallel auto vvecAgg = new VectorAggregator(vvecs, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggVvecDone, this, batchIdx, std::placeholders::_1)); auto skShareAgg = new Aggregator(skShares, batchState.start, batchState.count, parallel, workerPool, std::bind(&ContributionVerifier::HandleAggSkShareDone, this, batchIdx, std::placeholders::_1)); vvecAgg->Start(); skShareAgg->Start(); } void HandleAggVvecDone(size_t batchIdx, const BLSVerificationVectorPtr& vvec) { auto& batchState = batchStates[batchIdx]; batchState.vvec = vvec; if (++(*batchState.aggDone) == 2) { HandleAggDone(batchIdx); } } void HandleAggSkShareDone(size_t batchIdx, const CBLSSecretKey& skShare) { auto& batchState = batchStates[batchIdx]; batchState.skShare = skShare; if (++(*batchState.aggDone) == 2) { HandleAggDone(batchIdx); } } void HandleVerifyDone(size_t batchIdx, size_t count) { size_t c = verifyDoneCount += count; if (c == verifyCount) { Finish(); } } void HandleAggDone(size_t batchIdx) { auto& batchState = batchStates[batchIdx]; if (batchState.vvec == nullptr || batchState.vvec->empty() || !batchState.skShare.IsValid()) { // something went wrong while aggregating and there is nothing we can do now except mark the whole batch as failed // this can only happen if inputs were invalid in some way batchState.verifyResults.assign(batchState.count, 0); HandleVerifyDone(batchIdx, batchState.count); return; } AsyncAggregatedVerifyBatch(batchIdx); } void AsyncAggregatedVerifyBatch(size_t batchIdx) { auto f = [this, batchIdx](int threadId) { auto& batchState = batchStates[batchIdx]; bool result = Verify(batchState.vvec, batchState.skShare); if (result) { // whole batch is valid batchState.verifyResults.assign(batchState.count, 1); HandleVerifyDone(batchIdx, batchState.count); } else { // at least one entry in the batch is invalid, revert to per-contribution verification (but parallelized) AsyncVerifyBatchOneByOne(batchIdx); } }; PushOrDoWork(std::move(f)); } void AsyncVerifyBatchOneByOne(size_t batchIdx) { size_t count = batchStates[batchIdx].count; batchStates[batchIdx].verifyResults.assign(count, 0); for (size_t i = 0; i < count; i++) { auto f = [this, i, batchIdx](int threadId) { auto& batchState = batchStates[batchIdx]; batchState.verifyResults[i] = Verify(vvecs[batchState.start + i], skShares[batchState.start + i]); HandleVerifyDone(batchIdx, 1); }; PushOrDoWork(std::move(f)); } } bool Verify(const BLSVerificationVectorPtr& vvec, const CBLSSecretKey& skShare) const { CBLSPublicKey pk1; if (!pk1.PublicKeyShare(*vvec, forId)) { return false; } CBLSPublicKey pk2 = skShare.GetPublicKey(); return pk1 == pk2; } template void PushOrDoWork(Callable&& f) { if (parallel) { workerPool.push(std::forward(f)); } else { f(0); } } }; void CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector& vvecs, size_t start, size_t count, bool parallel, std::function doneCallback) { if (start == 0 && count == 0) { count = vvecs.size(); } if (vvecs.empty() || count == 0 || start > vvecs.size() || start + count > vvecs.size()) { doneCallback(nullptr); return; } if (!VerifyVerificationVectors(vvecs, start, count)) { doneCallback(nullptr); return; } auto agg = new VectorAggregator(vvecs, start, count, parallel, workerPool, std::move(doneCallback)); agg->Start(); } std::future CBLSWorker::AsyncBuildQuorumVerificationVector(const std::vector& vvecs, size_t start, size_t count, bool parallel) { auto p = BuildFutureDoneCallback(); AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel, std::move(p.first)); return std::move(p.second); } BLSVerificationVectorPtr CBLSWorker::BuildQuorumVerificationVector(const std::vector& vvecs, size_t start, size_t count, bool parallel) { return AsyncBuildQuorumVerificationVector(vvecs, start, count, parallel).get(); } template void AsyncAggregateHelper(ctpl::thread_pool& workerPool, const std::vector& vec, size_t start, size_t count, bool parallel, std::function doneCallback) { if (start == 0 && count == 0) { count = vec.size(); } if (vec.empty() || count == 0 || start > vec.size() || start + count > vec.size()) { doneCallback(T()); return; } if (!VerifyVectorHelper(vec, start, count)) { doneCallback(T()); return; } auto agg = new Aggregator(vec, start, count, parallel, workerPool, std::move(doneCallback)); agg->Start(); } void CBLSWorker::AsyncAggregateSecretKeys(const BLSSecretKeyVector& secKeys, size_t start, size_t count, bool parallel, std::function doneCallback) { AsyncAggregateHelper(workerPool, secKeys, start, count, parallel, std::move(doneCallback)); } std::future CBLSWorker::AsyncAggregateSecretKeys(const BLSSecretKeyVector& secKeys, size_t start, size_t count, bool parallel) { auto p = BuildFutureDoneCallback(); AsyncAggregateSecretKeys(secKeys, start, count, parallel, std::move(p.first)); return std::move(p.second); } CBLSSecretKey CBLSWorker::AggregateSecretKeys(const BLSSecretKeyVector& secKeys, size_t start, size_t count, bool parallel) { return AsyncAggregateSecretKeys(secKeys, start, count, parallel).get(); } void CBLSWorker::AsyncAggregatePublicKeys(const BLSPublicKeyVector& pubKeys, size_t start, size_t count, bool parallel, std::function doneCallback) { AsyncAggregateHelper(workerPool, pubKeys, start, count, parallel, std::move(doneCallback)); } std::future CBLSWorker::AsyncAggregatePublicKeys(const BLSPublicKeyVector& pubKeys, size_t start, size_t count, bool parallel) { auto p = BuildFutureDoneCallback(); AsyncAggregatePublicKeys(pubKeys, start, count, parallel, std::move(p.first)); return std::move(p.second); } CBLSPublicKey CBLSWorker::AggregatePublicKeys(const BLSPublicKeyVector& pubKeys, size_t start, size_t count, bool parallel) { return AsyncAggregatePublicKeys(pubKeys, start, count, parallel).get(); } void CBLSWorker::AsyncAggregateSigs(const BLSSignatureVector& sigs, size_t start, size_t count, bool parallel, std::function doneCallback) { AsyncAggregateHelper(workerPool, sigs, start, count, parallel, std::move(doneCallback)); } std::future CBLSWorker::AsyncAggregateSigs(const BLSSignatureVector& sigs, size_t start, size_t count, bool parallel) { auto p = BuildFutureDoneCallback(); AsyncAggregateSigs(sigs, start, count, parallel, std::move(p.first)); return std::move(p.second); } CBLSSignature CBLSWorker::AggregateSigs(const BLSSignatureVector& sigs, size_t start, size_t count, bool parallel) { return AsyncAggregateSigs(sigs, start, count, parallel).get(); } CBLSPublicKey CBLSWorker::BuildPubKeyShare(const BLSVerificationVectorPtr& vvec, const CBLSId& id) { CBLSPublicKey pkShare; pkShare.PublicKeyShare(*vvec, id); return pkShare; } void CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector& vvecs, const BLSSecretKeyVector& skShares, bool parallel, bool aggregated, std::function&)> doneCallback) { if (!forId.IsValid() || !VerifyVerificationVectors(vvecs)) { std::vector result; result.assign(vvecs.size(), false); doneCallback(result); return; } auto verifier = new ContributionVerifier(forId, vvecs, skShares, 8, parallel, aggregated, workerPool, std::move(doneCallback)); verifier->Start(); } std::future > CBLSWorker::AsyncVerifyContributionShares(const CBLSId& forId, const std::vector& vvecs, const BLSSecretKeyVector& skShares, bool parallel, bool aggregated) { auto p = BuildFutureDoneCallback >(); AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated, std::move(p.first)); return std::move(p.second); } std::vector CBLSWorker::VerifyContributionShares(const CBLSId& forId, const std::vector& vvecs, const BLSSecretKeyVector& skShares, bool parallel, bool aggregated) { return AsyncVerifyContributionShares(forId, vvecs, skShares, parallel, aggregated).get(); } std::future CBLSWorker::AsyncVerifyContributionShare(const CBLSId& forId, const BLSVerificationVectorPtr& vvec, const CBLSSecretKey& skContribution) { if (!forId.IsValid() || !VerifyVerificationVector(*vvec)) { auto p = BuildFutureDoneCallback(); p.first(false); return std::move(p.second); } auto f = [this, &forId, &vvec, &skContribution](int threadId) { CBLSPublicKey pk1; if (!pk1.PublicKeyShare(*vvec, forId)) { return false; } CBLSPublicKey pk2 = skContribution.GetPublicKey(); return pk1 == pk2; }; return workerPool.push(f); } bool CBLSWorker::VerifyContributionShare(const CBLSId& forId, const BLSVerificationVectorPtr& vvec, const CBLSSecretKey& skContribution) { CBLSPublicKey pk1; if (!pk1.PublicKeyShare(*vvec, forId)) { return false; } CBLSPublicKey pk2 = skContribution.GetPublicKey(); return pk1 == pk2; } bool CBLSWorker::VerifyVerificationVector(const BLSVerificationVector& vvec, size_t start, size_t count) { return VerifyVectorHelper(vvec, start, count); } bool CBLSWorker::VerifyVerificationVectors(const std::vector& vvecs, size_t start, size_t count) { if (start == 0 && count == 0) { count = vvecs.size(); } std::set set; for (size_t i = 0; i < count; i++) { auto& vvec = vvecs[start + i]; if (vvec == nullptr) { return false; } if (vvec->size() != vvecs[start]->size()) { return false; } for (size_t j = 0; j < vvec->size(); j++) { if (!(*vvec)[j].IsValid()) { return false; } // check duplicates if (!set.emplace((*vvec)[j].GetHash()).second) { return false; } } } return true; } bool CBLSWorker::VerifySecretKeyVector(const BLSSecretKeyVector& secKeys, size_t start, size_t count) { return VerifyVectorHelper(secKeys, start, count); } bool CBLSWorker::VerifySignatureVector(const BLSSignatureVector& sigs, size_t start, size_t count) { return VerifyVectorHelper(sigs, start, count); } void CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash, const CBLSWorker::SignDoneCallback& doneCallback) { workerPool.push([secKey, msgHash, doneCallback](int threadId) { doneCallback(secKey.Sign(msgHash)); }); } std::future CBLSWorker::AsyncSign(const CBLSSecretKey& secKey, const uint256& msgHash) { auto p = BuildFutureDoneCallback(); AsyncSign(secKey, msgHash, p.first); return std::move(p.second); } void CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash, CBLSWorker::SigVerifyDoneCallback doneCallback, CancelCond cancelCond) { if (!sig.IsValid() || !pubKey.IsValid()) { doneCallback(false); return; } std::unique_lock l(sigVerifyMutex); bool foundDuplicate = false; for (auto& s : sigVerifyQueue) { if (s.msgHash == msgHash) { foundDuplicate = true; break; } } if (foundDuplicate) { // batched/aggregated verification does not allow duplicate hashes, so we push what we currently have and start // with a fresh batch PushSigVerifyBatch(); } sigVerifyQueue.emplace_back(std::move(doneCallback), std::move(cancelCond), sig, pubKey, msgHash); if (sigVerifyBatchesInProgress == 0 || sigVerifyQueue.size() >= SIG_VERIFY_BATCH_SIZE) { PushSigVerifyBatch(); } } std::future CBLSWorker::AsyncVerifySig(const CBLSSignature& sig, const CBLSPublicKey& pubKey, const uint256& msgHash, CancelCond cancelCond) { auto p = BuildFutureDoneCallback2(); AsyncVerifySig(sig, pubKey, msgHash, std::move(p.first), std::move(cancelCond)); return std::move(p.second); } bool CBLSWorker::IsAsyncVerifyInProgress() { std::unique_lock l(sigVerifyMutex); return sigVerifyBatchesInProgress != 0; } // sigVerifyMutex must be held while calling void CBLSWorker::PushSigVerifyBatch() { auto f = [this](int threadId, const std::shared_ptr >& _jobs) { auto& jobs = *_jobs; if (jobs.size() == 1) { auto& job = jobs[0]; if (!job.cancelCond()) { bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash); job.doneCallback(valid); } std::unique_lock l(sigVerifyMutex); sigVerifyBatchesInProgress--; if (!sigVerifyQueue.empty()) { PushSigVerifyBatch(); } return; } CBLSSignature aggSig; std::vector indexes; std::vector pubKeys; std::vector msgHashes; indexes.reserve(jobs.size()); pubKeys.reserve(jobs.size()); msgHashes.reserve(jobs.size()); for (size_t i = 0; i < jobs.size(); i++) { auto& job = jobs[i]; if (job.cancelCond()) { continue; } if (pubKeys.empty()) { aggSig = job.sig; } else { aggSig.AggregateInsecure(job.sig); } indexes.emplace_back(i); pubKeys.emplace_back(job.pubKey); msgHashes.emplace_back(job.msgHash); } if (!pubKeys.empty()) { bool allValid = aggSig.VerifyInsecureAggregated(pubKeys, msgHashes); if (allValid) { for (size_t i = 0; i < pubKeys.size(); i++) { jobs[indexes[i]].doneCallback(true); } } else { // one or more sigs were not valid, revert to per-sig verification // TODO this could be improved if we would cache pairing results in some way as the previous aggregated verification already calculated all the pairings for the hashes for (size_t i = 0; i < pubKeys.size(); i++) { auto& job = jobs[indexes[i]]; bool valid = job.sig.VerifyInsecure(job.pubKey, job.msgHash); job.doneCallback(valid); } } } std::unique_lock l(sigVerifyMutex); sigVerifyBatchesInProgress--; if (!sigVerifyQueue.empty()) { PushSigVerifyBatch(); } }; auto batch = std::make_shared >(std::move(sigVerifyQueue)); sigVerifyQueue.reserve(SIG_VERIFY_BATCH_SIZE); sigVerifyBatchesInProgress++; workerPool.push(f, batch); }