Skip to content

Commit d55ad39

Browse files
authored
Merge pull request #36 from realpython/jima/threading
Jima/threading
2 parents 43322e0 + 7b9ac20 commit d55ad39

File tree

10 files changed

+412
-0
lines changed

10 files changed

+412
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/usr/bin/env python3
2+
import logging
3+
import threading
4+
import time
5+
6+
7+
def thread_function(name):
8+
logging.info("Thread %s: starting", name)
9+
time.sleep(2)
10+
logging.info("Thread %s: finishing", name)
11+
12+
13+
if __name__ == "__main__":
14+
format = "%(asctime)s: %(message)s"
15+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
16+
17+
logging.info("Main : before creating thread")
18+
x = threading.Thread(target=thread_function, args=(1,), daemon=True)
19+
logging.info("Main : before running thread")
20+
x.start()
21+
logging.info("Main : wait for the thread to finish")
22+
x.join()
23+
logging.info("Main : all done")

intro-to-threading/executor.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import time
5+
6+
7+
def thread_function(name):
8+
logging.info("Thread %s: starting", name)
9+
time.sleep(2)
10+
logging.info("Thread %s: finishing", name)
11+
12+
13+
if __name__ == "__main__":
14+
format = "%(asctime)s: %(message)s"
15+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
16+
17+
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
18+
executor.map(thread_function, range(3))

intro-to-threading/fixrace.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import threading
5+
import time
6+
7+
8+
class FakeDatabase:
9+
def __init__(self):
10+
self.value = 0
11+
self._lock = threading.Lock()
12+
13+
def locked_update(self, name):
14+
logging.info("Thread %s: starting update", name)
15+
logging.debug("Thread %s about to lock", name)
16+
with self._lock:
17+
logging.debug("Thread %s has lock", name)
18+
local_copy = self.value
19+
local_copy += 1
20+
time.sleep(0.1)
21+
self.value = local_copy
22+
logging.debug("Thread %s about to release lock", name)
23+
logging.debug("Thread %s after release", name)
24+
logging.info("Thread %s: finishing update", name)
25+
26+
27+
if __name__ == "__main__":
28+
format = "%(asctime)s: %(message)s"
29+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
30+
# logging.getLogger().setLevel(logging.DEBUG)
31+
32+
database = FakeDatabase()
33+
logging.info(
34+
"Testing locked update. Starting value is %d.", database.value
35+
)
36+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
37+
for index in range(2):
38+
executor.submit(database.locked_update, index)
39+
logging.info("Testing locked update. Ending value is %d.", database.value)
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
#!/usr/bin/env python3
2+
import logging
3+
import threading
4+
import time
5+
6+
7+
def thread_function(name):
8+
logging.info("Thread %s: starting", name)
9+
time.sleep(2)
10+
logging.info("Thread %s: finishing", name)
11+
12+
13+
if __name__ == "__main__":
14+
format = "%(asctime)s: %(message)s"
15+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
16+
17+
threads = list()
18+
for index in range(3):
19+
logging.info("Main : create and start thread %d.", index)
20+
x = threading.Thread(target=thread_function, args=(index,))
21+
threads.append(x)
22+
x.start()
23+
24+
for index, thread in enumerate(threads):
25+
logging.info("Main : before joining thread %d.", index)
26+
thread.join()
27+
logging.info("Main : thread %d done", index)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import queue
5+
import random
6+
import threading
7+
import time
8+
9+
10+
def producer(queue, event):
11+
"""Pretend we're getting a number from the network."""
12+
while not event.is_set():
13+
message = random.randint(1, 101)
14+
logging.info("Producer got message: %s", message)
15+
queue.put(message)
16+
17+
logging.info("Producer received event. Exiting")
18+
19+
20+
def consumer(queue, event):
21+
""" Pretend we're saving a number in the database. """
22+
while not event.is_set() or not pipeline.empty():
23+
message = queue.get()
24+
logging.info(
25+
"Consumer storing message: %s (size=%d)", message, queue.qsize()
26+
)
27+
28+
logging.info("Consumer received event. Exiting")
29+
30+
31+
if __name__ == "__main__":
32+
format = "%(asctime)s: %(message)s"
33+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
34+
35+
pipeline = queue.Queue(maxsize=10)
36+
event = threading.Event()
37+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
38+
executor.submit(producer, pipeline, event)
39+
executor.submit(consumer, pipeline, event)
40+
41+
time.sleep(0.1)
42+
logging.info("Main: about to set event")
43+
event.set()
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import random
5+
import threading
6+
import time
7+
8+
9+
class Pipeline:
10+
"""Class to allow a single element pipeline between producer and consumer.
11+
"""
12+
13+
def __init__(self):
14+
self.value = 0
15+
self._set_lock = threading.Lock()
16+
self._get_lock = threading.Lock()
17+
self._get_lock.acquire()
18+
19+
def get_value(self, name):
20+
logging.debug("%s:about to acquire getlock", name)
21+
self._get_lock.acquire()
22+
logging.debug("%s:have getlock", name)
23+
value = self.value
24+
logging.debug("%s:about to release setlock", name)
25+
self._set_lock.release()
26+
logging.debug("%s:setlock released", name)
27+
return value
28+
29+
def set_value(self, value, name):
30+
logging.debug("%s:about to acquire setlock", name)
31+
self._set_lock.acquire()
32+
logging.debug("%s:have setlock", name)
33+
self.value = value
34+
logging.debug("%s:about to release getlock", name)
35+
self._get_lock.release()
36+
logging.debug("%s:getlock released", name)
37+
38+
39+
def producer(pipeline, event):
40+
"""Pretend we're getting a number from the network."""
41+
while not event.is_set():
42+
new_datapoint = random.randint(1, 101)
43+
# Sleep to simulate waiting for data from network
44+
# time.sleep(float(new_datapoint)/100)
45+
logging.info("Producer got data %d", new_datapoint)
46+
pipeline.set_value(new_datapoint, "Producer")
47+
if event.is_set():
48+
logging.info("Producer received internal event. Exiting")
49+
50+
# don't put sleep here as this will cause the system to stall when the
51+
# consumer is active. That will result in deadlock.
52+
# time.sleep(float(new_datapoint)/100)
53+
54+
logging.info("Producer received event. Exiting")
55+
56+
57+
def consumer(pipeline, event):
58+
""" Pretend we're saving a number in the database. """
59+
datapoint = 0
60+
while not event.is_set():
61+
datapoint = pipeline.get_value("Consumer")
62+
logging.info("Consumer storing data: %d", datapoint)
63+
64+
logging.info("Consumer received event. Exiting")
65+
66+
67+
if __name__ == "__main__":
68+
format = "%(asctime)s: %(message)s"
69+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
70+
# logging.getLogger().setLevel(logging.DEBUG)
71+
72+
pipeline = Pipeline()
73+
event = threading.Event()
74+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
75+
executor.submit(producer, pipeline, event)
76+
executor.submit(consumer, pipeline, event)
77+
78+
time.sleep(0.1)
79+
logging.info("Main: about to set event")
80+
event.set()

intro-to-threading/prodcom_lock.py

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import random
5+
import threading
6+
7+
SENTINEL = -1
8+
9+
10+
class Pipeline:
11+
"""Class to allow a single element pipeline between producer and consumer.
12+
"""
13+
14+
def __init__(self):
15+
self.message = 0
16+
self.producer_lock = threading.Lock()
17+
self.consumer_lock = threading.Lock()
18+
self.consumer_lock.acquire()
19+
20+
def get_message(self, name):
21+
logging.debug("%s:about to acquire getlock", name)
22+
self.consumer_lock.acquire()
23+
logging.debug("%s:have getlock", name)
24+
message = self.message
25+
logging.debug("%s:about to release setlock", name)
26+
self.producer_lock.release()
27+
logging.debug("%s:setlock released", name)
28+
return message
29+
30+
def set_message(self, message, name):
31+
logging.debug("%s:about to acquire setlock", name)
32+
self.producer_lock.acquire()
33+
logging.debug("%s:have setlock", name)
34+
self.message = message
35+
logging.debug("%s:about to release getlock", name)
36+
self.consumer_lock.release()
37+
logging.debug("%s:getlock released", name)
38+
39+
40+
def producer(pipeline):
41+
"""Pretend we're getting a message from the network."""
42+
for index in range(10):
43+
message = random.randint(1, 101)
44+
logging.info("Producer got message: %s", message)
45+
pipeline.set_message(message, "Producer")
46+
47+
# Send a sentinel message to tell consumer we're done
48+
pipeline.set_message(SENTINEL, "Producer")
49+
50+
51+
def consumer(pipeline):
52+
""" Pretend we're saving a number in the database. """
53+
message = 0
54+
while message != SENTINEL:
55+
message = pipeline.get_message("Consumer")
56+
if message != SENTINEL:
57+
logging.info("Consumer storing message: %s", message)
58+
59+
60+
if __name__ == "__main__":
61+
format = "%(asctime)s: %(message)s"
62+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
63+
# logging.getLogger().setLevel(logging.DEBUG)
64+
65+
pipeline = Pipeline()
66+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
67+
executor.submit(producer, pipeline)
68+
executor.submit(consumer, pipeline)
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import queue
5+
import random
6+
import threading
7+
import time
8+
9+
10+
class Pipeline(queue.Queue):
11+
def __init__(self):
12+
super().__init__(maxsize=10)
13+
14+
def get_message(self, name):
15+
logging.debug("%s:about to get from queue", name)
16+
value = self.get()
17+
logging.debug("%s:got %d from queue", name, value)
18+
return value
19+
20+
def set_message(self, value, name):
21+
logging.debug("%s:about to add %d to queue", name, value)
22+
self.put(value)
23+
logging.debug("%s:added %d to queue", name, value)
24+
25+
26+
def producer(pipeline, event):
27+
"""Pretend we're getting a number from the network."""
28+
while not event.is_set():
29+
message = random.randint(1, 101)
30+
logging.info("Producer got message: %s", message)
31+
pipeline.set_message(message, "Producer")
32+
33+
logging.info("Producer received EXIT event. Exiting")
34+
35+
36+
def consumer(pipeline, event):
37+
""" Pretend we're saving a number in the database. """
38+
while not event.is_set() or not pipeline.empty():
39+
message = pipeline.get_message("Consumer")
40+
logging.info(
41+
"Consumer storing message: %s (queue size=%s)",
42+
message,
43+
pipeline.qsize(),
44+
)
45+
46+
logging.info("Consumer received EXIT event. Exiting")
47+
48+
49+
if __name__ == "__main__":
50+
format = "%(asctime)s: %(message)s"
51+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
52+
# logging.getLogger().setLevel(logging.DEBUG)
53+
54+
pipeline = Pipeline()
55+
event = threading.Event()
56+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
57+
executor.submit(producer, pipeline, event)
58+
executor.submit(consumer, pipeline, event)
59+
60+
time.sleep(0.1)
61+
logging.info("Main: about to set event")
62+
event.set()

intro-to-threading/racecond.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
#!/usr/bin/env python3
2+
import concurrent.futures
3+
import logging
4+
import time
5+
6+
7+
class FakeDatabase:
8+
def __init__(self):
9+
self.value = 0
10+
11+
def update(self, name):
12+
logging.info("Thread %s: starting update", name)
13+
local_copy = self.value
14+
local_copy += 1
15+
time.sleep(0.1)
16+
self.value = local_copy
17+
logging.info("Thread %s: finishing update", name)
18+
19+
20+
if __name__ == "__main__":
21+
format = "%(asctime)s: %(message)s"
22+
logging.basicConfig(format=format, level=logging.INFO, datefmt="%H:%M:%S")
23+
24+
database = FakeDatabase()
25+
logging.info("Testing update. Starting value is %d.", database.value)
26+
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
27+
for index in range(2):
28+
executor.submit(database.update, index)
29+
logging.info("Testing update. Ending value is %d.", database.value)

0 commit comments

Comments
 (0)