Skip to content

Commit 8526b90

Browse files
committed
fix some IOs using ByteBuffer.array() without ByteBuffer.arrayOffset()
1 parent d574294 commit 8526b90

File tree

6 files changed

+46
-19
lines changed

6 files changed

+46
-19
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/IOFromInputStream.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ protected void closeResources(SynchronizationPoint<Exception> ondone) {
8888

8989
@Override
9090
public int readSync(ByteBuffer buffer) throws IOException {
91-
int nb = stream.read(buffer.array(), buffer.arrayOffset(), buffer.remaining());
91+
int nb = stream.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
9292
if (nb >= 0)
9393
buffer.position(buffer.position() + nb);
9494
return nb;
@@ -98,7 +98,7 @@ public int readSync(ByteBuffer buffer) throws IOException {
9898
public int readFullySync(ByteBuffer buffer) throws IOException {
9999
int total = 0;
100100
do {
101-
int nb = stream.read(buffer.array(), buffer.arrayOffset() + total, buffer.remaining() - total);
101+
int nb = stream.read(buffer.array(), buffer.arrayOffset() + buffer.position() + total, buffer.remaining() - total);
102102
if (nb <= 0) break;
103103
total += nb;
104104
} while (total < buffer.remaining());
@@ -127,7 +127,7 @@ public AsyncWork<Integer,IOException> readAsync(ByteBuffer buffer, RunnableWithP
127127
@Override
128128
public Integer run() throws IOException, CancelException {
129129
try {
130-
int nb = stream.read(buffer.array(), buffer.arrayOffset(), buffer.remaining());
130+
int nb = stream.read(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
131131
if (nb >= 0)
132132
buffer.position(buffer.position() + nb);
133133
return Integer.valueOf(nb);
@@ -149,7 +149,7 @@ public Integer run() throws IOException, CancelException {
149149
int total = 0;
150150
do {
151151
try {
152-
int nb = stream.read(buffer.array(), buffer.arrayOffset() + total, buffer.remaining() - total);
152+
int nb = stream.read(buffer.array(), buffer.arrayOffset() + buffer.position() + total, buffer.remaining() - total);
153153
if (nb <= 0) break;
154154
total += nb;
155155
} catch (IOException e) {

net.lecousin.core/src/main/java/net/lecousin/framework/io/IOFromOutputStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public ISynchronizationPoint<IOException> canStartWriting() {
6868
public int writeSync(ByteBuffer buffer) throws IOException {
6969
int nb = buffer.remaining();
7070
if (buffer.hasArray()) {
71-
stream.write(buffer.array(), buffer.position(), buffer.remaining());
71+
stream.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
7272
buffer.position(buffer.position() + nb);
7373
} else {
7474
byte[] buf = new byte[buffer.remaining()];

net.lecousin.core/src/main/java/net/lecousin/framework/io/encoding/Base64.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ public Void run() {
378378
int l = buffer.remaining() / 3;
379379
ISynchronizationPoint<IOException> write;
380380
if (l > 0) {
381-
byte[] out = encodeBase64(buffer.array(), buffer.position(), l * 3);
381+
byte[] out = encodeBase64(buffer.array(), buffer.arrayOffset() + buffer.position(), l * 3);
382382
write = writer.writeAsync(ByteBuffer.wrap(out));
383383
buffer.position(buffer.position() + l * 3);
384384
} else
@@ -428,7 +428,7 @@ public Void run() {
428428
int l = buffer.remaining() / 3;
429429
ISynchronizationPoint<IOException> write;
430430
if (l > 0) {
431-
char[] out = encodeBase64ToChars(buffer.array(), buffer.position(), l * 3);
431+
char[] out = encodeBase64ToChars(buffer.array(), buffer.arrayOffset() + buffer.position(), l * 3);
432432
write = writer.writeAsync(out);
433433
buffer.position(buffer.position() + l * 3);
434434
} else

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestOutputToInput.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ public void testWriteSyncReadHalfSync() throws Exception {
4444
int nbRead = 0;
4545
byte[] b = new byte[testBuf.length];
4646
while (nbWrite < nbBuf) {
47-
o2i.writeSync(ByteBuffer.wrap(testBuf));
47+
ByteBuffer bu = ByteBuffer.wrap(testBuf);
48+
o2i.writeSync(bu);
49+
Assert.assertEquals(0, bu.remaining());
4850
nbWrite++;
4951
readHalfSync(o2i, (nbRead % 2) == 0, b);
5052
nbRead++;
@@ -86,19 +88,22 @@ public Void run() {
8688
spWrite.unblock();
8789
return null;
8890
}
89-
Mutable<AsyncWork<Integer, IOException>> write = new Mutable<>(o2i.writeAsync(ByteBuffer.wrap(testBuf)));
91+
Mutable<ByteBuffer> buf = new Mutable<>(ByteBuffer.wrap(testBuf));
92+
Mutable<AsyncWork<Integer, IOException>> write = new Mutable<>(o2i.writeAsync(buf.get()));
9093
MutableInteger nbWrite = new MutableInteger(1);
9194
write.get().listenInline(new Runnable() {
9295
@Override
9396
public void run() {
9497
do {
9598
if (write.get().hasError()) { spWrite.error(write.get().getError()); return; }
99+
if (buf.get().remaining() > 0) { spWrite.error(new IOException("writeAsync did not consume the buffer")); return; }
96100
if (nbWrite.get() == nbBuf) {
97101
o2i.endOfData();
98102
spWrite.unblock();
99103
return;
100104
}
101-
write.set(o2i.writeAsync(ByteBuffer.wrap(testBuf)));
105+
buf.set(ByteBuffer.wrap(testBuf));
106+
write.set(o2i.writeAsync(buf.get()));
102107
nbWrite.inc();
103108
} while (write.get().isUnblocked());
104109
write.get().listenInline(this);

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestReadWrite.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,6 @@
55
import java.util.LinkedList;
66
import java.util.List;
77

8-
import org.junit.Assume;
9-
import org.junit.Test;
10-
118
import net.lecousin.framework.collections.ArrayUtil;
129
import net.lecousin.framework.concurrent.Task;
1310
import net.lecousin.framework.concurrent.synch.AsyncWork;
@@ -23,6 +20,10 @@
2320
import net.lecousin.framework.util.Pair;
2421
import net.lecousin.framework.util.RunnableWithParameter;
2522

23+
import org.junit.Assert;
24+
import org.junit.Assume;
25+
import org.junit.Test;
26+
2627
public abstract class TestReadWrite extends TestIO.UsingTestData {
2728

2829
protected TestReadWrite(byte[] testBuf, int nbBuf) {
@@ -46,6 +47,8 @@ public <T extends IO.Readable.Seekable & IO.Writable.Seekable> void testWriteThe
4647
int nb = io.writeSync(buf);
4748
if (nb != testBuf.length)
4849
throw new Exception("Write only " + nb + " bytes");
50+
if (buf.remaining() > 0)
51+
throw new Exception("Buffer not fully consumed by write operation");
4952
}
5053
io.seekSync(SeekType.FROM_BEGINNING, 0);
5154
byte[] b = new byte[testBuf.length];
@@ -123,7 +126,8 @@ public void run() {
123126
}
124127
MutableBoolean ondoneCalled = new MutableBoolean(false);
125128
int bufIndex = index;
126-
AsyncWork<Integer, IOException> write = io.writeAsync(index * testBuf.length, ByteBuffer.wrap(testBuf),
129+
ByteBuffer buf = ByteBuffer.wrap(testBuf);
130+
AsyncWork<Integer, IOException> write = io.writeAsync(index * testBuf.length, buf,
127131
new RunnableWithParameter<Pair<Integer,IOException>>() {
128132
@Override
129133
public void run(Pair<Integer, IOException> param) {
@@ -135,6 +139,10 @@ public void run(Pair<Integer, IOException> param) {
135139
done.error(new IOException("ondone not called by writeAsync"));
136140
return;
137141
}
142+
if (buf.remaining() > 0) {
143+
done.error(new IOException("Buffer not fully consumed by write operation"));
144+
return;
145+
}
138146
synchronized (buffersToRead) {
139147
buffersToRead.add(Integer.valueOf(bufIndex));
140148
}
@@ -238,7 +246,9 @@ public <T extends IO.Readable.Seekable & IO.Writable.Seekable> void testRandomPa
238246
int bufOffset = (int)((range.min + startPos) % testBuf.length);
239247
int len = testBuf.length - bufOffset;
240248
if (range.min + startPos + len - 1 > range.max) len = (int)(range.max - (range.min + startPos) + 1);
241-
io.writeSync(range.min + startPos, ByteBuffer.wrap(testBuf, bufOffset, len));
249+
ByteBuffer buf = ByteBuffer.wrap(testBuf, bufOffset, len);
250+
io.writeSync(range.min + startPos, buf);
251+
Assert.assertEquals(0, buf.remaining());
242252
if (range.max == range.min + startPos + len - 1) {
243253
toWrite.removeRange(range.min + startPos, range.max);
244254
continue;
@@ -300,7 +310,10 @@ void dichotomicWriteSync(T io, int bufStart, int bufEnd, SeekType seekType) thro
300310
case FROM_END: io.seekSync(SeekType.FROM_END, (nbBuf - bufIndex) * testBuf.length - testBuf.length / 2); break;
301311
case FROM_CURRENT: io.seekSync(SeekType.FROM_CURRENT, bufIndex * testBuf.length + testBuf.length / 2 - io.getPosition()); break;
302312
}
303-
io.writeSync(ByteBuffer.wrap(testBuf, testBuf.length / 2, testBuf.length - testBuf.length / 2));
313+
ByteBuffer b = ByteBuffer.wrap(testBuf, testBuf.length / 2, testBuf.length - testBuf.length / 2);
314+
io.writeSync(b);
315+
Assert.assertEquals(0, b.remaining());
316+
b = null;
304317
// write before
305318
if (bufIndex > bufStart)
306319
dichotomicWriteSync(io, bufStart, bufIndex - 1,

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestWritableToFile.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,9 @@ public void testWriteBufferByBufferSync() throws Exception {
6666
if (io instanceof IO.PositionKnown)
6767
Assert.assertEquals(0, ((IO.PositionKnown)io).getPosition());
6868
for (int i = 0; i < nbBuf; ++i) {
69-
io.writeSync(ByteBuffer.wrap(testBuf));
69+
ByteBuffer b = ByteBuffer.wrap(testBuf);
70+
io.writeSync(b);
71+
Assert.assertEquals(0, b.remaining());
7072
if (io instanceof IO.PositionKnown)
7173
Assert.assertEquals(testBuf.length * (i + 1), ((IO.PositionKnown)io).getPosition());
7274
}
@@ -83,6 +85,7 @@ public void testWriteBufferByBufferAsync() throws Exception {
8385
IO.Writable io = createWritableFromFile(file);
8486
MutableInteger i = new MutableInteger(0);
8587
Mutable<AsyncWork<Integer,IOException>> write = new Mutable<>(null);
88+
Mutable<ByteBuffer> buf = new Mutable<>(null);
8689
SynchronizationPoint<Exception> sp = new SynchronizationPoint<>();
8790
Runnable listener = new Runnable() {
8891
@Override
@@ -96,6 +99,10 @@ public void run() {
9699
sp.error(new Exception("Invalid write: returned " + write.get().getResult().intValue() + " on " + testBuf.length));
97100
return;
98101
}
102+
if (buf.get().remaining() > 0) {
103+
sp.error(new Exception("Write operation did not fully consumed buffer"));
104+
return;
105+
}
99106
if (io instanceof IO.PositionKnown)
100107
try { Assert.assertEquals(testBuf.length * (i.get() + 1), ((IO.PositionKnown)io).getPosition()); }
101108
catch (Throwable e) {
@@ -106,12 +113,14 @@ public void run() {
106113
sp.unblock();
107114
return;
108115
}
109-
write.set(io.writeAsync(ByteBuffer.wrap(testBuf)));
116+
buf.set(ByteBuffer.wrap(testBuf));
117+
write.set(io.writeAsync(buf.get()));
110118
} while (write.get().isUnblocked());
111119
write.get().listenInline(this);
112120
}
113121
};
114-
write.set(io.writeAsync(ByteBuffer.wrap(testBuf)));
122+
buf.set(ByteBuffer.wrap(testBuf));
123+
write.set(io.writeAsync(buf.get()));
115124
write.get().listenInline(listener);
116125

117126
sp.blockThrow(0);

0 commit comments

Comments
 (0)