-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
30 lines (26 loc) · 1.03 KB
/
consumer.py
File metadata and controls
30 lines (26 loc) · 1.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# multiprocessing: process-based parallelism with a threading-like API
import multiprocessing
from logger import warn, info, debug, error
class Consumer(multiprocessing.Process):
    """Worker process that executes tasks taken from a shared task queue.

    Each task is a callable; its return value is forwarded to a result
    queue. A ``None`` entry on the task queue is the shutdown sentinel.
    """

    def __init__(self, task_queue, result_queue):
        """Remember the queues this worker reads from and writes to.

        Args:
            task_queue: Queue of zero-argument callables. It must offer
                ``get()`` and ``task_done()`` (presumably a
                ``multiprocessing.JoinableQueue`` -- confirm at the call
                site).
            result_queue: Queue offering ``put()``; receives each task's
                return value.
        """
        super(Consumer, self).__init__()
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        """Process tasks until the ``None`` sentinel is received.

        Retrieves a task from the task queue, calls it, marks it done and
        writes its return value to the result queue. According to the
        original design notes a task represents a URL whose content is
        scanned for new URLs, which the producer then validates for
        freshness.

        A task that raises is logged and skipped: it is still marked done
        so that a ``join()`` on the task queue cannot hang, but nothing is
        put on the result queue for it.
        """
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Sentinel: acknowledge it so join() accounting stays balanced.
                debug('{}: Exiting'.format(proc_name))
                self.task_queue.task_done()
                break
            debug('{}: {}'.format(proc_name, next_task))
            try:
                answer = next_task()
            except Exception as exc:
                # NOTE(review): assumes error() accepts a single message
                # string, like debug() -- confirm against the logger module.
                error('{}: task {} failed: {!r}'.format(
                    proc_name, next_task, exc))
                continue
            finally:
                # Always acknowledge the task, even on failure; otherwise
                # the unfinished-task count never reaches zero.
                self.task_queue.task_done()
            self.result_queue.put(answer)