|
239 | 239 | 生产者和消费者问题 |
240 | 240 | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
241 | 241 |
|
242 | | -TODO: |
| 242 | +.. image:: mpsc-problem.png |
| 243 | + :align: center |
| 244 | + |
| 245 | +生产者-消费者问题(也称为有限缓冲问题)是 Dijkstra 自 1965 年以来描述的一系列同步互斥问题中的一个。如图所示,一共有 5 个线程在同进程下进行协作,其中有 4 个生产者(Producer,图中右侧)和 1 个消费者(Consumer,图中左侧),它们共享一个容量有限的环形缓冲区(图中间)。生产者的职责是将输入放入缓冲区,而消费者则从缓冲区中取出数据进行处理。然而,这两种操作并不总是能够立即成功的。比如,当缓冲区已满的情况下,生产者就无法将数据放入缓冲区,需要等消费者取出数据空出缓冲区槽位;而当缓冲区为空没有数据的时候,消费者也无法从中取出数据,需要等生产者将数据填充到缓冲区。考虑使用信号量来实现上述同步需求,可以看成管理以下资源: |
| 246 | + |
| 247 | +- 空闲槽位资源,初始数量 :math:`N` 等于缓冲区容量。生产者每次写入需要占用 1 个,消费者每次读取恢复 1 个; |
| 248 | +- 可用数据资源,初始数量 :math:`N=0` (最开始缓冲区为空)。消费者每次读取占用 1 个,生产者每次写入恢复 1 个; |
| 249 | +- 将缓冲区以及相应指针(即 ``front`` 和 ``tail`` )整体上视作一种共享资源,那么生产者和消费者的写入和读取都会对这个共享资源进行修改。注意 **信号量只保证无可用资源时进行阻塞,但并不保证访问共享资源的互斥性,甚至这可能是两种不同资源** 。因此,我们还需要引入互斥锁对缓冲区进行保护,这里使用一个 :math:`N=1` 的二值信号量来实现。 |
| 250 | + |
| 251 | +代码如下: |
| 252 | + |
| 253 | +.. code-block:: rust |
| 254 | + :linenos: |
| 255 | +
|
| 256 | + // user/src/bin/mpsc_sem.rs |
| 257 | +
|
| 258 | + const SEM_MUTEX: usize = 0; |
| 259 | + const SEM_EMPTY: usize = 1; |
| 260 | + const SEM_AVAIL: usize = 2; |
| 261 | + const BUFFER_SIZE: usize = 8; |
| 262 | + static mut BUFFER: [usize; BUFFER_SIZE] = [0; BUFFER_SIZE]; |
| 263 | + static mut FRONT: usize = 0; |
| 264 | + static mut TAIL: usize = 0; |
| 265 | + const PRODUCER_COUNT: usize = 4; |
| 266 | + const NUMBER_PER_PRODUCER: usize = 100; |
| 267 | +
|
| 268 | + unsafe fn producer(id: *const usize) -> ! { |
| 269 | + let id = *id; |
| 270 | + for _ in 0..NUMBER_PER_PRODUCER { |
| 271 | + semaphore_down(SEM_EMPTY); |
| 272 | + semaphore_down(SEM_MUTEX); |
| 273 | + BUFFER[TAIL] = id; |
| 274 | + TAIL = (TAIL + 1) % BUFFER_SIZE; |
| 275 | + semaphore_up(SEM_MUTEX); |
| 276 | + semaphore_up(SEM_AVAIL); |
| 277 | + } |
| 278 | + exit(0) |
| 279 | + } |
| 280 | +
|
| 281 | + unsafe fn consumer() -> ! { |
| 282 | + for _ in 0..PRODUCER_COUNT * NUMBER_PER_PRODUCER { |
| 283 | + semaphore_down(SEM_AVAIL); |
| 284 | + semaphore_down(SEM_MUTEX); |
| 285 | + print!("{} ", BUFFER[FRONT]); |
| 286 | + FRONT = (FRONT + 1) % BUFFER_SIZE; |
| 287 | + semaphore_up(SEM_MUTEX); |
| 288 | + semaphore_up(SEM_EMPTY); |
| 289 | + } |
| 290 | + println!(""); |
| 291 | + exit(0) |
| 292 | + } |
| 293 | +
|
| 294 | + #[no_mangle] |
| 295 | + pub fn main() -> i32 { |
| 296 | + // create semaphores |
| 297 | + assert_eq!(semaphore_create(1) as usize, SEM_MUTEX); |
| 298 | + assert_eq!(semaphore_create(BUFFER_SIZE) as usize, SEM_EMPTY); |
| 299 | + assert_eq!(semaphore_create(0) as usize, SEM_AVAIL); |
| 300 | + // create threads |
| 301 | + let ids: Vec<_> = (0..PRODUCER_COUNT).collect(); |
| 302 | + let mut threads = Vec::new(); |
| 303 | + for i in 0..PRODUCER_COUNT { |
| 304 | + threads.push(thread_create( |
| 305 | + producer as usize, |
| 306 | + &ids.as_slice()[i] as *const _ as usize, |
| 307 | + )); |
| 308 | + } |
| 309 | + threads.push(thread_create(consumer as usize, 0)); |
| 310 | + // wait for all threads to complete |
| 311 | + for thread in threads.iter() { |
| 312 | + waittid(*thread as usize); |
| 313 | + } |
| 314 | + println!("mpsc_sem passed!"); |
| 315 | + 0 |
| 316 | + } |
| 317 | +
|
| 318 | +第 42-44 行分别创建了二值信号量 ``SEM_MUTEX`` ,描述空闲槽位资源的信号量 ``SEM_EMPTY`` 以及描述可用数据资源的信号量 ``SEM_AVAIL`` 。生产者线程会执行 ``producer`` 函数,循环的每次迭代向共享缓冲区写入数据。于是在写入之前需要进行信号量 ``SEM_EMPTY`` 的 down 操作尝试占用一个空闲槽位资源,而在写入之后进行信号量 ``SEM_AVAIL`` 的 up 操作释放一个可用数据资源。相对的,消费者线程会执行 ``consumer`` 函数,循环的每次迭代从共享缓冲区读取数据。于是在读入之前需要进行信号量 ``SEM_AVAIL`` 的 down 操作尝试占用一个可用数据资源,而在读取之后进行信号量 ``SEM_EMPTY`` 的 up 操作释放一个空闲槽位资源。两个线程对共享缓冲区的操作都需要用二值信号量 ``SEM_MUTEX`` 来保护。 |
| 319 | + |
| 320 | +从这个例子可以看出,信号量的使用可以是非常灵活的。同一个信号量的 P 操作和 V 操作不一定是连续的,甚至可以不在一个线程上。 |
243 | 321 |
|
244 | 322 | 实现信号量 |
245 | 323 | ------------------------------------------ |
@@ -328,9 +406,15 @@ TODO: |
328 | 406 |
|
329 | 407 | .. 在线程的眼里,信号量是一种每个线程能看到的共享资源,且可以存在多个不同信号量来合理使用不同的资源。所以我们可以把信号量也看成四一种资源,可放在一起让进程来管理,如下面代码第9行所示。这里需要注意的是: ``semaphore_list: Vec<Option<Arc<Semaphore>>>`` 表示的是信号量资源的列表。而 ``Semaphore`` 是信号量的内核数据结构,由信号量值和等待队列组成。操作系统需要显式地施加某种控制,来确定当一个线程执行P操作和V操作时,如何让线程睡眠或唤醒线程。在这里,P操作是由 ``Semaphore`` 的 ``down`` 方法实现,而V操作是由 ``Semaphore`` 的 ``up`` 方法实现。 |
330 | 408 |
|
| 409 | +小结 |
| 410 | +----------------------------------------------------- |
| 411 | + |
| 412 | +本节我们介绍了相比互斥锁更加灵活强大的同步原语——信号量,并用它解决了条件同步和经典的生产者-消费者问题。但是要看到的是,信号量还是比较复杂的。对于程序员来说开发和阅读代码比较困难,且比较容易出错,对程序员的要求比较高。 |
| 413 | + |
331 | 414 | 参考文献 |
332 | 415 | ---------------------------------------------------- |
333 | 416 |
|
334 | 417 | - Dijkstra, Edsger W. Cooperating sequential processes (EWD-123) (PDF). E.W. Dijkstra Archive. Center for American History, University of Texas at Austin. (transcription) (September 1965) https://www.cs.utexas.edu/users/EWD/transcriptions/EWD01xx/EWD123.html |
335 | 418 | - Downey, Allen B. (2016) [2005]. "The Little Book of Semaphores" (2nd ed.). Green Tea Press. |
336 | | -- Leppäjärvi, Jouni (May 11, 2008). "A pragmatic, historically oriented survey on the universality of synchronization primitives" (pdf). University of Oulu, Finland. |
| 419 | +- Leppäjärvi, Jouni (May 11, 2008). "A pragmatic, historically oriented survey on the universality of synchronization primitives" (pdf). University of Oulu, Finland. |
| 420 | +- `Producer-consumer problem, Wikipedia <https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem>`_ |
0 commit comments