1
+ //! This module implements per-component parallelism.
2
+ //! It should be possible to implement per-row parallelism as well,
3
+ //! which should also boost performance of grayscale images
4
+ //! and allow scaling to more cores.
5
+ //! However, that would be more complex, so we use this as a starting point.
6
+
7
+ use decoder:: MAX_COMPONENTS ;
8
+ use error:: Result ;
9
+ use std:: { mem, sync:: mpsc:: { self , Sender } } ;
10
+ use std:: thread;
11
+ use super :: { RowData , Worker } ;
12
+ use super :: immediate:: ImmediateWorker ;
13
+
14
+ enum WorkerMsg {
15
+ Start ( RowData ) ,
16
+ AppendRow ( Vec < i16 > ) ,
17
+ GetResult ( Sender < Vec < u8 > > ) ,
18
+ }
19
+ pub struct MultiThreadedWorker {
20
+ senders : [ Option < Sender < WorkerMsg > > ; MAX_COMPONENTS ]
21
+ }
22
+
23
+ impl Worker for MultiThreadedWorker {
24
+ fn new ( ) -> Result < Self > {
25
+ Ok ( MultiThreadedWorker {
26
+ senders : [ None , None , None , None ]
27
+ } )
28
+ }
29
+ fn start ( & mut self , row_data : RowData ) -> Result < ( ) > {
30
+ // if there is no worker thread for this component yet, start one
31
+ let component = row_data. index ;
32
+ if let None = self . senders [ component] {
33
+ let sender = spawn_worker_thread ( component) ?;
34
+ self . senders [ component] = Some ( sender) ;
35
+ }
36
+ // we do the "take out value and put it back in once we're done" dance here
37
+ // and in all other message-passing methods because there's not that many rows
38
+ // and this should be cheaper than spawning MAX_COMPONENTS many threads up front
39
+ let sender = mem:: replace ( & mut self . senders [ component] , None ) . unwrap ( ) ;
40
+ sender. send ( WorkerMsg :: Start ( row_data) ) . expect ( "jpeg-decoder worker thread error" ) ;
41
+ self . senders [ component] = Some ( sender) ;
42
+ Ok ( ( ) )
43
+ }
44
+ fn append_row ( & mut self , row : ( usize , Vec < i16 > ) ) -> Result < ( ) > {
45
+ let component = row. 0 ;
46
+ let sender = mem:: replace ( & mut self . senders [ component] , None ) . unwrap ( ) ;
47
+ sender. send ( WorkerMsg :: AppendRow ( row. 1 ) ) . expect ( "jpeg-decoder worker thread error" ) ;
48
+ self . senders [ component] = Some ( sender) ;
49
+ Ok ( ( ) )
50
+ }
51
+ fn get_result ( & mut self , index : usize ) -> Result < Vec < u8 > > {
52
+ let ( tx, rx) = mpsc:: channel ( ) ;
53
+ let sender = mem:: replace ( & mut self . senders [ index] , None ) . unwrap ( ) ;
54
+ sender. send ( WorkerMsg :: GetResult ( tx) ) . expect ( "jpeg-decoder worker thread error" ) ;
55
+ Ok ( rx. recv ( ) . expect ( "jpeg-decoder worker thread error" ) )
56
+ }
57
+ }
58
+
59
+ fn spawn_worker_thread ( component : usize ) -> Result < Sender < WorkerMsg > > {
60
+ let thread_builder = thread:: Builder :: new ( ) . name ( format ! ( "worker thread for component {}" , component) ) ;
61
+ let ( tx, rx) = mpsc:: channel ( ) ;
62
+
63
+ thread_builder. spawn ( move || {
64
+ let mut worker = ImmediateWorker :: new_immediate ( ) ;
65
+
66
+ while let Ok ( message) = rx. recv ( ) {
67
+ match message {
68
+ WorkerMsg :: Start ( mut data) => {
69
+ // we always set component index to 0 for worker threads
70
+ // because they only ever handle one per thread and we don't want them
71
+ // to attempt to access nonexistent components
72
+ data. index = 0 ;
73
+ worker. start_immediate ( data) ;
74
+ } ,
75
+ WorkerMsg :: AppendRow ( row) => {
76
+ worker. append_row_immediate ( ( 0 , row) ) ;
77
+ } ,
78
+ WorkerMsg :: GetResult ( chan) => {
79
+ let _ = chan. send ( worker. get_result_immediate ( 0 ) ) ;
80
+ break ;
81
+ } ,
82
+ }
83
+ }
84
+ } ) ?;
85
+
86
+ Ok ( tx)
87
+ }
0 commit comments