Commit d318b93: Python multiprocessing and multithreading (1 file changed, +211 lines)

# I. References

[Link](https://www.biaodianfu.com/python-multi-thread-and-multi-process.html)

# II. Common Code Snippets

## 1. Get the current process ID

```python
import os

# Returns the PID of the current process
os.getpid()
```

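`os.getppid()` is the companion call: inside a child process it returns the parent's PID. A quick sketch (the `child` function is just for illustration):

```python
import os
from multiprocessing import Process

def child():
    # each process has its own PID; getppid() returns the parent's PID
    print('child pid=%s, parent pid=%s' % (os.getpid(), os.getppid()))

if __name__ == '__main__':
    print('main pid=%s' % os.getpid())
    p = Process(target=child)
    p.start()
    p.join()
```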
## 2. Basic multithreading

```python
from threading import Thread

def test(name):
    print(name)

if __name__ == '__main__':
    t1 = Thread(target=test, args=('thread1',))
    t2 = Thread(target=test, args=('thread2',))
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print('main')
```

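A plain `Thread` discards its target's return value. One common workaround, sketched below (`run_all` and its message format are made up for illustration), is to have each thread append to a shared list; `list.append` is safe here because the GIL serializes the call:

```python
from threading import Thread

def run_all(names):
    results = []
    def worker(name):
        results.append('hello from %s' % name)  # each thread records its own entry
    threads = [Thread(target=worker, args=(n,)) for n in names]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # after join(), every append is guaranteed to have happened
    return sorted(results)

if __name__ == '__main__':
    print(run_all(['thread1', 'thread2']))  # → ['hello from thread1', 'hello from thread2']
```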
## 3. Basic thread pool

Because of the GIL, multithreading in Python is mainly useful for IO-bound functions.

```python
import os
import concurrent.futures
import threading
import time

def process_image(image_path):
    file_name = os.path.basename(image_path)
    thread_id = threading.current_thread().ident
    print(f"Processing image: {file_name} in Thread ID: {thread_id}")
    time.sleep(0.1)

def process_images_in_parallel(image_paths, num_threads=4):
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Submit each image processing task to the thread pool
        executor.map(process_image, image_paths)

if __name__ == "__main__":
    image_paths = [f"image_{i}" for i in range(10000)]
    num_threads = 4
    process_images_in_parallel(image_paths, num_threads)
```

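The GIL claim above can be sanity-checked by timing the same CPU-bound function (`busy` below is a made-up stand-in) on a thread pool versus a process pool; on a multi-core machine the process pool is usually much faster:

```python
import time
import concurrent.futures

def busy(n):
    # purely CPU-bound: under the GIL, only one thread runs this at a time
    total = 0
    for i in range(n):
        total += i
    return total

def timed(executor_cls, n_tasks=4, n=2_000_000):
    start = time.time()
    with executor_cls(max_workers=n_tasks) as ex:
        results = list(ex.map(busy, [n] * n_tasks))
    return time.time() - start, results

if __name__ == "__main__":
    t_thread, _ = timed(concurrent.futures.ThreadPoolExecutor)
    t_proc, _ = timed(concurrent.futures.ProcessPoolExecutor)
    print(f"threads: {t_thread:.2f}s, processes: {t_proc:.2f}s")
```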
## 4. Basic multiprocessing

```python
from multiprocessing import Process
import os

def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
```

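The same pattern extends to several children at once; `multiprocessing.current_process()` identifies the process from inside the worker (the `worker-%d` names below are arbitrary):

```python
from multiprocessing import Process, current_process

def work(i):
    print('task %d running in %s' % (i, current_process().name))

if __name__ == '__main__':
    procs = [Process(target=work, args=(i,), name='worker-%d' % i) for i in range(3)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
```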
## 5. Basic process pool

```python
from multiprocessing import Pool

def test(i):
    print(i)

if __name__ == "__main__":
    lists = range(100)
    pool = Pool(8)
    print("Pool started")
    pool.map(test, lists)
    pool.close()
    pool.join()
```

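The example above throws the workers' return values away; `pool.map` also collects them, in input order. A small sketch (`square` is illustrative):

```python
from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(square, range(10))  # results come back in input order
    print(results)  # → [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```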
## 6. Submitting tasks to a process pool (asynchronous, apply_async)

`pool.apply_async` submits tasks without blocking on their completion: with a pool of 8 processes, workers start picking up tasks as soon as they are submitted, and at most 8 tasks run at the same time.

```python
from multiprocessing import Pool

def test(i):
    print(i)

if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
        What happens inside this for loop:
        (1) The loop submits all 100 tasks to the pool; submission itself barely blocks the parent.
        (2) Up to 8 tasks run at once; as soon as one finishes, a worker starts the next.
        apply_async is the asynchronous form: running the child tasks is asynchronous with
        respect to the parent's own work (the print below), while the submission loop itself
        runs synchronously in the parent.
        '''
        pool.apply_async(test, args=(i,))  # keeps up to 8 processes busy; a new task starts whenever one finishes
    # No new tasks can be submitted after close()
    pool.close()
    print("Pool started")
    # Wait for all child processes to finish; commonly used to synchronize with them
    pool.join()
    print("Pool finished")
```

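`apply_async` returns an `AsyncResult` handle immediately; when the return values matter, keep the handles and call `.get()` on each (the `cube` function is illustrative):

```python
from multiprocessing import Pool

def cube(x):
    return x ** 3

if __name__ == "__main__":
    with Pool(4) as pool:
        handles = [pool.apply_async(cube, args=(i,)) for i in range(5)]  # returns at once
        results = [h.get() for h in handles]  # each get() blocks until that task is done
    print(results)  # → [0, 1, 8, 27, 64]
```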
## 7. Submitting tasks to a process pool (synchronous, apply)

`pool.apply` is the blocking counterpart: each call waits for its task to finish before returning, so even though the pool has 8 processes, only one task runs at a time.

```python
from multiprocessing import Pool

def test(i):
    print(i)

if __name__ == "__main__":
    pool = Pool(8)
    for i in range(100):
        '''
        Observed behavior of this for loop:
        (1) Each iteration submits one task to the pool.
        (2) apply blocks until that task has finished before the next iteration runs,
            so only one task executes at a time despite the 8 workers.
        The print below only runs after the whole loop completes.
        '''
        pool.apply(test, args=(i,))  # blocks until the task completes
    print("Pool finished")
    pool.close()
    pool.join()
```

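In fact `pool.apply(f, args)` behaves like `pool.apply_async(f, args).get()`, which is why it blocks; a quick check (`double` is illustrative):

```python
from multiprocessing import Pool

def double(x):
    return x * 2

if __name__ == "__main__":
    with Pool(2) as pool:
        a = pool.apply(double, (21,))              # blocks until the task finishes
        b = pool.apply_async(double, (21,)).get()  # same effect, phrased asynchronously
    print(a, b)  # → 42 42
```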
## 8. Inter-process communication with JoinableQueue

Inter-process communication via `JoinableQueue`:

- `task_done()`: the consumer calls this to signal that an item returned by `q.get()` has been fully processed. If it is called more times than there were items removed from the queue, a `ValueError` is raised.
- `join()`: the producer calls this to block until every item put on the queue has been processed, i.e. until `q.task_done()` has been called once for each item.

```python
from multiprocessing import Process, JoinableQueue
import time, random

def consumer(q):
    while True:
        res = q.get()
        print('Consumer got %s' % res)
        q.task_done()

def producer(seq, q):
    for item in seq:
        time.sleep(random.randrange(1, 2))
        q.put(item)
        print('Producer made %s' % item)
    q.join()

if __name__ == "__main__":
    q = JoinableQueue()
    seq = ('product %s' % i for i in range(5))
    p = Process(target=consumer, args=(q,))
    p.daemon = True  # daemon process: p stops when the main process exits, but q.join() inside
                     # producer guarantees the consumer has already processed every queued item
    p.start()
    producer(seq, q)
    print('Main process done')
```

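The same `task_done()`/`join()` protocol works with several consumers, as long as each consumer calls `task_done()` once per `get()`. A testable sketch (the `run` helper and the doubling step are made up; results are reported back through a second queue):

```python
from multiprocessing import Process, JoinableQueue, Queue

def consumer(q, done):
    while True:
        item = q.get()
        done.put(item * 2)   # record the processed result before acknowledging
        q.task_done()

def run(items, n_consumers=2):
    q = JoinableQueue()
    done = Queue()
    for _ in range(n_consumers):
        p = Process(target=consumer, args=(q, done))
        p.daemon = True      # consumers die with the main process
        p.start()
    for item in items:
        q.put(item)
    q.join()                 # returns once task_done() was called for every item
    return sorted(done.get() for _ in items)

if __name__ == "__main__":
    print(run([1, 2, 3]))  # → [2, 4, 6]
```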
## 9. Data sharing between processes (rarely used)

Passing data between processes with `multiprocessing.Queue` (rarely needed, since each process has its own independent resources).

```python
from multiprocessing import Process, Queue
import os, time, random

def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__ == "__main__":
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    pw.start()
    pr.start()
    pw.join()       # wait for pw to finish
    pr.terminate()  # pr loops forever and cannot be joined; terminate it instead
```

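`terminate()` kills the reader abruptly. A gentler convention (the `None` sentinel below is one common choice, not part of the Queue API) is to send a marker telling the reader to stop, so it can be joined normally:

```python
from multiprocessing import Process, Queue

def read(q):
    while True:
        value = q.get()
        if value is None:  # sentinel: the writer is done
            break
        print('Get %s from queue.' % value)

def run(values):
    q = Queue()
    pr = Process(target=read, args=(q,))
    pr.start()
    for value in values:
        q.put(value)
    q.put(None)   # tell the reader to stop
    pr.join()     # the reader now exits on its own
    return pr.exitcode

if __name__ == "__main__":
    print(run(['A', 'B', 'C']))  # → 0
```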