Skip to content

Commit 29a9c25

Browse files
committed
fix LimitWriteOperations
1 parent 787869b commit 29a9c25

File tree

3 files changed

+329
-239
lines changed

3 files changed

+329
-239
lines changed
Lines changed: 90 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -1,107 +1,90 @@
1-
package net.lecousin.framework.io.util;
2-
3-
import java.io.IOException;
4-
import java.nio.ByteBuffer;
5-
6-
import net.lecousin.framework.collections.TurnArray;
7-
import net.lecousin.framework.concurrent.synch.AsyncWork;
8-
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
9-
import net.lecousin.framework.exception.NoException;
10-
import net.lecousin.framework.io.IO;
11-
import net.lecousin.framework.util.Pair;
12-
import net.lecousin.framework.util.RunnableWithParameter;
13-
14-
/**
15-
* This class allows to queue write operations, but blocks if too many are waiting.
16-
* This can typically used in operations reading from an IO, and writing to another, when the amount of data can be large:
17-
* usually read operations are faster than write operations, and we need to avoid having too much buffers in memory waiting
18-
* to write.
19-
*/
20-
public class LimitWriteOperations {
21-
22-
/** Constructor. */
23-
public LimitWriteOperations(IO.Writable io, int bufferSize, int maxOperations) {
24-
this.io = io;
25-
buffers = new Buffers(bufferSize, maxOperations);
26-
waiting = new TurnArray<>(maxOperations);
27-
}
28-
29-
private IO.Writable io;
30-
private Buffers buffers;
31-
private TurnArray<Pair<ByteBuffer,AsyncWork<Integer,IOException>>> waiting;
32-
private SynchronizationPoint<NoException> lock = null;
33-
34-
/**
35-
* @return a buffer to put data to write.
36-
*/
37-
public ByteBuffer getBuffer() {
38-
return buffers.getBuffer();
39-
}
40-
41-
/** Must be called only if the user has not been used for a write operation,
42-
* else it will be automatically free when write operation is done.
43-
*/
44-
public void freeBuffer(ByteBuffer buffer) {
45-
buffers.freeBuffer(buffer);
46-
}
47-
48-
/**
49-
* Queue the buffer to write. If there is no pending write, the write operation is started.
50-
* If too many write operations are pending, the method is blocking.
51-
* @param buffer the buffer to write.
52-
*/
53-
public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOException {
54-
do {
55-
synchronized (waiting) {
56-
if (waiting.isEmpty()) {
57-
return io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
58-
@Override
59-
public void run(Pair<Integer, IOException> param) {
60-
writeDone(buffer);
61-
}
62-
});
63-
}
64-
if (!waiting.isFull()) {
65-
AsyncWork<Integer,IOException> res = new AsyncWork<>();
66-
waiting.addLast(new Pair<>(buffer, res));
67-
return res;
68-
}
69-
if (lock != null)
70-
throw new IOException("Concurrent write");
71-
lock = new SynchronizationPoint<>();
72-
}
73-
lock.block(0);
74-
} while (true);
75-
}
76-
77-
private void writeDone(ByteBuffer buffer) {
78-
buffers.freeBuffer(buffer);
79-
SynchronizationPoint<NoException> sp = null;
80-
synchronized (waiting) {
81-
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.removeFirst();
82-
if (b != null) {
83-
io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
84-
@Override
85-
public void run(Pair<Integer, IOException> param) {
86-
writeDone(b.getValue1());
87-
}
88-
}).listenInline(b.getValue2());
89-
if (lock != null) {
90-
sp = lock;
91-
lock = null;
92-
}
93-
}
94-
}
95-
if (sp != null)
96-
sp.unblock();
97-
}
98-
99-
/** Return the last pending operation, or null. */
100-
public AsyncWork<Integer, IOException> getLastPendingOperation() {
101-
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollLast();
102-
if (b == null)
103-
return null;
104-
return b.getValue2();
105-
}
106-
107-
}
1+
package net.lecousin.framework.io.util;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
6+
import net.lecousin.framework.collections.TurnArray;
7+
import net.lecousin.framework.concurrent.synch.AsyncWork;
8+
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
9+
import net.lecousin.framework.exception.NoException;
10+
import net.lecousin.framework.io.IO;
11+
import net.lecousin.framework.util.Pair;
12+
import net.lecousin.framework.util.RunnableWithParameter;
13+
14+
/**
15+
* This class allows to queue write operations, but blocks if too many are waiting.
16+
* This can typically used in operations reading from an IO, and writing to another, when the amount of data can be large:
17+
* usually read operations are faster than write operations, and we need to avoid having too much buffers in memory waiting
18+
* to write.
19+
*/
20+
public class LimitWriteOperations {
21+
22+
/** Constructor. */
23+
public LimitWriteOperations(IO.Writable io, int maxOperations) {
24+
this.io = io;
25+
waiting = new TurnArray<>(maxOperations);
26+
}
27+
28+
private IO.Writable io;
29+
private TurnArray<Pair<ByteBuffer,AsyncWork<Integer,IOException>>> waiting;
30+
private SynchronizationPoint<NoException> lock = null;
31+
32+
/**
33+
* Queue the buffer to write. If there is no pending write, the write operation is started.
34+
* If too many write operations are pending, the method is blocking.
35+
* @param buffer the buffer to write.
36+
*/
37+
public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOException {
38+
do {
39+
synchronized (waiting) {
40+
if (waiting.isEmpty()) {
41+
return io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
42+
@Override
43+
public void run(Pair<Integer, IOException> param) {
44+
writeDone();
45+
}
46+
});
47+
}
48+
if (!waiting.isFull()) {
49+
AsyncWork<Integer,IOException> res = new AsyncWork<>();
50+
waiting.addLast(new Pair<>(buffer, res));
51+
return res;
52+
}
53+
if (lock != null)
54+
throw new IOException("Concurrent write");
55+
lock = new SynchronizationPoint<>();
56+
}
57+
lock.block(0);
58+
} while (true);
59+
}
60+
61+
private void writeDone() {
62+
SynchronizationPoint<NoException> sp = null;
63+
synchronized (waiting) {
64+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollFirst();
65+
if (b != null) {
66+
io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
67+
@Override
68+
public void run(Pair<Integer, IOException> param) {
69+
writeDone();
70+
}
71+
}).listenInline(b.getValue2());
72+
if (lock != null) {
73+
sp = lock;
74+
lock = null;
75+
}
76+
}
77+
}
78+
if (sp != null)
79+
sp.unblock();
80+
}
81+
82+
/** Return the last pending operation, or null. */
83+
public AsyncWork<Integer, IOException> getLastPendingOperation() {
84+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollLast();
85+
if (b == null)
86+
return null;
87+
return b.getValue2();
88+
}
89+
90+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
package net.lecousin.framework.io.util;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
6+
import net.lecousin.framework.collections.TurnArray;
7+
import net.lecousin.framework.concurrent.synch.AsyncWork;
8+
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
9+
import net.lecousin.framework.exception.NoException;
10+
import net.lecousin.framework.io.IO;
11+
import net.lecousin.framework.util.Pair;
12+
import net.lecousin.framework.util.RunnableWithParameter;
13+
14+
/**
15+
* This class allows to queue write operations, but blocks if too many are waiting.
16+
* This can typically used in operations reading from an IO, and writing to another, when the amount of data can be large:
17+
* usually read operations are faster than write operations, and we need to avoid having too much buffers in memory waiting
18+
* to write.
19+
*/
20+
public class LimitWriteOperationsReuseBuffers {
21+
22+
/** Constructor. */
23+
public LimitWriteOperationsReuseBuffers(IO.Writable io, int bufferSize, int maxOperations) {
24+
this.io = io;
25+
buffers = new Buffers(bufferSize, maxOperations);
26+
waiting = new TurnArray<>(maxOperations);
27+
}
28+
29+
private IO.Writable io;
30+
private Buffers buffers;
31+
private TurnArray<Pair<ByteBuffer,AsyncWork<Integer,IOException>>> waiting;
32+
private SynchronizationPoint<NoException> lock = null;
33+
34+
/**
35+
* @return a buffer to put data to write.
36+
*/
37+
public ByteBuffer getBuffer() {
38+
return buffers.getBuffer();
39+
}
40+
41+
/** Must be called only if the user has not been used for a write operation,
42+
* else it will be automatically free when write operation is done.
43+
*/
44+
public void freeBuffer(ByteBuffer buffer) {
45+
buffers.freeBuffer(buffer);
46+
}
47+
48+
/**
49+
* Queue the buffer to write. If there is no pending write, the write operation is started.
50+
* If too many write operations are pending, the method is blocking.
51+
* @param buffer the buffer to write.
52+
*/
53+
public AsyncWork<Integer,IOException> write(ByteBuffer buffer) throws IOException {
54+
do {
55+
synchronized (waiting) {
56+
if (waiting.isEmpty()) {
57+
return io.writeAsync(buffer, new RunnableWithParameter<Pair<Integer,IOException>>() {
58+
@Override
59+
public void run(Pair<Integer, IOException> param) {
60+
writeDone(buffer);
61+
}
62+
});
63+
}
64+
if (!waiting.isFull()) {
65+
AsyncWork<Integer,IOException> res = new AsyncWork<>();
66+
waiting.addLast(new Pair<>(buffer, res));
67+
return res;
68+
}
69+
if (lock != null)
70+
throw new IOException("Concurrent write");
71+
lock = new SynchronizationPoint<>();
72+
}
73+
lock.block(0);
74+
} while (true);
75+
}
76+
77+
private void writeDone(ByteBuffer buffer) {
78+
buffers.freeBuffer(buffer);
79+
SynchronizationPoint<NoException> sp = null;
80+
synchronized (waiting) {
81+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollFirst();
82+
if (b != null) {
83+
io.writeAsync(b.getValue1(), new RunnableWithParameter<Pair<Integer,IOException>>() {
84+
@Override
85+
public void run(Pair<Integer, IOException> param) {
86+
writeDone(b.getValue1());
87+
}
88+
}).listenInline(b.getValue2());
89+
if (lock != null) {
90+
sp = lock;
91+
lock = null;
92+
}
93+
}
94+
}
95+
if (sp != null)
96+
sp.unblock();
97+
}
98+
99+
/** Return the last pending operation, or null. */
100+
public AsyncWork<Integer, IOException> getLastPendingOperation() {
101+
Pair<ByteBuffer,AsyncWork<Integer,IOException>> b = waiting.pollLast();
102+
if (b == null)
103+
return null;
104+
return b.getValue2();
105+
}
106+
107+
}

0 commit comments

Comments
 (0)