Skip to content

Commit bc6f24b

Browse files
EnqueueDuckEnqueueDuck
authored andcommitted
Support server workers with EPOLL
1 parent 6212049 commit bc6f24b

6 files changed

Lines changed: 133 additions & 31 deletions

File tree

README.md

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,33 @@ server to achieve beter through-put.
66

77
# Releases
88

9-
### Version 1.1
10-
11-
To run the server:
9+
To run the server, default host is 14396:
1210

1311
```
1412
bazel run -c opt src/main/cpp:server
1513
```
1614

17-
Simple HTTP Server with a multi-threaded backend. The backend employs a ThreadPool, which manages
15+
### Version 2.0
16+
17+
HTTP Server with N workers. Connections are distributed to the workers, each work handles IO
18+
using EPOLL.
19+
20+
```
21+
user~/wrk (master) ./wrk -t8 -c 10000 -d20s http://host:14396
22+
Running 20s test @ http://host:14396
23+
8 threads and 10000 connections
24+
Thread Stats Avg Stdev Max +/- Stdev
25+
Latency 51.63us 129.97us 48.81ms 99.68%
26+
Req/Sec 103.74k 11.08k 134.55k 71.14%
27+
2073920 requests in 20.10s, 79.11MB read
28+
Socket errors: connect 8987, read 0, write 0, timeout 0
29+
Requests/sec: 103189.69
30+
Transfer/sec: 3.94MB
31+
```
32+
33+
### Version 1.1
34+
35+
HTTP Server with a multi-threaded backend. The backend employs a ThreadPool, which manages
1836
N worker threads. Each request is encapsulated in a Task object and passed to one of the worker
1937
threads for handling.
2038

src/main/cpp/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ cc_binary(
3434
name = "server",
3535
srcs = ["main.cpp"],
3636
linkopts = ["-lpthread"],
37+
copts = ["-std=c++1z"],
3738
deps = [
3839
":http",
3940
"@third_party-glog//:glog",

src/main/cpp/http/http_server.cpp

Lines changed: 60 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
namespace http {
66

7-
void HttpServer::InternalHandle(int connection_sd) {
7+
void HttpServer::Worker::HandleConnection(int connection_sd) {
88
char buffer[opts_.max_http_request_size];
99

1010
int bytes_read = 1;
@@ -17,9 +17,6 @@ void HttpServer::InternalHandle(int connection_sd) {
1717
}
1818
if (bytes_read == 0) break;
1919

20-
VLOG(1) << "Received " << bytes_read << " bytes";
21-
VLOG(1) << buffer;
22-
2320
// Parse the http request
2421
request::HttpRequest request;
2522
request.ParseFromString(buffer);
@@ -34,8 +31,66 @@ void HttpServer::InternalHandle(int connection_sd) {
3431
close(connection_sd);
3532
}
3633

34+
void HttpServer::Worker::Stop() {
35+
stopped_ = true;
36+
thread_.join();
37+
}
38+
39+
void HttpServer::Worker::Initialize() {
40+
epoll_fd_ = epoll_create1(0);
41+
thread_ = std::thread([this]() {
42+
struct epoll_event events[MAX_EPOLL_EVENTS];
43+
int num_events;
44+
while (!stopped_) {
45+
num_events = epoll_wait(epoll_fd_, events, MAX_EPOLL_EVENTS, EPOLL_TIMEOUT_MS);
46+
for (int i = 0; i < num_events; ++i) {
47+
// Client hang up
48+
if (events[i].events & EPOLLHUP) {
49+
close(events[i].data.fd);
50+
continue;
51+
}
52+
53+
// Error
54+
if ((events[i].events & EPOLLERR) || !(events[i].events & EPOLLIN)) {
55+
LOG(ERROR) << "Error caught with fd " << events[i].data.fd;
56+
continue;
57+
}
58+
59+
HandleConnection(events[i].data.fd);
60+
}
61+
}
62+
63+
});
64+
}
65+
66+
void HttpServer::Worker::AddConnection(int connection_sd) {
67+
struct epoll_event event;
68+
event.data.fd = connection_sd;
69+
event.events = EPOLLIN;
70+
71+
epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, connection_sd, &event);
72+
}
73+
3774
void HttpServer::Handle(int connection_sd) {
38-
executor_.Execute(std::bind(&HttpServer::InternalHandle, this, connection_sd));
75+
auto &worker = workers_[current_worker_idx_++];
76+
worker->AddConnection(connection_sd);
77+
if (current_worker_idx_ >= workers_.size()) current_worker_idx_ = 0;
78+
}
79+
80+
void HttpServer::Initialize() {
81+
Server::Initialize();
82+
for (auto &worker : workers_) {
83+
worker->Initialize();
84+
}
85+
}
86+
87+
response::HttpResponse HttpServer::Worker::HandleRequest(
88+
const request::HttpRequest &request
89+
) const {
90+
response::HttpResponse response;
91+
response.status_code = response::HTTP_200_OK;
92+
response.body = "ok";
93+
return response;
3994
}
4095

4196
} // namespace http

src/main/cpp/http/http_server.h

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,74 @@
11
#pragma once
22

3+
#include <sys/epoll.h>
4+
#include <thread>
5+
#include <atomic>
6+
#include <memory>
7+
38
#include "http/server.h"
4-
#include "thread/executor.h"
59
#include "http/http_response.h"
610
#include "http/http_request.h"
711

812
namespace http {
913

1014
struct HttpServerOpts {
11-
constexpr static int DEFAULT_MAX_HTTP_REQUEST_SIZE = 65536;
12-
1315
ServerOpts server_opts;
14-
thread::ExecutorOpts executor_opts;
15-
16-
int max_http_request_size = DEFAULT_MAX_HTTP_REQUEST_SIZE;
16+
int num_workers;
1717
};
1818

1919
class HttpServer : public Server {
2020
public:
21-
explicit HttpServer(HttpServerOpts opts)
22-
: Server(opts.server_opts), opts_(opts), executor_(opts.executor_opts) { }
21+
constexpr static int DEFAULT_HTTP_REQUEST_SIZE = 1024;
2322

24-
void Initialize() {
25-
Server::Initialize();
26-
executor_.Initialize();
23+
explicit HttpServer(const HttpServerOpts &opts) : Server(opts.server_opts), opts_(opts) {
24+
for (int i = 0; i < opts_.num_workers; ++i) {
25+
workers_.emplace_back(std::make_unique<Worker>(WorkerOpts()));
26+
}
27+
LOG(INFO) << "Started " << opts_.num_workers << " workers";
2728
}
2829

30+
struct WorkerOpts {
31+
int max_http_request_size = DEFAULT_HTTP_REQUEST_SIZE;
32+
};
33+
34+
class Worker {
35+
public:
36+
Worker() = default;
37+
explicit Worker(const WorkerOpts &opts) : opts_(opts) { }
38+
39+
constexpr static int MAX_EPOLL_EVENTS = 64;
40+
constexpr static int EPOLL_TIMEOUT_MS = 10000; // 10 seconds
41+
42+
void Initialize();
43+
44+
void Stop();
45+
46+
void AddConnection(int connection_sd);
47+
48+
private:
49+
WorkerOpts opts_;
50+
51+
int epoll_fd_;
52+
std::thread thread_;
53+
std::atomic_bool stopped_{false};
54+
55+
void HandleConnection(int connection_sd);
56+
57+
virtual response::HttpResponse HandleRequest(const request::HttpRequest &request) const;
58+
};
59+
60+
using WorkerPtr = std::unique_ptr<Worker>;
61+
62+
void Initialize();
63+
2964
protected:
3065
void Handle(int connection_sd) override;
3166

32-
virtual response::HttpResponse HandleRequest(const request::HttpRequest &request) const {
33-
response::HttpResponse response;
34-
response.status_code = response::HTTP_200_OK;
35-
response.body = "ok";
36-
return response;
37-
}
3867

3968
private:
4069
HttpServerOpts opts_;
41-
thread::Executor executor_;
42-
43-
void InternalHandle(int connection_sd);
70+
std::vector<WorkerPtr> workers_;
71+
int current_worker_idx_ = 0;
4472
};
4573

4674
} // namespace http

src/main/cpp/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ int main(int argc, char *argv[]) {
1111
google::ParseCommandLineFlags(&argc, &argv, true);
1212

1313
http::HttpServerOpts opts;
14-
opts.executor_opts.num_worker_threads = std::thread::hardware_concurrency();
14+
opts.num_workers = std::thread::hardware_concurrency();
1515

1616
http::HttpServer server(opts);
1717
server.Initialize();

src/main/cpp/tests/test_server.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ int main(int argc, char *argv[]) {
1111
google::ParseCommandLineFlags(&argc, &argv, true);
1212

1313
http::HttpServerOpts opts;
14-
opts.executor_opts.num_worker_threads = std::thread::hardware_concurrency();
14+
opts.num_workers = std::thread::hardware_concurrency();
1515

1616
http::HttpServer server(opts);
1717
server.Initialize();

0 commit comments

Comments
 (0)