-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreadpool.c
More file actions
65 lines (61 loc) · 2.07 KB
/
threadpool.c
File metadata and controls
65 lines (61 loc) · 2.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#include "threadpool.h"
#include <stdlib.h>
static void* worker(void *p) {
threadpool_t *tp = (threadpool_t*)p;
for (;;) {
pthread_mutex_lock(&tp->mtx);
while (!tp->stop && tp->size == 0)
pthread_cond_wait(&tp->cv_nonempty, &tp->mtx);
if (tp->stop && tp->size == 0) {
pthread_mutex_unlock(&tp->mtx);
break;
}
tp_task *t = tp->head;
tp->head = t->next;
if (!tp->head) tp->tail = NULL;
tp->size--;
pthread_cond_signal(&tp->cv_nonfull);
pthread_mutex_unlock(&tp->mtx);
t->fn(t->arg);
free(t);
}
return NULL;
}
int tp_init(threadpool_t *tp, int nthreads, int cap) {
*tp = (threadpool_t){0};
pthread_mutex_init(&tp->mtx, NULL);
pthread_cond_init(&tp->cv_nonempty, NULL);
pthread_cond_init(&tp->cv_nonfull, NULL);
tp->cap = cap;
tp->nthreads = nthreads;
tp->threads = (pthread_t*)calloc(nthreads, sizeof(pthread_t));
if (!tp->threads) return -1;
for (int i = 0; i < nthreads; ++i)
pthread_create(&tp->threads[i], NULL, worker, tp);
return 0;
}
void tp_submit(threadpool_t *tp, tp_task_fn fn, void *arg) {
tp_task *t = (tp_task*)malloc(sizeof(tp_task));
t->fn = fn; t->arg = arg; t->next = NULL;
pthread_mutex_lock(&tp->mtx);
while (!tp->stop && tp->size >= tp->cap)
pthread_cond_wait(&tp->cv_nonfull, &tp->mtx);
if (tp->stop) { pthread_mutex_unlock(&tp->mtx); free(t); return; }
if (tp->tail) tp->tail->next = t; else tp->head = t;
tp->tail = t; tp->size++;
pthread_cond_signal(&tp->cv_nonempty);
pthread_mutex_unlock(&tp->mtx);
}
void tp_destroy(threadpool_t *tp) {
pthread_mutex_lock(&tp->mtx);
tp->stop = true;
pthread_cond_broadcast(&tp->cv_nonempty);
pthread_cond_broadcast(&tp->cv_nonfull);
pthread_mutex_unlock(&tp->mtx);
for (int i = 0; i < tp->nthreads; ++i)
pthread_join(tp->threads[i], NULL);
free(tp->threads);
pthread_mutex_destroy(&tp->mtx);
pthread_cond_destroy(&tp->cv_nonempty);
pthread_cond_destroy(&tp->cv_nonfull);
}