|
6 | 6 | import threading |
7 | 7 | import time |
8 | 8 |
|
9 | | -class Pipeline(): |
10 | | - def __init__(self): |
11 | | - self.Q = queue.Queue(maxsize=10) |
12 | | - |
13 | | - def empty(self): |
14 | | - return self.Q.empty() |
15 | 9 |
|
16 | | - def size(self): |
17 | | - return self.Q.qsize() |
| 10 | +class Pipeline(queue.Queue): |
| 11 | + def __init__(self): |
| 12 | + super().__init__(maxsize=10) |
18 | 13 |
|
19 | 14 | def get_message(self, name): |
20 | 15 | logging.debug("%s:about to get from queue", name) |
21 | | - value = self.Q.get() |
| 16 | + value = self.get() |
22 | 17 | logging.debug("%s:got %d from queue", name, value) |
23 | 18 | return value |
24 | 19 |
|
25 | 20 | def set_message(self, value, name): |
26 | 21 | logging.debug("%s:about to add %d to queue", name, value) |
27 | | - self.Q.put(value) |
| 22 | + self.put(value) |
28 | 23 | logging.debug("%s:added %d to queue", name, value) |
29 | 24 |
|
30 | 25 |
|
31 | 26 | def producer(pipeline, event): |
32 | | - '''Pretend we're getting a number from the network.''' |
| 27 | + """Pretend we're getting a number from the network.""" |
33 | 28 | while not event.is_set(): |
34 | | - message = random.randint(1,101) |
| 29 | + message = random.randint(1, 101) |
35 | 30 | logging.info("Producer got message: %s", message) |
36 | 31 | pipeline.set_message(message, "Producer") |
37 | 32 |
|
38 | 33 | logging.info("Producer received EXIT event. Exiting") |
39 | 34 |
|
| 35 | + |
40 | 36 | def consumer(pipeline, event): |
41 | | - ''' Pretend we're saving a number in the database. ''' |
| 37 | + """ Pretend we're saving a number in the database. """ |
42 | 38 | while not event.is_set() or not pipeline.empty(): |
43 | 39 | message = pipeline.get_message("Consumer") |
44 | | - logging.info("Consumer storing message: %s (queue size=%s)", message, |
45 | | - pipeline.size()) |
| 40 | + logging.info( |
| 41 | + "Consumer storing message: %s (queue size=%s)", |
| 42 | + message, |
| 43 | + pipeline.qsize(), |
| 44 | + ) |
46 | 45 |
|
47 | 46 | logging.info("Consumer received EXIT event. Exiting") |
48 | 47 |
|
49 | 48 |
|
50 | 49 | if __name__ == "__main__": |
51 | | - format='%(asctime)s: %(message)s' |
52 | | - logging.basicConfig(format=format, level=logging.INFO, datefmt='%H:%M:%S') |
| 50 | + format = "%(asctime)s: %(message)s" |
| 51 | + logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S") |
53 | 52 | # logging.basicConfig(format=format, level=logging.DEBUG, datefmt='%H:%M:%S') |
54 | 53 |
|
55 | 54 | pipeline = Pipeline() |
|
0 commit comments