Skip to content

Commit ceaf0bd

Browse files
committed
fix LimitAsyncOperations
1 parent 55c13c5 commit ceaf0bd

File tree

2 files changed

+166
-147
lines changed

2 files changed

+166
-147
lines changed
Lines changed: 162 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -1,144 +1,162 @@
1-
package net.lecousin.framework.concurrent.util;
2-
3-
import java.io.IOException;
4-
5-
import net.lecousin.framework.collections.TurnArray;
6-
import net.lecousin.framework.concurrent.CancelException;
7-
import net.lecousin.framework.concurrent.synch.AsyncWork;
8-
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
9-
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
10-
import net.lecousin.framework.exception.NoException;
11-
import net.lecousin.framework.util.Pair;
12-
13-
/**
14-
* This class allows to queue asynchronous operations, but blocks if too many are waiting.
15-
* This can typically used in operations receiving data, and writing to an IO, when the amount of data can be large:
16-
* if we receive more data than the write operations can do, and we need to avoid having too much buffers in memory waiting
17-
* to write.
18-
*
19-
* @param <InputType> input data
20-
* @param <OutputResultType> output result
21-
* @param <OutputErrorType> error
22-
*/
23-
public class LimitAsyncOperations<InputType, OutputResultType, OutputErrorType extends Exception> {
24-
25-
/** Constructor. */
26-
public LimitAsyncOperations(int maxOperations, Executor<InputType, OutputResultType, OutputErrorType> executor) {
27-
waiting = new TurnArray<>(maxOperations);
28-
this.executor = executor;
29-
}
30-
31-
private Executor<InputType, OutputResultType, OutputErrorType> executor;
32-
private TurnArray<Pair<InputType,AsyncWork<OutputResultType,OutputErrorType>>> waiting;
33-
private SynchronizationPoint<NoException> lock = null;
34-
private AsyncWork<OutputResultType, OutputErrorType> lastWrite = new AsyncWork<>(null, null);
35-
36-
/**
37-
* Executor of write operation.
38-
*
39-
* @param <InputType> input data
40-
* @param <OutputResultType> output result
41-
* @param <OutputErrorType> error
42-
*/
43-
public static interface Executor<InputType, OutputResultType, OutputErrorType extends Exception> {
44-
/** Launch asynchronous operation. */
45-
AsyncWork<OutputResultType, OutputErrorType> execute(InputType data);
46-
}
47-
48-
/**
49-
* Queue the data to write. If there is no pending write, the write operation is started.
50-
* If too many write operations are pending, the method is blocking.
51-
* @param data the data to write.
52-
*/
53-
public AsyncWork<OutputResultType, OutputErrorType> write(InputType data) throws IOException {
54-
do {
55-
SynchronizationPoint<NoException> lk;
56-
synchronized (waiting) {
57-
if (lastWrite.isCancelled()) return lastWrite;
58-
if (waiting.isEmpty() && lastWrite.isUnblocked()) {
59-
AsyncWork<OutputResultType, OutputErrorType> res = executor.execute(data);
60-
AsyncWork<OutputResultType, OutputErrorType> done = new AsyncWork<>();
61-
lastWrite = done;
62-
res.listenInline(
63-
(nb) -> {
64-
writeDone(data, null);
65-
done.unblockSuccess(nb);
66-
},
67-
(error) -> {
68-
writeDone(data, null);
69-
done.error(error);
70-
},
71-
(cancel) -> {
72-
writeDone(data, cancel);
73-
done.cancel(cancel);
74-
}
75-
);
76-
return lastWrite;
77-
}
78-
if (!waiting.isFull()) {
79-
AsyncWork<OutputResultType, OutputErrorType> res = new AsyncWork<>();
80-
waiting.addLast(new Pair<>(data, res));
81-
lastWrite = res;
82-
return res;
83-
}
84-
if (lock != null)
85-
throw new IOException("Concurrent write");
86-
lock = new SynchronizationPoint<>();
87-
lk = lock;
88-
}
89-
lk.block(0);
90-
} while (true);
91-
}
92-
93-
protected void writeDone(@SuppressWarnings("unused") InputType data, CancelException cancelled) {
94-
SynchronizationPoint<NoException> sp = null;
95-
synchronized (waiting) {
96-
Pair<InputType, AsyncWork<OutputResultType, OutputErrorType>> b = waiting.pollFirst();
97-
if (b != null) {
98-
if (lock != null) {
99-
sp = lock;
100-
lock = null;
101-
}
102-
if (cancelled != null) {
103-
while (b != null) {
104-
b.getValue2().cancel(cancelled);
105-
b = waiting.pollFirst();
106-
}
107-
} else {
108-
InputType newData = b.getValue1();
109-
AsyncWork<OutputResultType, OutputErrorType> write = executor.execute(newData);
110-
Pair<InputType, AsyncWork<OutputResultType, OutputErrorType>> bb = b;
111-
write.listenInline(
112-
(nb) -> {
113-
writeDone(newData, null);
114-
bb.getValue2().unblockSuccess(nb);
115-
},
116-
(error) -> {
117-
writeDone(newData, null);
118-
bb.getValue2().error(error);
119-
},
120-
(cancel) -> {
121-
writeDone(newData, cancel);
122-
bb.getValue2().cancel(cancel);
123-
}
124-
);
125-
}
126-
}
127-
}
128-
if (sp != null)
129-
sp.unblock();
130-
}
131-
132-
/** Return the last pending operation, or null. */
133-
public AsyncWork<OutputResultType, OutputErrorType> getLastPendingOperation() {
134-
return lastWrite.isUnblocked() ? null : lastWrite;
135-
}
136-
137-
/** Same as getLastPendingOperation but never return null (return an unblocked synchronization point instead). */
138-
public ISynchronizationPoint<OutputErrorType> flush() {
139-
ISynchronizationPoint<OutputErrorType> sp = getLastPendingOperation();
140-
if (sp == null) sp = new SynchronizationPoint<>(true);
141-
return sp;
142-
}
143-
144-
}
1+
package net.lecousin.framework.concurrent.util;
2+
3+
import java.io.IOException;
4+
5+
import net.lecousin.framework.collections.TurnArray;
6+
import net.lecousin.framework.concurrent.CancelException;
7+
import net.lecousin.framework.concurrent.synch.AsyncWork;
8+
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
9+
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
10+
import net.lecousin.framework.exception.NoException;
11+
import net.lecousin.framework.util.Pair;
12+
13+
/**
14+
* This class allows to queue asynchronous operations, but blocks if too many are waiting.
15+
* This can typically used in operations receiving data, and writing to an IO, when the amount of data can be large:
16+
* if we receive more data than the write operations can do, and we need to avoid having too much buffers in memory waiting
17+
* to write.
18+
*
19+
* @param <InputType> input data
20+
* @param <OutputResultType> output result
21+
* @param <OutputErrorType> error
22+
*/
23+
public class LimitAsyncOperations<InputType, OutputResultType, OutputErrorType extends Exception> {
24+
25+
/** Constructor. */
26+
public LimitAsyncOperations(int maxOperations, Executor<InputType, OutputResultType, OutputErrorType> executor) {
27+
waiting = new TurnArray<>(maxOperations);
28+
this.executor = executor;
29+
}
30+
31+
private Executor<InputType, OutputResultType, OutputErrorType> executor;
32+
private TurnArray<Pair<InputType,AsyncWork<OutputResultType,OutputErrorType>>> waiting;
33+
private SynchronizationPoint<NoException> lock = null;
34+
private AsyncWork<OutputResultType, OutputErrorType> lastWrite = new AsyncWork<>(null, null);
35+
private CancelException cancelled = null;
36+
private OutputErrorType error = null;
37+
private boolean isReady = true;
38+
39+
/**
40+
* Executor of write operation.
41+
*
42+
* @param <InputType> input data
43+
* @param <OutputResultType> output result
44+
* @param <OutputErrorType> error
45+
*/
46+
public static interface Executor<InputType, OutputResultType, OutputErrorType extends Exception> {
47+
/** Launch asynchronous operation. */
48+
AsyncWork<OutputResultType, OutputErrorType> execute(InputType data);
49+
}
50+
51+
/**
52+
* Queue the data to write. If there is no pending write, the write operation is started.
53+
* If too many write operations are pending, the method is blocking.
54+
* @param data the data to write.
55+
*/
56+
public AsyncWork<OutputResultType, OutputErrorType> write(InputType data) throws IOException {
57+
do {
58+
SynchronizationPoint<NoException> lk;
59+
synchronized (waiting) {
60+
// if cancelled or errored, return immediately
61+
if (error != null) return new AsyncWork<>(null, error);
62+
if (cancelled != null) return new AsyncWork<>(null, null, cancelled);
63+
64+
AsyncWork<OutputResultType, OutputErrorType> op;
65+
// if ready, write immediately
66+
if (isReady) {
67+
isReady = false;
68+
op = lastWrite = executor.execute(data);
69+
lastWrite.listenInline(new WriteListener(data, op, null));
70+
return op;
71+
}
72+
// not ready
73+
if (!waiting.isFull()) {
74+
op = new AsyncWork<>();
75+
waiting.addLast(new Pair<>(data, op));
76+
lastWrite = op;
77+
return op;
78+
}
79+
// full
80+
if (lock != null)
81+
throw new IOException("Concurrent write");
82+
lock = new SynchronizationPoint<>();
83+
lk = lock;
84+
}
85+
lk.block(0);
86+
} while (true);
87+
}
88+
89+
private class WriteListener implements Runnable {
90+
public WriteListener(
91+
InputType data, AsyncWork<OutputResultType, OutputErrorType> op, AsyncWork<OutputResultType, OutputErrorType> result
92+
) {
93+
this.data = data;
94+
this.op = op;
95+
this.result = result;
96+
}
97+
98+
private InputType data;
99+
private AsyncWork<OutputResultType, OutputErrorType> op;
100+
private AsyncWork<OutputResultType, OutputErrorType> result;
101+
102+
@Override
103+
public void run() {
104+
SynchronizationPoint<NoException> lk = null;
105+
synchronized (waiting) {
106+
if (lock != null) {
107+
lk = lock;
108+
lock = null;
109+
}
110+
if (op.hasError()) error = op.getError();
111+
else if (op.isCancelled()) cancelled = op.getCancelEvent();
112+
else {
113+
Pair<InputType, AsyncWork<OutputResultType, OutputErrorType>> b = waiting.pollFirst();
114+
if (b != null) {
115+
// something is waiting
116+
AsyncWork<OutputResultType, OutputErrorType> newOp = executor.execute(b.getValue1());
117+
lastWrite = newOp;
118+
newOp.listenInline(new WriteListener(b.getValue1(), newOp, b.getValue2()));
119+
} else
120+
isReady = true;
121+
}
122+
}
123+
if (result != null)
124+
op.listenInline(result);
125+
if (lk != null)
126+
lk.unblock();
127+
writeDone(data, op);
128+
}
129+
}
130+
131+
@SuppressWarnings("unused")
132+
protected void writeDone(InputType data, AsyncWork<OutputResultType, OutputErrorType> result) {
133+
// to be overriden if needed
134+
}
135+
136+
/** Return the last pending operation, or null. */
137+
public AsyncWork<OutputResultType, OutputErrorType> getLastPendingOperation() {
138+
return lastWrite.isUnblocked() ? null : lastWrite;
139+
}
140+
141+
/** Same as getLastPendingOperation but never return null (return an unblocked synchronization point instead). */
142+
public ISynchronizationPoint<OutputErrorType> flush() {
143+
SynchronizationPoint<OutputErrorType> sp = new SynchronizationPoint<>();
144+
Runnable callback = new Runnable() {
145+
@Override
146+
public void run() {
147+
AsyncWork<OutputResultType, OutputErrorType> last = null;
148+
synchronized (waiting) {
149+
if (error != null) sp.error(error);
150+
else if (cancelled != null) sp.cancel(cancelled);
151+
else if (isReady) sp.unblock();
152+
else last = lastWrite;
153+
}
154+
if (last != null)
155+
last.listenInline(this);
156+
}
157+
};
158+
callback.run();
159+
return sp;
160+
}
161+
162+
}

net.lecousin.core/src/main/java/net/lecousin/framework/io/util/LimitWriteOperationsReuseBuffers.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package net.lecousin.framework.io.util;
22

3+
import java.io.IOException;
34
import java.nio.ByteBuffer;
45

5-
import net.lecousin.framework.concurrent.CancelException;
6+
import net.lecousin.framework.concurrent.synch.AsyncWork;
67
import net.lecousin.framework.io.IO;
78

89
/**
@@ -36,9 +37,9 @@ public void freeBuffer(ByteBuffer buffer) {
3637
}
3738

3839
@Override
39-
protected void writeDone(ByteBuffer buffer, CancelException cancelled) {
40+
protected void writeDone(ByteBuffer buffer, AsyncWork<Integer, IOException> result) {
4041
buffers.freeBuffer(buffer);
41-
super.writeDone(buffer, cancelled);
42+
super.writeDone(buffer, result);
4243
}
4344

4445
}

0 commit comments

Comments
 (0)