Skip to content

Commit 640f8e3

Browse files
committed
Update code
1 parent b6c3ebb commit 640f8e3

File tree

1 file changed

+25
-52
lines changed

1 file changed

+25
-52
lines changed

python-deque/producer_consumer.py

Lines changed: 25 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,69 +1,42 @@
1-
import collections
21
import logging
32
import random
43
import threading
54
import time
5+
from collections import deque
66

7-
logging.basicConfig(
8-
level=logging.INFO,
9-
format="%(threadName)s %(message)s",
10-
)
7+
logging.basicConfig(level=logging.INFO, format="%(message)s")
118

129

13-
class SynchronizedBuffer:
14-
def __init__(self, capacity):
15-
self.values = collections.deque(maxlen=capacity)
16-
self.lock = threading.RLock()
17-
self.consumed = threading.Condition(self.lock)
18-
self.produced = threading.Condition(self.lock)
10+
def wait_seconds(mins, maxs):
11+
time.sleep(mins + random.random() * (maxs - mins))
1912

20-
def __repr__(self):
21-
return repr(list(self.values))
2213

23-
@property
24-
def empty(self):
25-
return len(self.values) == 0
26-
27-
@property
28-
def full(self):
29-
return len(self.values) == self.values.maxlen
30-
31-
def put(self, value):
32-
with self.lock:
33-
self.consumed.wait_for(lambda: not self.full)
34-
self.values.append(value)
35-
self.produced.notify()
36-
37-
def get(self):
38-
with self.lock:
39-
self.produced.wait_for(lambda: not self.empty)
40-
try:
41-
return self.values.popleft()
42-
finally:
43-
self.consumed.notify()
44-
45-
46-
def producer(buffer):
14+
def produce(queue, size):
4715
while True:
48-
value = random.randint(1, 10)
49-
buffer.put(value)
50-
logging.info("produced %d: %s", value, buffer)
51-
time.sleep(random.random())
16+
if len(queue) < size:
17+
value = random.randint(0, 9)
18+
queue.append(value)
19+
logging.info("Produced: %d -> %s", value, str(queue))
20+
else:
21+
logging.info("Queue is saturated")
22+
wait_seconds(0.1, 0.5)
5223

5324

54-
def consumer(buffer):
25+
def consume(queue):
5526
while True:
56-
value = buffer.get()
57-
logging.info("consumed %d: %s", value, buffer)
58-
time.sleep(random.random())
59-
27+
try:
28+
value = queue.popleft()
29+
except IndexError:
30+
logging.info("Queue is empty")
31+
else:
32+
logging.info("Consumed: %d -> %s", value, str(queue))
33+
wait_seconds(0.2, 0.7)
6034

61-
if __name__ == "__main__":
6235

63-
buffer = SynchronizedBuffer(5)
36+
logging.info("Starting Threads...\n")
37+
logging.info("Press Ctrl+C to interrupt the execution\n")
6438

65-
for _ in range(3):
66-
threading.Thread(target=producer, args=(buffer,)).start()
39+
shared_queue = deque()
6740

68-
for _ in range(2):
69-
threading.Thread(target=consumer, args=(buffer,)).start()
41+
threading.Thread(target=produce, args=(shared_queue, 10)).start()
42+
threading.Thread(target=consume, args=(shared_queue,)).start()

0 commit comments

Comments
 (0)