Skip to content

Commit 71cf943

Browse files
committed
fix LimitWriteOperations
1 parent 894b792 commit 71cf943

File tree

3 files changed

+51
-33
lines changed

3 files changed

+51
-33
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/util/LimitWriteOperations.java

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
import java.io.IOException;
44
import java.nio.ByteBuffer;
55

6+
import net.lecousin.framework.application.LCCore;
67
import net.lecousin.framework.collections.TurnArray;
8+
import net.lecousin.framework.concurrent.CancelException;
79
import net.lecousin.framework.concurrent.synch.AsyncWork;
810
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
911
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
1012
import net.lecousin.framework.exception.NoException;
1113
import net.lecousin.framework.io.IO;
1214
import net.lecousin.framework.util.Pair;
13-
import net.lecousin.framework.util.RunnableWithParameter;
1415

1516
/**
1617
* This class allows to queue write operations, but blocks if too many are waiting.
@@ -42,55 +43,68 @@ public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOExceptio
4243
synchronized (waiting) {
4344
if (lastWrite.isCancelled()) return lastWrite;
4445
if (waiting.isEmpty() && lastWrite.isUnblocked()) {
45-
lastWrite = io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
46-
@Override
47-
public void run(Pair<Integer, IOException> param) {
48-
writeDone(buffer);
49-
}
50-
});
51-
lastWrite.onCancel((cancel) -> {
52-
writeDone(buffer);
53-
});
46+
LCCore.getApplication().getConsole().out("write direct");
47+
lastWrite = io.writeAsync(buffer);
48+
lastWrite.listenInline(
49+
(nb) -> { writeDone(buffer, null); },
50+
(error) -> { writeDone(buffer, null); },
51+
(cancel) -> { writeDone(buffer, cancel); }
52+
);
5453
return lastWrite;
5554
}
5655
if (!waiting.isFull()) {
56+
LCCore.getApplication().getConsole().out("push write");
5757
AsyncWork<Integer,IOException> res = new AsyncWork<>();
5858
waiting.addLast(new Pair<>(buffer, res));
59+
lastWrite = res;
5960
return res;
6061
}
62+
LCCore.getApplication().getConsole().out("waiting has " + waiting.size());
6163
if (lock != null)
6264
throw new IOException("Concurrent write");
6365
lock = new SynchronizationPoint<>();
6466
lk = lock;
6567
}
68+
LCCore.getApplication().getConsole().out("block");
6669
lk.block(0);
70+
LCCore.getApplication().getConsole().out("unblocked");
6771
} while (true);
6872
}
6973

70-
protected void writeDone(@SuppressWarnings("unused") ByteBuffer buffer) {
74+
protected void writeDone(@SuppressWarnings("unused") ByteBuffer buffer, CancelException cancelled) {
75+
LCCore.getApplication().getConsole().out("writeDone");
7176
SynchronizationPoint<NoException> sp = null;
7277
synchronized (waiting) {
7378
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollFirst();
79+
LCCore.getApplication().getConsole().out("b = " + b);
7480
if (b != null) {
7581
if (lock != null) {
7682
sp = lock;
7783
lock = null;
7884
}
79-
if (lastWrite.isCancelled()) {
85+
if (cancelled != null) {
8086
while (b != null) {
81-
b.getValue2().cancel(lastWrite.getCancelEvent());
87+
b.getValue2().cancel(cancelled);
8288
b = waiting.pollFirst();
8389
}
8490
} else {
8591
ByteBuffer buf = b.getValue1();
86-
lastWrite = io.writeAsync(buf, new RunnableWithParameter<Pair<Integer,IOException>>() {
87-
@Override
88-
public void run(Pair<Integer, IOException> param) {
89-
writeDone(buf);
92+
AsyncWork<Integer, IOException> write = io.writeAsync(buf);
93+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> bb = b;
94+
write.listenInline(
95+
(nb) -> {
96+
bb.getValue2().unblockSuccess(nb);
97+
writeDone(buf, null);
98+
},
99+
(error) -> {
100+
bb.getValue2().error(error);
101+
writeDone(buf, null);
102+
},
103+
(cancel) -> {
104+
bb.getValue2().cancel(cancel);
105+
writeDone(buf, cancel);
90106
}
91-
});
92-
lastWrite.onCancel((cancel) -> { writeDone(buf); });
93-
lastWrite.listenInline(b.getValue2());
107+
);
94108
}
95109
}
96110
}
@@ -100,10 +114,7 @@ public void run(Pair<Integer, IOException> param) {
100114

101115
/** Return the last pending operation, or null. */
102116
public AsyncWork<Integer, IOException> getLastPendingOperation() {
103-
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.peekLast();
104-
if (b == null)
105-
return lastWrite.isUnblocked() ? null : lastWrite;
106-
return b.getValue2();
117+
return lastWrite.isUnblocked() ? null : lastWrite;
107118
}
108119

109120
/** Same as getLastPendingOperation but never return null (return an unblocked synchronization point instead). */

net.lecousin.core/src/main/java/net/lecousin/framework/io/util/LimitWriteOperationsReuseBuffers.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.nio.ByteBuffer;
44

5+
import net.lecousin.framework.concurrent.CancelException;
56
import net.lecousin.framework.io.IO;
67

78
/**
@@ -35,9 +36,9 @@ public void freeBuffer(ByteBuffer buffer) {
3536
}
3637

3738
@Override
38-
protected void writeDone(ByteBuffer buffer) {
39+
protected void writeDone(ByteBuffer buffer, CancelException cancelled) {
3940
buffers.freeBuffer(buffer);
40-
super.writeDone(buffer);
41+
super.writeDone(buffer, cancelled);
4142
}
4243

4344
}

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/io/util/TestLimitWriteOperations.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,24 @@ public void test() throws Exception {
1919
tmp.deleteOnExit();
2020
FileIO.WriteOnly io = new FileIO.WriteOnly(tmp, Task.PRIORITY_NORMAL);
2121
LimitWriteOperations writeOps = new LimitWriteOperations(io, 3);
22-
byte[] data = new byte[100];
23-
for (int i = 0; i < data.length; ++i)
24-
data[i] = (byte)i;
25-
for (int i = 0; i < 500; ++i)
22+
for (int i = 0; i < 500; ++i) {
23+
byte[] data = new byte[100];
24+
for (int j = 0; j < data.length; ++j)
25+
data[j] = (byte)j;
26+
data[i % 100] = (byte)((i % 69) + 5);
2627
writeOps.write(ByteBuffer.wrap(data));
28+
}
2729
writeOps.flush().blockThrow(0);
2830
io.close();
2931
FileIO.ReadOnly in = new FileIO.ReadOnly(tmp, Task.PRIORITY_NORMAL);
30-
byte[] buf = new byte[data.length];
32+
byte[] buf = new byte[100];
3133
for (int i = 0; i < 500; ++i) {
32-
Assert.assertEquals(data.length, in.readFullySync(ByteBuffer.wrap(buf)));
33-
Assert.assertArrayEquals(data, buf);
34+
Assert.assertEquals("buffer " + i, 100, in.readFullySync(ByteBuffer.wrap(buf)));
35+
byte[] data = new byte[100];
36+
for (int j = 0; j < data.length; ++j)
37+
data[j] = (byte)j;
38+
data[i % 100] = (byte)((i % 69) + 5);
39+
Assert.assertArrayEquals("buffer " + i, data, buf);
3440
}
3541
Assert.assertTrue(in.readFullySync(ByteBuffer.wrap(buf)) <= 0);
3642
in.close();

0 commit comments

Comments
 (0)