Skip to content

Commit da8923e

Browse files
committed
fix LimitWriteOperations
1 parent 905ecb7 commit da8923e

File tree

3 files changed

+26
-8
lines changed

3 files changed

+26
-8
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/SimpleBufferedReadable.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ private void fill() throws IOException, CancelException {
125125
}
126126
int nb = currentRead.getResult().intValue();
127127
if (nb <= 0) {
128+
state.pos = state.len = 0;
128129
state.buffer = null;
129130
bb = null;
130131
readBuffer = null;

net.lecousin.core/src/main/java/net/lecousin/framework/io/util/LimitWriteOperations.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,18 @@ public LimitWriteOperations(IO.Writable io, int maxOperations) {
3939
public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOException {
4040
do {
4141
synchronized (waiting) {
42+
if (lastWrite.isCancelled()) return lastWrite;
4243
if (waiting.isEmpty() && lastWrite.isUnblocked()) {
43-
return lastWrite = io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
44+
lastWrite = io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
4445
@Override
4546
public void run(Pair<Integer, IOException> param) {
4647
writeDone();
4748
}
4849
});
50+
lastWrite.onCancel((cancel) -> {
51+
writeDone();
52+
});
53+
return lastWrite;
4954
}
5055
if (!waiting.isFull()) {
5156
AsyncWork<Integer,IOException> res = new AsyncWork<>();
@@ -65,16 +70,25 @@ private void writeDone() {
6570
synchronized (waiting) {
6671
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollFirst();
6772
if (b != null) {
68-
(lastWrite = io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
69-
@Override
70-
public void run(Pair<Integer, IOException> param) {
71-
writeDone();
72-
}
73-
})).listenInline(b.getValue2());
7473
if (lock != null) {
7574
sp = lock;
7675
lock = null;
7776
}
77+
if (lastWrite.isCancelled()) {
78+
while (b != null) {
79+
b.getValue2().cancel(lastWrite.getCancelEvent());
80+
b = waiting.pollFirst();
81+
}
82+
} else {
83+
lastWrite = io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
84+
@Override
85+
public void run(Pair<Integer, IOException> param) {
86+
writeDone();
87+
}
88+
});
89+
lastWrite.onCancel((cancel) -> { writeDone(); });
90+
lastWrite.listenInline(b.getValue2());
91+
}
7892
}
7993
}
8094
if (sp != null)

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestReadableSeekable.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,11 @@ public void testSeekableByteByByteSync() throws Exception {
6161
for (int j = 0; j < testBuf.length; ++j) {
6262
if (faster && (j%((i%5)+1)) == 1) continue;
6363
buffer.clear();
64-
if (io.readSync(offset.intValue()*testBuf.length+j, buffer) != 1)
64+
int nb = io.readSync(offset.intValue()*testBuf.length+j, buffer);
65+
if (nb <= 0)
6566
throw new Exception("Unexpected end of stream at " + (i*testBuf.length+j));
67+
if (nb > 1)
68+
throw new Exception("Unexpected number of bytes read at " + (i*testBuf.length+j) + ": " + nb + " bytes returned, only one requested");
6669
if (b[0] != testBuf[j])
6770
throw new Exception("Invalid byte "+(b[0]&0xFF)+" at "+(i*testBuf.length+j));
6871
}

0 commit comments

Comments
 (0)