37
37
import java .util .concurrent .ExecutorService ;
38
38
import java .util .concurrent .Executors ;
39
39
import java .util .concurrent .Future ;
40
- import java .util .concurrent .locks .Condition ;
41
40
import java .util .concurrent .locks .ReentrantLock ;
42
41
43
42
import org .apache .hc .core5 .http .Header ;
44
43
import org .apache .hc .core5 .http .WritableByteChannelMock ;
45
44
import org .apache .hc .core5 .http .nio .DataStreamChannel ;
46
45
import org .apache .hc .core5 .util .Timeout ;
47
46
import org .junit .jupiter .api .Assertions ;
47
+ import org .junit .jupiter .api .RepeatedTest ;
48
48
import org .junit .jupiter .api .Test ;
49
49
import org .mockito .Mockito ;
50
50
@@ -57,12 +57,10 @@ static class DataStreamChannelMock implements DataStreamChannel {
57
57
private final WritableByteChannelMock channel ;
58
58
59
59
private final ReentrantLock lock ;
60
- private final Condition condition ;
61
60
62
61
DataStreamChannelMock (final WritableByteChannelMock channel ) {
63
62
this .channel = channel ;
64
63
this .lock = new ReentrantLock ();
65
- this .condition = lock .newCondition ();
66
64
}
67
65
68
66
@ Override
@@ -77,20 +75,13 @@ public int write(final ByteBuffer src) throws IOException {
77
75
78
76
@ Override
79
77
public void requestOutput () {
80
- lock .lock ();
81
- try {
82
- condition .signalAll ();
83
- } finally {
84
- lock .unlock ();
85
- }
86
78
}
87
79
88
80
@ Override
89
81
public void endStream (final List <? extends Header > trailers ) throws IOException {
90
82
lock .lock ();
91
83
try {
92
84
channel .close ();
93
- condition .signalAll ();
94
85
} finally {
95
86
lock .unlock ();
96
87
}
@@ -101,15 +92,6 @@ public void endStream() throws IOException {
101
92
endStream (null );
102
93
}
103
94
104
- public void awaitOutputRequest () throws InterruptedException {
105
- lock .lock ();
106
- try {
107
- condition .await ();
108
- } finally {
109
- lock .unlock ();
110
- }
111
- }
112
-
113
95
}
114
96
115
97
@ Test
@@ -164,7 +146,7 @@ void testFlush() throws Exception {
164
146
Assertions .assertEquals (30 , outputBuffer .capacity ());
165
147
}
166
148
167
- @ Test
149
+ @ RepeatedTest ( 20 )
168
150
void testMultithreadingWriteStream () throws Exception {
169
151
170
152
final Charset charset = StandardCharsets .US_ASCII ;
@@ -174,36 +156,37 @@ void testMultithreadingWriteStream() throws Exception {
174
156
final DataStreamChannelMock dataStreamChannel = new DataStreamChannelMock (channel );
175
157
176
158
final ExecutorService executorService = Executors .newFixedThreadPool (2 );
177
- final Future <Boolean > task1 = executorService .submit (() -> {
178
- final byte [] tmp = "1234567890" .getBytes (charset );
179
- outputBuffer .write (tmp , 0 , tmp .length );
180
- outputBuffer .write (tmp , 0 , tmp .length );
181
- outputBuffer .write ('1' );
182
- outputBuffer .write ('2' );
183
- outputBuffer .write (tmp , 0 , tmp .length );
184
- outputBuffer .write (tmp , 0 , tmp .length );
185
- outputBuffer .write (tmp , 0 , tmp .length );
186
- outputBuffer .writeCompleted ();
187
- outputBuffer .writeCompleted ();
188
- return Boolean .TRUE ;
189
- });
190
- final Future <Boolean > task2 = executorService .submit (() -> {
191
- for (;;) {
192
- outputBuffer .flush (dataStreamChannel );
193
- if (outputBuffer .isEndStream ()) {
194
- break ;
195
- }
196
- if (!outputBuffer .hasData ()) {
197
- dataStreamChannel .awaitOutputRequest ();
159
+ try {
160
+ final Future <Boolean > task1 = executorService .submit (() -> {
161
+ final byte [] tmp = "1234567890" .getBytes (charset );
162
+ outputBuffer .write (tmp , 0 , tmp .length );
163
+ outputBuffer .write (tmp , 0 , tmp .length );
164
+ outputBuffer .write ('1' );
165
+ outputBuffer .write ('2' );
166
+ outputBuffer .write (tmp , 0 , tmp .length );
167
+ outputBuffer .write (tmp , 0 , tmp .length );
168
+ outputBuffer .write (tmp , 0 , tmp .length );
169
+ outputBuffer .writeCompleted ();
170
+ outputBuffer .writeCompleted ();
171
+ return Boolean .TRUE ;
172
+ });
173
+ final Future <Boolean > task2 = executorService .submit (() -> {
174
+ for (;;) {
175
+ outputBuffer .flush (dataStreamChannel );
176
+ if (outputBuffer .isEndStream ()) {
177
+ break ;
178
+ }
198
179
}
199
- }
200
- return Boolean .TRUE ;
201
- });
180
+ return Boolean .TRUE ;
181
+ });
202
182
203
- Assertions .assertEquals (Boolean .TRUE , task1 .get (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ()));
204
- Assertions .assertEquals (Boolean .TRUE , task2 .get (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ()));
183
+ Assertions .assertEquals (Boolean .TRUE , task1 .get (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ()));
184
+ Assertions .assertEquals (Boolean .TRUE , task2 .get (TIMEOUT .getDuration (), TIMEOUT .getTimeUnit ()));
205
185
206
- Assertions .assertEquals ("1234567890123456789012123456789012345678901234567890" , new String (channel .toByteArray (), charset ));
186
+ Assertions .assertEquals ("1234567890123456789012123456789012345678901234567890" , new String (channel .toByteArray (), charset ));
187
+ } finally {
188
+ executorService .shutdownNow ();
189
+ }
207
190
}
208
191
209
192
@ Test
0 commit comments