Commit 8d9cf77

Merge pull request #269 from realpython/queue
Queue - Sample Code
2 parents 20a6526 + da95fe7 commit 8d9cf77

16 files changed (+1248 -0 lines)


queue/README.md

Lines changed: 251 additions & 0 deletions
# A Practical Look at Stacks, Queues, and Priority Queues in Python

Sample code supplementing the tutorial on [Python queues](https://realpython.com/queue-in-python/) hosted on Real Python.

## Installation

To get started, create and activate a new virtual environment, and then install the required dependencies into it:

```shell
$ python3 -m venv venv/ --prompt=queue
$ source venv/bin/activate
(queue) $ python -m pip install -r requirements.txt -c constraints.txt
```

## Usage

### Queue Implementation

Change directory to `src/` and run the interactive Python interpreter:

```shell
(queue) $ cd src/
(queue) $ python -q
```

Then, import various queue data types from the `queues` module and start using them:

```python
>>> from queues import Queue, Stack, PriorityQueue

>>> fifo, stack, heap = Queue(), Stack(), PriorityQueue()
>>> for priority, element in enumerate(["1st", "2nd", "3rd"]):
...     fifo.enqueue(element)
...     stack.enqueue(element)
...     heap.enqueue_with_priority(priority, element)

>>> for elements in zip(fifo, stack, heap):
...     print(elements)
...
('1st', '3rd', '3rd')
('2nd', '2nd', '2nd')
('3rd', '1st', '1st')
```
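
The `queues` module itself is among the commit's files not shown in this excerpt. For orientation, here's a minimal sketch that's consistent with the API demonstrated above — an illustration, not necessarily the repository's actual implementation:

```python
# queues.py (hypothetical sketch, for illustration only)
from collections import deque
from heapq import heappop, heappush
from itertools import count


class IterableMixin:
    """Make a queue sized and iterable by draining its elements."""

    def __len__(self):
        return len(self._elements)

    def __iter__(self):
        while len(self) > 0:
            yield self.dequeue()


class Queue(IterableMixin):
    """First-in, first-out (FIFO) queue backed by a deque."""

    def __init__(self, *elements):
        self._elements = deque(elements)

    def enqueue(self, element):
        self._elements.append(element)

    def dequeue(self):
        return self._elements.popleft()


class Stack(Queue):
    """Last-in, first-out (LIFO) queue: pop from the end you push to."""

    def dequeue(self):
        return self._elements.pop()


class PriorityQueue(IterableMixin):
    """Heap-based queue where higher priorities dequeue first."""

    def __init__(self):
        self._elements = []
        self._counter = count()

    def enqueue_with_priority(self, priority, value):
        # Negate the priority so the min-heap yields the highest one
        # first, and break ties by insertion order with a counter.
        heappush(self._elements, (-priority, next(self._counter), value))

    def dequeue(self):
        return heappop(self._elements)[-1]
```

Running the session above against this sketch reproduces the same output: the FIFO queue preserves insertion order, the stack reverses it, and the priority queue yields elements from the highest priority down.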
### Graph Algorithms

Change directory to `src/` and run the interactive Python interpreter:

```shell
(queue) $ cd src/
(queue) $ python -q
```

Then, import various `graph` module members and start using them:

```python
>>> from graph import *

>>> nodes, graph = load_graph("roadmap.dot", City.from_dict)

>>> city1 = nodes["london"]
>>> city2 = nodes["edinburgh"]

>>> def distance(weights):
...     return float(weights["distance"])

>>> for city in dijkstra_shortest_path(graph, city1, city2, distance):
...     print(city.name)
...
City of London
St Albans
Coventry
Birmingham
Stoke-on-Trent
Manchester
Salford
Preston
Lancaster
Carlisle
Edinburgh

>>> for city in shortest_path(graph, city1, city2):
...     print(city.name)
...
City of London
Bristol
Newport
St Asaph
Liverpool
Preston
Lancaster
Carlisle
Edinburgh

>>> connected(graph, city1, city2)
True

>>> def is_twentieth_century(city):
...     return city.year and 1901 <= city.year <= 2000

>>> breadth_first_search(graph, city2, is_twentieth_century)
City(
    name='Lancaster',
    country='England',
    year=1937,
    latitude=54.047,
    longitude=-2.801
)

>>> depth_first_search(graph, city2, is_twentieth_century)
City(
    name='Lancaster',
    country='England',
    year=1937,
    latitude=54.047,
    longitude=-2.801
)
```
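
Like `queues`, the `graph` module isn't reproduced in this excerpt. To show how a FIFO queue can power one of these traversals, here's a minimal breadth-first search built on the hypothetical `Queue` from the earlier sketch, assuming a networkx-style graph with a `.neighbors()` method — again illustrative, not the repository's code:

```python
from queues import Queue  # the FIFO queue sketched earlier


def breadth_first_search(graph, source, predicate):
    """Return the first node matching the predicate, visiting level by level."""
    queue = Queue(source)
    visited = {source}
    while queue:
        node = queue.dequeue()
        if predicate(node):
            return node
        for neighbor in graph.neighbors(node):
            if neighbor not in visited:
                visited.add(neighbor)
                queue.enqueue(neighbor)
    return None
```

Swapping the FIFO queue for a stack turns the same loop into an iterative depth-first traversal, which is the essential difference between the two searches shown above.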
### Thread-Safe Queues

Change directory to `src/` and run the script with optional parameters. For example:

```shell
(queue) $ cd src/
(queue) $ python thread_safe_queues.py --queue fifo \
                                       --producers 3 \
                                       --consumers 2 \
                                       --producer-speed 1 \
                                       --consumer-speed 1
```

**Parameters:**

| Short Name | Long Name          | Value                  |
|-----------:|-------------------:|------------------------|
|       `-q` | `--queue`          | `fifo`, `lifo`, `heap` |
|       `-p` | `--producers`      | number                 |
|       `-c` | `--consumers`      | number                 |
|      `-ps` | `--producer-speed` | number                 |
|      `-cs` | `--consumer-speed` | number                 |
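
Stripped of the visual bells and whistles, the script animates the classic producer-consumer pattern around the standard library's thread-safe `queue.Queue`. A bare-bones, self-contained sketch of that pattern — not the script's actual code — looks like this:

```python
import queue
import threading


def producer(q):
    for item in range(5):
        q.put(item)  # blocks when the queue is full


def consumer(q):
    while True:
        item = q.get()  # blocks until an item is available
        print(f"Consumed {item}")
        q.task_done()  # lets q.join() track outstanding work


q = queue.Queue(maxsize=10)
threading.Thread(target=consumer, args=(q,), daemon=True).start()
producer(q)
q.join()  # wait until every produced item has been processed
```

The `--queue` flag picks between FIFO, LIFO, and priority-queue variants of the same idea, while the speed flags throttle how quickly the workers produce and consume.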
### Asynchronous Queues

Change directory to `src/` and run the script with a mandatory URL and optional parameters:

```shell
(queue) $ cd src/
(queue) $ python async_queues.py http://localhost:8000/ --max-depth 2 \
                                                        --num-workers 3
```

**Parameters:**

| Short Name | Long Name       | Value  |
|-----------:|----------------:|--------|
|       `-d` | `--max-depth`   | number |
|       `-w` | `--num-workers` | number |

Note that to change between the available queue types, you'll need to edit your `main()` coroutine function:

```python
# async_queues.py

# ...

async def main(args):
    session = aiohttp.ClientSession()
    try:
        links = Counter()
        queue = asyncio.Queue()
        # queue = asyncio.LifoQueue()
        # queue = asyncio.PriorityQueue()

        # ...
```
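
The queue type changes the crawl order: a FIFO queue visits links breadth-first, while a LIFO queue visits them depth-first. This standalone snippet, which isn't part of the script, demonstrates the ordering difference:

```python
import asyncio


async def demo(queue_factory):
    queue = queue_factory()
    for item in ("first", "second", "third"):
        await queue.put(item)
    order = [await queue.get() for _ in range(queue.qsize())]
    print(queue_factory.__name__, order)


async def main():
    await demo(asyncio.Queue)      # Queue ['first', 'second', 'third']
    await demo(asyncio.LifoQueue)  # LifoQueue ['third', 'second', 'first']


asyncio.run(main())
```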
### Multiprocessing Queue

Change directory to `src/` and run the script with a mandatory MD5 hash value and optional parameters:

```shell
(queue) $ cd src/
(queue) $ python multiprocess_queue.py a9d1cbf71942327e98b40cf5ef38a960 -m 6 -w 4
```

**Parameters:**

| Short Name | Long Name       | Value  |
|-----------:|----------------:|--------|
|       `-m` | `--max-length`  | number |
|       `-w` | `--num-workers` | number |

The maximum length determines the maximum number of characters in the text to guess. If you skip the number of workers, then the script will create as many of them as there are detected CPU cores.
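
That CPU-core fallback usually amounts to a default value on the argument parser. A hypothetical sketch — only the two flags come from the table above, the rest is assumed:

```python
import argparse
import multiprocessing


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("hash_value")  # hypothetical name for the MD5 hash
    parser.add_argument("-m", "--max-length", type=int)
    parser.add_argument(
        "-w",
        "--num-workers",
        type=int,
        default=multiprocessing.cpu_count(),  # one worker per CPU core
    )
    return parser.parse_args()
```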
### Message Brokers

#### RabbitMQ

Start a RabbitMQ broker with Docker:

```shell
$ docker run -it --rm --name rabbitmq -p 5672:5672 rabbitmq
```

Open separate terminal windows, activate your virtual environment, change directory to `message_brokers/rabbitmq/`, and run your producer and consumer scripts:

```shell
(queue) $ cd message_brokers/rabbitmq/
(queue) $ python producer.py
(queue) $ python consumer.py
```

You can have as many producers and consumers as you like.

#### Redis

Start a Redis server with Docker:

```shell
$ docker run -it --rm --name redis -p 6379:6379 redis
```

Open separate terminal windows, activate your virtual environment, change directory to `message_brokers/redis/`, and run your publisher and subscriber scripts:

```shell
(queue) $ cd message_brokers/redis/
(queue) $ python publisher.py
(queue) $ python subscriber.py
```

You can have as many publishers and subscribers as you like.

#### Apache Kafka

Change directory to `message_brokers/kafka/` and start an Apache Kafka cluster with Docker Compose:

```shell
$ cd message_brokers/kafka/
$ docker-compose up
```

Open separate terminal windows, activate your virtual environment, change directory to `message_brokers/kafka/`, and run your producer and consumer scripts:

```shell
(queue) $ cd message_brokers/kafka/
(queue) $ python producer.py
(queue) $ python consumer.py
```

You can have as many producers and consumers as you like.

queue/constraints.txt

Lines changed: 24 additions & 0 deletions
aiohttp==3.8.1
aiosignal==1.2.0
async-timeout==4.0.2
attrs==21.4.0
beautifulsoup4==4.11.1
charset-normalizer==2.1.0
commonmark==0.9.1
Deprecated==1.2.13
frozenlist==1.3.0
idna==3.3
kafka-python3==3.0.0
multidict==6.0.2
networkx==2.8.4
packaging==21.3
pika==1.2.1
pydot==1.4.2
Pygments==2.12.0
pygraphviz==1.9
pyparsing==3.0.9
redis==4.3.3
rich==12.4.4
soupsieve==2.3.2.post1
wrapt==1.14.1
yarl==1.7.2
queue/message_brokers/kafka/consumer.py

Lines changed: 8 additions & 0 deletions

# consumer.py

from kafka3 import KafkaConsumer

# Connects to localhost:9092 by default and subscribes to the topic
consumer = KafkaConsumer("datascience")
for record in consumer:
    message = record.value.decode("utf-8")
    print(f"Got message: {message}")
queue/message_brokers/kafka/docker-compose.yml

Lines changed: 22 additions & 0 deletions

# docker-compose.yml

version: "3"
services:
  # ZooKeeper coordinates the Kafka broker
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
    depends_on:
      - zookeeper
queue/message_brokers/kafka/producer.py

Lines changed: 8 additions & 0 deletions

# producer.py

from kafka3 import KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
while True:
    message = input("Message: ")
    producer.send(topic="datascience", value=message.encode("utf-8"))
queue/message_brokers/rabbitmq/consumer.py

Lines changed: 19 additions & 0 deletions

# consumer.py

import pika

QUEUE_NAME = "mailbox"


def callback(channel, method, properties, body):
    message = body.decode("utf-8")
    print(f"Got message: {message}")


with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_NAME)
    # auto_ack=True acknowledges each message as soon as it's delivered
    channel.basic_consume(
        queue=QUEUE_NAME, auto_ack=True, on_message_callback=callback
    )
    channel.start_consuming()
queue/message_brokers/rabbitmq/producer.py

Lines changed: 14 additions & 0 deletions

# producer.py

import pika

QUEUE_NAME = "mailbox"

with pika.BlockingConnection() as connection:
    channel = connection.channel()
    channel.queue_declare(queue=QUEUE_NAME)
    while True:
        message = input("Message: ")
        channel.basic_publish(
            exchange="", routing_key=QUEUE_NAME, body=message.encode("utf-8")
        )
queue/message_brokers/redis/publisher.py

Lines changed: 8 additions & 0 deletions

# publisher.py

import redis

with redis.Redis() as client:
    while True:
        message = input("Message: ")
        client.publish("chatroom", message)
queue/message_brokers/redis/subscriber.py

Lines changed: 11 additions & 0 deletions

# subscriber.py

import redis

with redis.Redis() as client:
    pubsub = client.pubsub()
    pubsub.subscribe("chatroom")
    for message in pubsub.listen():
        # listen() also yields subscription confirmations, hence the check
        if message["type"] == "message":
            body = message["data"].decode("utf-8")
            print(f"Got message: {body}")

queue/requirements.txt

Lines changed: 9 additions & 0 deletions
aiohttp
beautifulsoup4
kafka-python3
networkx
pika
pydot
pygraphviz
redis
rich
