4141
4242import java .io .IOException ;
4343import java .nio .ByteBuffer ;
44+ import java .util .Random ;
4445import java .util .concurrent .ExecutorService ;
4546import java .util .concurrent .Executors ;
47+ import java .util .concurrent .Semaphore ;
4648import java .util .concurrent .TimeUnit ;
49+ import java .util .concurrent .atomic .AtomicBoolean ;
4750
4851import org .glassfish .jersey .internal .LocalizationMessages ;
4952
5255import static org .junit .Assert .assertNotEquals ;
5356import static org .junit .Assert .assertNotNull ;
5457import static org .junit .Assert .assertNull ;
58+ import static org .junit .Assert .assertTrue ;
5559import static org .junit .Assert .fail ;
5660
5761/**
6165 */
6266public class ByteBufferInputStreamTest {
6367
68+ @ Test
69+ public void testBlockingReadAByteEmptyStream () throws Exception {
70+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
71+ bbis .closeQueue ();
72+ assertEquals (-1 , bbis .read ());
73+ }
74+
75+ @ Test
76+ public void testNonBlockingReadAByteEmptyStream () throws Exception {
77+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
78+ bbis .closeQueue ();
79+ assertEquals (-1 , bbis .tryRead ());
80+ }
81+
82+ @ Test
83+ public void testBlockingReadByteArrayEmptyStream () throws Exception {
84+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
85+ bbis .closeQueue ();
86+ byte [] buf = new byte [1024 ];
87+ assertEquals (-1 , bbis .read (buf ));
88+ }
89+
90+ @ Test
91+ public void testNonBlockingReadByteArrayEmptyStream () throws Exception {
92+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
93+ bbis .closeQueue ();
94+ byte [] buf = new byte [1024 ];
95+ assertEquals (-1 , bbis .tryRead (buf ));
96+ }
97+
98+ @ Test
99+ public void testBlockingReadByteArrayFromFinishedExactLengthStream () throws Exception {
100+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
101+ byte [] sourceData = new byte [1024 ];
102+ new Random ().nextBytes (sourceData );
103+ ByteBuffer byteBuf = ByteBuffer .wrap (sourceData );
104+ bbis .put (byteBuf );
105+ bbis .closeQueue ();
106+ byte [] buf = new byte [1024 ];
107+ assertEquals (1024 , bbis .read (buf ));
108+ // no more data to read; so it should return -1
109+ assertEquals (-1 , bbis .read (buf ));
110+ }
111+
112+ @ Test
113+ public void testNonBlockingReadByteArrayFromFinishedExactLengthStream () throws Exception {
114+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
115+ byte [] sourceData = new byte [1024 ];
116+ new Random ().nextBytes (sourceData );
117+ ByteBuffer byteBuf = ByteBuffer .wrap (sourceData );
118+ bbis .put (byteBuf );
119+ byte [] buf = new byte [1024 ];
120+ assertEquals (1024 , bbis .tryRead (buf ));
121+ // the queue has not been close; so it should return 0
122+ assertEquals (0 , bbis .tryRead (buf ));
123+ bbis .closeQueue ();
124+ }
125+
126+ @ Test
127+ public void testBlockingReadByteArrayFromUnfinishedExactLengthStream () throws Exception {
128+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
129+ byte [] sourceData = new byte [1024 ];
130+ new Random ().nextBytes (sourceData );
131+ ByteBuffer byteBuf = ByteBuffer .wrap (sourceData );
132+ bbis .put (byteBuf );
133+ final byte [] buf = new byte [1024 ];
134+ assertEquals (1024 , bbis .read (buf ));
135+ final AtomicBoolean closed = new AtomicBoolean (false );
136+ final Semaphore s = new Semaphore (1 );
137+ s .acquire ();
138+ Thread t = new Thread (new Runnable () {
139+ @ Override
140+ public void run () {
141+ try {
142+ // it should return -1 since there is no more data
143+ assertEquals (-1 , bbis .read (buf ));
144+ // it should only reach here if the stream has been closed
145+ assertTrue (closed .get ());
146+ } catch (IOException e ) {
147+ e .printStackTrace ();
148+ } finally {
149+ s .release ();
150+ }
151+ }
152+ });
153+ t .start ();
154+ Thread .sleep (500 );
155+ closed .set (true );
156+ bbis .closeQueue ();
157+ // wait until the job is done
158+ s .acquire ();
159+ }
160+
161+ @ Test
162+ public void testNonBlockingReadByteArrayFromUnfinishedExactLengthStream () throws Exception {
163+ final ByteBufferInputStream bbis = new ByteBufferInputStream ();
164+ byte [] sourceData = new byte [1024 ];
165+ new Random ().nextBytes (sourceData );
166+ ByteBuffer byteBuf = ByteBuffer .wrap (sourceData );
167+ bbis .put (byteBuf );
168+ bbis .closeQueue ();
169+ byte [] buf = new byte [1024 ];
170+ assertEquals (1024 , bbis .tryRead (buf ));
171+ assertEquals (-1 , bbis .tryRead (buf ));
172+ }
173+
64174 /**
65175 * Test for non blocking single-byte read of the stream.
66176 *
@@ -85,7 +195,7 @@ public void run() {
85195 }
86196 data .clear ();
87197 for (int j = 0 ; j < data .capacity (); j ++) {
88- data .put ((byte ) (i % 128 ));
198+ data .put ((byte ) (i & 0xFF ));
89199 }
90200 data .flip ();
91201 if (!bbis .put (data )) {
@@ -113,7 +223,7 @@ public void run() {
113223 Thread .yield (); // Give the other thread a chance to run.
114224 continue ;
115225 }
116- assertEquals ("At position: " + j , (byte ) (i % 128 ), c );
226+ assertEquals ("At position: " + j , (byte ) (i & 0xFF ), ( byte ) ( c & 0xFF ) );
117227 if (++j % BUFFER_SIZE == 0 ) {
118228 i ++;
119229 Thread .yield (); // Give the other thread a chance to run.
@@ -155,7 +265,7 @@ public void run() {
155265 }
156266 data .clear ();
157267 for (int j = 0 ; j < data .capacity (); j ++) {
158- data .put ((byte ) (i % 128 ));
268+ data .put ((byte ) (i & 0xFF ));
159269 }
160270 data .flip ();
161271 if (!bbis .put (data )) {
@@ -185,7 +295,7 @@ public void run() {
185295 continue ;
186296 }
187297 for (int p = 0 ; p < c ; p ++) {
188- assertEquals ("At position: " + j , (byte ) (i % 128 ), buffer [p ]);
298+ assertEquals ("At position: " + j , (byte ) (i & 0xFF ), ( byte ) buffer [p ]);
189299 if (++j % BUFFER_SIZE == 0 ) {
190300 i ++;
191301 Thread .yield (); // Give the other thread a chance to run.
@@ -228,7 +338,7 @@ public void run() {
228338 }
229339 data .clear ();
230340 for (int j = 0 ; j < data .capacity (); j ++) {
231- data .put ((byte ) (i % 128 ));
341+ data .put ((byte ) (i & 0xFF ));
232342 }
233343 data .flip ();
234344 if (!bbis .put (data )) {
@@ -253,7 +363,7 @@ public void run() {
253363 while ((c = bbis .read ()) != -1 ) {
254364 assertNotEquals ("Should not read 'nothing' in blocking mode." , Integer .MIN_VALUE , c );
255365
256- assertEquals ("At position: " + j , (byte ) (i % 128 ), c );
366+ assertEquals ("At position: " + j , (byte ) (i & 0xFF ), ( byte ) c );
257367 if (++j % BUFFER_SIZE == 0 ) {
258368 i ++;
259369 Thread .yield (); // Give the other thread a chance to run.
@@ -295,7 +405,7 @@ public void run() {
295405 }
296406 data .clear ();
297407 for (int j = 0 ; j < data .capacity (); j ++) {
298- data .put ((byte ) (i % 128 ));
408+ data .put ((byte ) (i & 0xFF ));
299409 }
300410 data .flip ();
301411 if (!bbis .put (data )) {
@@ -322,7 +432,7 @@ public void run() {
322432 assertNotEquals ("Should not read 0 bytes in blocking mode." , 0 , c );
323433
324434 for (int p = 0 ; p < c ; p ++) {
325- assertEquals ("At position: " + j , (byte ) (i % 128 ), buffer [p ]);
435+ assertEquals ("At position: " + j , (byte ) (i & 0xFF ), buffer [p ]);
326436 if (++j % BUFFER_SIZE == 0 ) {
327437 i ++;
328438 Thread .yield (); // Give the other thread a chance to run.
0 commit comments