-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathevent_loop.cpp
More file actions
204 lines (184 loc) · 5.58 KB
/
event_loop.cpp
File metadata and controls
204 lines (184 loc) · 5.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
#include <algorithm>
#include <memory>
#include <stdexcept>
#include "lock_many.h"
#include "event_loop.h"
namespace kaiu {
using namespace std;
/*
 * Pool that the calling thread belongs to. Every thread has its own copy,
 * which starts out as EventLoopPool::unknown. It is overwritten by
 * ParallelEventLoop::do_threaded_loop on worker threads and read back by
 * ParallelEventLoop::current_pool.
 */
thread_local EventLoopPool this_pool = EventLoopPool::unknown;
/*** EventLoop ***/
/*
 * Base constructor: stores the requested pool in the defaultPool member.
 *
 * defaultPool - value to record in this->defaultPool.
 */
EventLoop::EventLoop(const EventLoopPool defaultPool) :
    defaultPool(defaultPool)
{
}
/*** SynchronousEventLoop ***/
/*
 * Submits the initial event and then runs do_loop(), so the constructor only
 * returns once do_loop() has found the event queue empty.
 *
 * start - first event to submit (through the single-argument push overload,
 *         which is declared outside this file).
 */
SynchronousEventLoop::SynchronousEventLoop(const EventFunc& start) : EventLoop()
{
    /* Seed the loop ... */
    this->push(start);
    /* ... and run it to completion right here */
    this->do_loop();
}
/*
 * Runs queued events one at a time, in the order next() hands them out.
 * Each event is invoked with this loop as its argument. The queue size is
 * re-checked after every event, so anything queued in the meantime is run as
 * well; the function returns once the queue is empty.
 */
void SynchronousEventLoop::do_loop()
{
    while (events.size() > 0) {
        auto task = next();
        (*task)(*this);
    }
}
void SynchronousEventLoop::push(const EventLoopPool pool, const EventFunc& event)
{
events.emplace(new EventFunc(event));
}
/*
 * Detaches the event at the front of the queue and returns it.
 *
 * pool - not used.
 *
 * No emptiness check is made here; do_loop() only calls this while the queue
 * is non-empty.
 */
auto SynchronousEventLoop::next(const EventLoopPool pool) -> Event
{
    /* Move the front element out before popping its (now empty) slot */
    auto head = std::move(events.front());
    events.pop();
    return head;
}
/*** ParallelEventLoop ***/
/*
 * Creates one event queue per requested pool and starts that pool's worker
 * threads, each running do_threaded_loop() for its pool.
 *
 * pools - maps each pool to the number of worker threads to start for it.
 *
 * Throws invalid_argument if any requested thread count is zero or negative.
 * All counts are validated before any queue or thread is created.
 *
 * NOTE(review): if creating a thread throws part-way through, the workers
 * that were already started are not cleaned up here - confirm whether that
 * needs handling.
 */
ParallelEventLoop::ParallelEventLoop(const unordered_map<EventLoopPool, int, EventLoopPoolHash> pools) : EventLoop()
{
    /* Count how many threads we need (not yet including this thread) */
    int total_threads = 0;
    for (const auto& pair : pools) {
        const auto pool_size = pair.second;
        if (pool_size <= 0) {
            throw std::invalid_argument("Thread count specified for a pool is zero or negative. Use SynchronousEventLoop for non-threaded event loop.");
        }
        total_threads += pool_size;
    }
    /* Include this thread in count of threads to initialize */
    starter_pistol.reset(total_threads + 1);
    /* Iterate over requested thread pools, creating each queue and its threads */
    for (const auto& pair : pools) {
        const auto pool_type = pair.first;
        const auto pool_size = pair.second;
        /* Construct the queue in place from the single argument `false` */
        queues.emplace(std::piecewise_construct, std::forward_as_tuple(pool_type), std::forward_as_tuple(false));
        for (int i = 0; i < pool_size; ++i) {
            threads.emplace_back([this, pool_type] { do_threaded_loop(pool_type); });
        }
    }
    /* Mark this thread as started, wait for all threads to start */
    starter_pistol.ready();
}
void ParallelEventLoop::do_threaded_loop(const EventLoopPool pool)
{
this_pool = pool;
/*
* Mark this thread as "working". This will be undone temporarily by any
* blocking wait operation on the event queue, via ConcurrentQueue<T>::pop.
*/
auto not_idle = threads_not_idle_counter.delta(+1);
starter_pistol.ready();
while (Event event = next(pool)) {
try {
(*event)(*this);
} catch (...) {
/* Store exception (uses exceptions_mutex) */
exceptions.push(current_exception());
/* Notify any ongoing join() that there is an exception to handle */
threads_not_idle_counter.notify();
}
}
}
void ParallelEventLoop::push(const EventLoopPool pool, const EventFunc& event)
{
/* const-param antipattern */
auto _pool = pool == EventLoopPool::same ? current_pool() : pool;
if (int(_pool) <= 0) {
throw invalid_argument("Invalid thread pool");
}
ConcurrentQueue<Event>& queue = queues.at(_pool);
queue.emplace(new EventFunc(event));
}
/*
 * Removes stored exceptions from `exceptions` one by one until pop() reports
 * that none is left.
 *
 * handler - called with each removed exception. When it is empty, the
 *           exceptions are still removed but simply dropped.
 */
void ParallelEventLoop::process_exceptions(function<void(exception_ptr)> handler)
{
    for (exception_ptr pending; exceptions.pop(pending); ) {
        if (!handler) {
            /* Nobody to report to: discard and move on */
            continue;
        }
        handler(pending);
    }
}
/*
 * Takes the next event from the queue belonging to `pool`.
 *
 * pool - pool whose queue is read; queues.at() fails if it has no queue.
 *
 * Returns the event, or a null Event when pop() reports that nothing was
 * obtained.
 */
auto ParallelEventLoop::next(const EventLoopPool pool) -> Event
{
    ConcurrentQueue<Event>& source = queues.at(pool);
    Event taken;
    /*
     * If pop waits, this thread is considered idle during the wait.
     *
     * queue_mutex is always acquired during this call, and may be
     * released/reacquired several times if a wait occurs.
     *
     * threads_not_idle_counter mutex will be acquired at the start and at the
     * end of a wait, always while queue_mutex is acquired.
     */
    if (!source.pop<ScopedCounter<int>::Guard>(taken, threads_not_idle_counter, -1)) {
        /* No events available and queue is in non-blocking mode */
        return nullptr;
    }
    return taken;
}
/*
 * Blocks until every queue is empty and threads_not_idle_counter is zero,
 * passing stored exceptions to `handler` along the way.
 *
 * handler - forwarded to process_exceptions() before each wait and once more
 *           after the final check.
 *
 * Throws logic_error if current_pool() is anything other than
 * EventLoopPool::unknown, i.e. when called from a worker thread.
 */
void ParallelEventLoop::join(function<void(exception_ptr)> handler)
{
    /*
     * Proxy iterator for iterating over queue mutexes: walks `queues` but
     * dereferences to each queue's queue_mutex, so lock_many can be given a
     * range of mutexes.
     */
    using src_it = typename decltype(queues)::const_iterator;
    class queue_mutex_iterator {
    public:
        queue_mutex_iterator(src_it it) : it(it) { }
        bool operator !=(const queue_mutex_iterator& b) const { return it != b.it; }
        void operator ++() { ++it; }
        mutex& operator *() const { return it->second.queue_mutex; }
        /* operator-> has to yield a pointer for `iter->member` to be usable */
        mutex* operator ->() const { return &it->second.queue_mutex; }
    private:
        src_it it;
    };
    /* Sanity check */
    if (current_pool() != EventLoopPool::unknown) {
        throw logic_error("join called from worker thread");
    }
    /* Loop until all queues are empty and all threads are idle */
    do {
        /* Handle pending exceptions */
        process_exceptions(handler);
        /* Wait until all threads are idle */
        threads_not_idle_counter.waitForZero();
        /* Lock all queues; `lock` lives until the end of this iteration */
        lock_many lock(
            queue_mutex_iterator(queues.cbegin()),
            queue_mutex_iterator(queues.cend()));
        /* isEmpty(true) is called while the queue mutexes are held by `lock` */
        bool all_queues_are_empty =
            all_of(queues.cbegin(), queues.cend(),
                [] (auto& pair) { return pair.second.isEmpty(true); });
        /* If all queues are empty and all threads are idle, break */
        if (all_queues_are_empty && threads_not_idle_counter.isZero()) {
            break;
        }
    } while (true);
    /* Handle pending exceptions */
    process_exceptions(handler);
}
/*
 * Shuts the loop down: waits for outstanding work, tells every queue to stop
 * waiting for new events, then joins all worker threads.
 */
ParallelEventLoop::~ParallelEventLoop()
{
    /*
     * Wait for all workers to finish working. With a null handler, any
     * stored exceptions are removed without being reported.
     */
    join(nullptr);
    /*
     * Put queues into no-waiting mode so that they terminate when no events
     * are left to be processed (there should be no events left since we just
     * came back from a join()). Setting nowaiting will wake any threads that
     * are waiting for events.
     */
    for (auto& entry : queues) {
        entry.second.set_nowaiting(true);
    }
    /* Wait for all workers to terminate */
    for (auto& worker : threads) {
        worker.join();
    }
}
/*
 * Returns the pool recorded for the calling thread in the thread-local
 * this_pool. That is the pool set by do_threaded_loop() on a worker thread,
 * or this_pool's initial value, EventLoopPool::unknown, on a thread that
 * never set it.
 */
EventLoopPool ParallelEventLoop::current_pool()
{
return this_pool;
}
}