@@ -700,13 +700,13 @@ protected int read() throws IOException {
700
700
int bufferIndex = getBufferIndex (pos );
701
701
Buffer buffer = useBufferSync (bufferIndex );
702
702
buffer .lastRead = System .currentTimeMillis ();
703
- buffer .inUse --;
704
703
byte b = buffer .buffer [getBufferOffset (pos )];
705
704
pos ++;
706
705
if (pos < size && (bufferIndex != 0 || startReadSecondBufferWhenFirstBufferFullyRead )) {
707
706
int nextIndex = getBufferIndex (pos );
708
707
if (nextIndex != bufferIndex ) loadBuffer (nextIndex );
709
708
}
709
+ buffer .inUse --;
710
710
return b & 0xFF ;
711
711
}
712
712
@@ -751,11 +751,14 @@ protected int readAsync() throws IOException {
751
751
int bufferIndex = getBufferIndex (pos );
752
752
Buffer buffer = useBufferAsync (bufferIndex );
753
753
SynchronizationPoint <NoException > sp = buffer .loading ;
754
- if (sp != null && !sp .isUnblocked ()) return -2 ;
754
+ if (sp != null && !sp .isUnblocked ()) {
755
+ buffer .lastRead = System .currentTimeMillis ();
756
+ buffer .inUse --;
757
+ return -2 ;
758
+ }
755
759
if (buffer .error != null ) throw buffer .error ;
756
- buffer .lastRead = System .currentTimeMillis ();
757
- buffer .inUse --;
758
760
byte b = buffer .buffer [getBufferOffset (pos )];
761
+ buffer .inUse --;
759
762
pos ++;
760
763
if (pos < size && (bufferIndex != 0 || startReadSecondBufferWhenFirstBufferFullyRead )) {
761
764
int nextIndex = getBufferIndex (pos );
@@ -786,6 +789,7 @@ public Integer run() throws IOException {
786
789
throw buffer .error ;
787
790
int start = getBufferOffset (pos );
788
791
if (start >= buffer .len ) {
792
+ buffer .inUse --;
789
793
if (ondone != null ) ondone .run (new Pair <>(Integer .valueOf (0 ), null ));
790
794
return Integer .valueOf (0 );
791
795
}
@@ -1034,14 +1038,15 @@ public void run() {
1034
1038
buffers .add (new Buffer (this ));
1035
1039
while (firstIndex <= lastIndex ) {
1036
1040
Buffer b ;
1037
- if (firstIndex > = buffers .size ()) {
1041
+ if (firstIndex = = buffers .size ()) {
1038
1042
b = new Buffer (this );
1039
- newBuffers .add (b );
1040
1043
buffers .add (b );
1041
1044
} else
1042
1045
b = buffers .get (firstIndex );
1043
- if (b .buffer == null )
1046
+ if (b .buffer == null ) {
1044
1047
b .buffer = new byte [firstIndex > 0 ? bufferSize : firstBufferSize ];
1048
+ newBuffers .add (b );
1049
+ }
1045
1050
b .len = firstIndex < lastIndex ? b .buffer .length : (int )(newSize - getBufferStart (firstIndex ));
1046
1051
b .lastRead = System .currentTimeMillis ();
1047
1052
firstIndex ++;
@@ -1151,22 +1156,26 @@ protected AsyncWork<Integer, IOException> writeAsync(
1151
1156
List <Buffer > buffers = this .buffers ;
1152
1157
if (closing || buffers == null )
1153
1158
return new AsyncWork <>(null , null , new CancelException ("IO closed" ));
1159
+ boolean isNew = false ;
1154
1160
synchronized (buffers ) {
1155
1161
if (bufferIndex == buffers .size ()) {
1156
1162
buffer = new Buffer (this );
1157
1163
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1158
1164
buffer .len = 0 ;
1159
1165
buffer .inUse = 1 ;
1160
1166
buffers .add (buffer );
1167
+ isNew = true ;
1161
1168
} else {
1162
1169
buffer = buffers .get (bufferIndex );
1163
1170
if (buffer .buffer == null ) {
1164
1171
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1165
1172
buffer .len = 0 ;
1173
+ isNew = true ;
1166
1174
}
1167
1175
buffer .inUse ++;
1168
1176
}
1169
1177
}
1178
+ if (isNew ) manager .newBuffer (buffer );
1170
1179
}
1171
1180
SynchronizationPoint <NoException > sp = buffer .loading ;
1172
1181
AsyncWork <Integer ,IOException > done = new AsyncWork <>();
@@ -1230,22 +1239,26 @@ protected int writeSync(long pos, ByteBuffer buf, int alreadyDone) throws IOExce
1230
1239
if (pos < size || (size > 0 && getBufferIndex (size - 1 ) == bufferIndex ))
1231
1240
buffer = useBufferAsync (bufferIndex );
1232
1241
else {
1242
+ boolean isNew = false ;
1233
1243
synchronized (buffers ) {
1234
1244
if (bufferIndex == buffers .size ()) {
1235
1245
buffer = new Buffer (this );
1236
1246
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1237
1247
buffer .len = 0 ;
1238
1248
buffer .inUse = 1 ;
1239
1249
buffers .add (buffer );
1250
+ isNew = true ;
1240
1251
} else {
1241
1252
buffer = buffers .get (bufferIndex );
1242
1253
if (buffer .buffer == null ) {
1243
1254
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1244
1255
buffer .len = 0 ;
1256
+ isNew = true ;
1245
1257
}
1246
1258
buffer .inUse ++;
1247
1259
}
1248
1260
}
1261
+ if (isNew ) manager .newBuffer (buffer );
1249
1262
}
1250
1263
SynchronizationPoint <NoException > sp = buffer .loading ;
1251
1264
if (sp != null )
@@ -1278,22 +1291,26 @@ protected void write(byte[] buf, int offset, int length) throws IOException {
1278
1291
if (pos < size || (size > 0 && getBufferIndex (size - 1 ) == bufferIndex ))
1279
1292
buffer = useBufferAsync (bufferIndex );
1280
1293
else {
1294
+ boolean isNew = false ;
1281
1295
synchronized (buffers ) {
1282
1296
if (bufferIndex == buffers .size ()) {
1283
1297
buffer = new Buffer (this );
1284
1298
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1285
1299
buffer .len = 0 ;
1286
1300
buffer .inUse = 1 ;
1287
1301
buffers .add (buffer );
1302
+ isNew = true ;
1288
1303
} else {
1289
1304
buffer = buffers .get (bufferIndex );
1290
1305
if (buffer .buffer == null ) {
1291
1306
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1292
1307
buffer .len = 0 ;
1308
+ isNew = true ;
1293
1309
}
1294
1310
buffer .inUse ++;
1295
1311
}
1296
1312
}
1313
+ if (isNew ) manager .newBuffer (buffer );
1297
1314
}
1298
1315
SynchronizationPoint <NoException > sp = buffer .loading ;
1299
1316
if (sp != null )
@@ -1325,22 +1342,26 @@ protected void write(byte b) throws IOException {
1325
1342
if (pos < size || (size > 0 && getBufferIndex (size - 1 ) == bufferIndex ))
1326
1343
buffer = useBufferAsync (bufferIndex );
1327
1344
else {
1345
+ boolean isNew = false ;
1328
1346
synchronized (buffers ) {
1329
1347
if (bufferIndex == buffers .size ()) {
1330
1348
buffer = new Buffer (this );
1331
1349
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1332
1350
buffer .len = 0 ;
1333
1351
buffer .inUse = 1 ;
1334
1352
buffers .add (buffer );
1353
+ isNew = true ;
1335
1354
} else {
1336
1355
buffer = buffers .get (bufferIndex );
1337
1356
if (buffer .buffer == null ) {
1338
1357
buffer .buffer = new byte [bufferIndex == 0 ? firstBufferSize : bufferSize ];
1339
1358
buffer .len = 0 ;
1359
+ isNew = true ;
1340
1360
}
1341
1361
buffer .inUse ++;
1342
1362
}
1343
1363
}
1364
+ if (isNew ) manager .newBuffer (buffer );
1344
1365
}
1345
1366
SynchronizationPoint <NoException > sp = buffer .loading ;
1346
1367
if (sp != null )
0 commit comments