Skip to content

Commit 180062d

Browse files
committed
split IOUtil.skipAsync into byUsingSync and byReading
1 parent 976892d commit 180062d

File tree

4 files changed

+54
-8
lines changed

4 files changed

+54
-8
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/IOUtil.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import net.lecousin.framework.io.text.BufferedReadableCharacterStream;
2525
import net.lecousin.framework.mutable.Mutable;
2626
import net.lecousin.framework.mutable.MutableInteger;
27+
import net.lecousin.framework.mutable.MutableLong;
2728
import net.lecousin.framework.progress.WorkProgress;
2829
import net.lecousin.framework.util.Pair;
2930
import net.lecousin.framework.util.RunnableWithParameter;
@@ -365,7 +366,8 @@ public Integer run() throws IOException {
365366
/**
366367
* Implement a synchronous skip using a synchronous read.
367368
*/
368-
public static long skipSync(IO.Readable io, long n) throws IOException {
369+
public static long skipSyncByReading(IO.Readable io, long n) throws IOException {
370+
if (n <= 0) return 0;
369371
int l = n > 65536 ? 65536 : (int)n;
370372
ByteBuffer b = ByteBuffer.allocate(l);
371373
long total = 0;
@@ -384,16 +386,57 @@ public static long skipSync(IO.Readable io, long n) throws IOException {
384386
* Implement an asynchronous skip using a task calling a synchronous skip.
385387
* This must be used only if the synchronous skip is using only CPU.
386388
*/
387-
public static Task<Long,IOException> skipAsync(IO.Readable io, long n, RunnableWithParameter<Pair<Long,IOException>> ondone) {
389+
public static AsyncWork<Long,IOException> skipAsyncUsingSync(IO.Readable io, long n, RunnableWithParameter<Pair<Long,IOException>> ondone) {
390+
// TODO this should be skipAsyncUsingSync, then we should implement a real skipAsync
388391
Task<Long,IOException> task = new Task.Cpu<Long,IOException>("Skipping bytes", io.getPriority(), ondone) {
389392
@Override
390393
public Long run() throws IOException {
391-
long total = skipSync(io, n);
394+
long total = skipSyncByReading(io, n);
392395
return Long.valueOf(total);
393396
}
394397
};
395398
task.start();
396-
return task;
399+
return task.getSynch();
400+
}
401+
402+
public static AsyncWork<Long, IOException> skipAsyncByReading(IO.Readable io, long n, RunnableWithParameter<Pair<Long,IOException>> ondone) {
403+
if (n <= 0) {
404+
if (ondone != null) ondone.run(new Pair<>(Long.valueOf(0), null));
405+
return new AsyncWork<>(Long.valueOf(0), null);
406+
}
407+
ByteBuffer b = ByteBuffer.allocate(n > 65536 ? 65536 : (int)n);
408+
MutableLong done = new MutableLong(0);
409+
AsyncWork<Long, IOException> result = new AsyncWork<>();
410+
io.readAsync(b).listenInline(new AsyncWorkListener<Integer, IOException>() {
411+
@Override
412+
public void ready(Integer nb) {
413+
int read = nb.intValue();
414+
if (read <= 0) {
415+
if (ondone != null) ondone.run(new Pair<>(Long.valueOf(done.get()), null));
416+
result.unblockSuccess(Long.valueOf(done.get()));
417+
return;
418+
}
419+
done.add(nb.intValue());
420+
if (done.get() == n) {
421+
if (ondone != null) ondone.run(new Pair<>(Long.valueOf(n), null));
422+
result.unblockSuccess(Long.valueOf(n));
423+
return;
424+
}
425+
b.clear();
426+
if (n - done.get() < b.remaining())
427+
b.limit((int)(n - done.get()));
428+
io.readAsync(b).listenInline(this);
429+
}
430+
@Override
431+
public void error(IOException error) {
432+
result.error(error);
433+
}
434+
@Override
435+
public void cancelled(CancelException event) {
436+
result.cancel(event);
437+
}
438+
});
439+
return result;
397440
}
398441

399442
/**

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/SimpleBufferedReadable.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,14 +170,17 @@ public AsyncWork<ByteBuffer, IOException> readNextBufferAsync(RunnableWithParame
170170
}
171171
Task.Cpu<ByteBuffer, IOException> task = new Task.Cpu<ByteBuffer, IOException>("Read next buffer", getPriority(), ondone) {
172172
@Override
173-
public ByteBuffer run() throws IOException {
173+
public ByteBuffer run() throws IOException, CancelException {
174174
if (pos == len) {
175175
if (buffer == null) return null;
176176
fill();
177177
if (pos == len) return null;
178178
}
179179
ByteBuffer buf = ByteBuffer.allocate(len - pos);
180-
buf.put(buffer, pos, len - pos);
180+
try { buf.put(buffer, pos, len - pos); }
181+
catch (NullPointerException e) {
182+
throw new CancelException("IO closed");
183+
}
181184
pos = len;
182185
buf.flip();
183186
return buf;

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/SingleBufferReadable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public long skipSync(long n) throws IOException {
125125

126126
@Override
127127
public AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long,IOException>> ondone) {
128-
return IOUtil.skipAsync(this, n, ondone).getSynch();
128+
return IOUtil.skipAsyncByReading(this, n, ondone);
129129
}
130130

131131
@Override

net.lecousin.core/src/main/java/net/lecousin/framework/io/out2in/OutputToInputBuffers.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ public long skipSync(long n) throws IOException {
206206

207207
@Override
208208
public AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long,IOException>> ondone) {
209-
return IOUtil.skipAsync(this, n, ondone).getSynch();
209+
return IOUtil.skipAsyncUsingSync(this, n, ondone);
210210
}
211211

212212
@SuppressFBWarnings("IS2_INCONSISTENT_SYNC")

0 commit comments

Comments
 (0)