Merge #7966: http: Do a pending c++11 simplification handling work items
f0188f9 http: use std::move to move HTTPRequest into HTTPWorkItem (Wladimir J. van der Laan) 37b2137 http: Change boost::scoped_ptr to std::unique_ptr in HTTPRequest (Wladimir J. van der Laan) f97b410 http: Add log message when work queue is full (Wladimir J. van der Laan) 091d6e0 http: Do a pending c++11 simplification (Wladimir J. van der Laan)
This commit is contained in:
parent
90b00cfc66
commit
33233409a9
@ -36,7 +36,6 @@
|
|||||||
|
|
||||||
#include <boost/algorithm/string/case_conv.hpp> // for to_lower()
|
#include <boost/algorithm/string/case_conv.hpp> // for to_lower()
|
||||||
#include <boost/foreach.hpp>
|
#include <boost/foreach.hpp>
|
||||||
#include <boost/scoped_ptr.hpp>
|
|
||||||
|
|
||||||
/** Maximum size of http request (request line + headers) */
|
/** Maximum size of http request (request line + headers) */
|
||||||
static const size_t MAX_HEADERS_SIZE = 8192;
|
static const size_t MAX_HEADERS_SIZE = 8192;
|
||||||
@ -45,8 +44,8 @@ static const size_t MAX_HEADERS_SIZE = 8192;
|
|||||||
class HTTPWorkItem : public HTTPClosure
|
class HTTPWorkItem : public HTTPClosure
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
HTTPWorkItem(HTTPRequest* req, const std::string &path, const HTTPRequestHandler& func):
|
HTTPWorkItem(std::unique_ptr<HTTPRequest> req, const std::string &path, const HTTPRequestHandler& func):
|
||||||
req(req), path(path), func(func)
|
req(std::move(req)), path(path), func(func)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
void operator()()
|
void operator()()
|
||||||
@ -54,7 +53,7 @@ public:
|
|||||||
func(req.get(), path);
|
func(req.get(), path);
|
||||||
}
|
}
|
||||||
|
|
||||||
boost::scoped_ptr<HTTPRequest> req;
|
std::unique_ptr<HTTPRequest> req;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string path;
|
std::string path;
|
||||||
@ -71,8 +70,7 @@ private:
|
|||||||
/** Mutex protects entire object */
|
/** Mutex protects entire object */
|
||||||
CWaitableCriticalSection cs;
|
CWaitableCriticalSection cs;
|
||||||
CConditionVariable cond;
|
CConditionVariable cond;
|
||||||
/* XXX in C++11 we can use std::unique_ptr here and avoid manual cleanup */
|
std::deque<std::unique_ptr<WorkItem>> queue;
|
||||||
std::deque<WorkItem*> queue;
|
|
||||||
bool running;
|
bool running;
|
||||||
size_t maxDepth;
|
size_t maxDepth;
|
||||||
int numThreads;
|
int numThreads;
|
||||||
@ -101,15 +99,11 @@ public:
|
|||||||
numThreads(0)
|
numThreads(0)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
/*( Precondition: worker threads have all stopped
|
/** Precondition: worker threads have all stopped
|
||||||
* (call WaitExit)
|
* (call WaitExit)
|
||||||
*/
|
*/
|
||||||
~WorkQueue()
|
~WorkQueue()
|
||||||
{
|
{
|
||||||
while (!queue.empty()) {
|
|
||||||
delete queue.front();
|
|
||||||
queue.pop_front();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
/** Enqueue a work item */
|
/** Enqueue a work item */
|
||||||
bool Enqueue(WorkItem* item)
|
bool Enqueue(WorkItem* item)
|
||||||
@ -118,7 +112,7 @@ public:
|
|||||||
if (queue.size() >= maxDepth) {
|
if (queue.size() >= maxDepth) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
queue.push_back(item);
|
queue.emplace_back(std::unique_ptr<WorkItem>(item));
|
||||||
cond.notify_one();
|
cond.notify_one();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -127,18 +121,17 @@ public:
|
|||||||
{
|
{
|
||||||
ThreadCounter count(*this);
|
ThreadCounter count(*this);
|
||||||
while (true) {
|
while (true) {
|
||||||
WorkItem* i = 0;
|
std::unique_ptr<WorkItem> i;
|
||||||
{
|
{
|
||||||
boost::unique_lock<boost::mutex> lock(cs);
|
boost::unique_lock<boost::mutex> lock(cs);
|
||||||
while (running && queue.empty())
|
while (running && queue.empty())
|
||||||
cond.wait(lock);
|
cond.wait(lock);
|
||||||
if (!running)
|
if (!running)
|
||||||
break;
|
break;
|
||||||
i = queue.front();
|
i = std::move(queue.front());
|
||||||
queue.pop_front();
|
queue.pop_front();
|
||||||
}
|
}
|
||||||
(*i)();
|
(*i)();
|
||||||
delete i;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/** Interrupt and exit loops */
|
/** Interrupt and exit loops */
|
||||||
@ -294,12 +287,14 @@ static void http_request_cb(struct evhttp_request* req, void* arg)
|
|||||||
|
|
||||||
// Dispatch to worker thread
|
// Dispatch to worker thread
|
||||||
if (i != iend) {
|
if (i != iend) {
|
||||||
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(hreq.release(), path, i->handler));
|
std::unique_ptr<HTTPWorkItem> item(new HTTPWorkItem(std::move(hreq), path, i->handler));
|
||||||
assert(workQueue);
|
assert(workQueue);
|
||||||
if (workQueue->Enqueue(item.get()))
|
if (workQueue->Enqueue(item.get()))
|
||||||
item.release(); /* if true, queue took ownership */
|
item.release(); /* if true, queue took ownership */
|
||||||
else
|
else {
|
||||||
|
LogPrintf("WARNING: request rejected because http work queue depth exceeded, it can be increased with the -rpcworkqueue= setting\n");
|
||||||
item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
|
item->req->WriteReply(HTTP_INTERNAL, "Work queue depth exceeded");
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
hreq->WriteReply(HTTP_NOTFOUND);
|
hreq->WriteReply(HTTP_NOTFOUND);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user