Skip to content

Commit 26a4d6c

Browse files
committed
add test on LinkedIO + fix it
1 parent 2a9ac41 commit 26a4d6c

File tree

4 files changed

+258
-17
lines changed

4 files changed

+258
-17
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/IOWritePool.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,7 @@ public void write(ByteBuffer buffer) throws IOException {
4040
writing.listenInline(listener);
4141
return;
4242
}
43-
if (writing.hasError())
44-
throw writing.getError();
43+
if (writing.hasError()) throw writing.getError();
4544
buffers.add(buffer);
4645
}
4746
}
@@ -51,14 +50,10 @@ public void write(ByteBuffer buffer) throws IOException {
5150
*/
5251
public SynchronizationPoint<IOException> onDone() {
5352
synchronized (buffers) {
54-
if (writing == null)
55-
return new SynchronizationPoint<>(true);
56-
if (writing.hasError())
57-
return new SynchronizationPoint<IOException>(writing.getError());
58-
if (writing.isCancelled())
59-
return new SynchronizationPoint<IOException>(writing.getCancelEvent());
60-
if (waitDone == null)
61-
waitDone = new SynchronizationPoint<>();
53+
if (writing == null) return new SynchronizationPoint<>(true);
54+
if (writing.hasError()) return new SynchronizationPoint<IOException>(writing.getError());
55+
if (writing.isCancelled()) return new SynchronizationPoint<IOException>(writing.getCancelEvent());
56+
if (waitDone == null) waitDone = new SynchronizationPoint<>();
6257
}
6358
return waitDone;
6459
}
@@ -81,14 +76,12 @@ public void ready(Integer result) {
8176

8277
@Override
8378
public void error(IOException error) {
84-
if (waitDone != null)
85-
waitDone.error(error);
79+
if (waitDone != null) waitDone.error(error);
8680
}
8781

8882
@Override
8983
public void cancelled(CancelException event) {
90-
if (waitDone != null)
91-
waitDone.cancel(event);
84+
if (waitDone != null) waitDone.cancel(event);
9285
}
9386

9487
}

net.lecousin.core/src/main/java/net/lecousin/framework/io/LinkedIO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -924,7 +924,7 @@ protected long seekSync(SeekType type, long move) throws IOException {
924924
if (sizes.get(i) == null) {
925925
@SuppressWarnings("resource")
926926
IO.Readable.Seekable io = (IO.Readable.Seekable)ios.get(i);
927-
sizes.set(ioIndex, Long.valueOf(io.seekSync(SeekType.FROM_END, 0)));
927+
sizes.set(i, Long.valueOf(io.seekSync(SeekType.FROM_END, 0)));
928928
}
929929
p += sizes.get(i).longValue();
930930
}

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/io/TestLinkedIOWithSubIOReadableSeekable.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@
88
import org.junit.runners.Parameterized.Parameters;
99

1010
import net.lecousin.framework.core.test.io.TestFragmented;
11-
import net.lecousin.framework.core.test.io.TestReadableSeekable;
1211
import net.lecousin.framework.core.test.io.TestFragmented.FragmentedFile;
12+
import net.lecousin.framework.core.test.io.TestReadableSeekable;
1313
import net.lecousin.framework.io.FileIO;
1414
import net.lecousin.framework.io.IO;
1515
import net.lecousin.framework.io.LinkedIO;
@@ -52,5 +52,4 @@ protected IO.Readable.Seekable createReadableSeekableFromFile(FileIO.ReadOnly fi
5252
protected boolean canSetPriority() {
5353
return !f.fragments.isEmpty() && f.nbBuf > 0;
5454
}
55-
5655
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
package net.lecousin.framework.core.tests.io;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.util.Collection;
6+
7+
import org.junit.runner.RunWith;
8+
import org.junit.runners.Parameterized;
9+
import org.junit.runners.Parameterized.Parameters;
10+
11+
import net.lecousin.framework.concurrent.TaskManager;
12+
import net.lecousin.framework.concurrent.synch.AsyncWork;
13+
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
14+
import net.lecousin.framework.core.test.io.TestFragmented;
15+
import net.lecousin.framework.core.test.io.TestFragmented.FragmentedFile;
16+
import net.lecousin.framework.core.test.io.TestReadableSeekable;
17+
import net.lecousin.framework.event.Listener;
18+
import net.lecousin.framework.io.FileIO;
19+
import net.lecousin.framework.io.IO;
20+
import net.lecousin.framework.io.LinkedIO;
21+
import net.lecousin.framework.io.SubIO;
22+
import net.lecousin.framework.io.buffering.BufferedIO;
23+
import net.lecousin.framework.math.RangeLong;
24+
import net.lecousin.framework.util.CloseableListenable;
25+
import net.lecousin.framework.util.Pair;
26+
import net.lecousin.framework.util.RunnableWithParameter;
27+
28+
@RunWith(Parameterized.class)
29+
public class TestLinkedIOWithSubIOReadableSeekable2 extends TestReadableSeekable {
30+
31+
@Parameters
32+
public static Collection<Object[]> parameters() throws IOException {
33+
return TestFragmented.generateTestCases();
34+
}
35+
36+
public TestLinkedIOWithSubIOReadableSeekable2(FragmentedFile f) {
37+
super(f.file, f.testBuf, f.nbBuf);
38+
this.f = f;
39+
}
40+
41+
private FragmentedFile f;
42+
43+
@SuppressWarnings("resource")
44+
@Override
45+
protected IO.Readable.Seekable createReadableSeekableFromFile(FileIO.ReadOnly file, long fileSize) throws Exception {
46+
// this test may be very slow, let's add a buffered layer
47+
BufferedIO buffered = new BufferedIO(file, f.realSize, 32768, 32768, false);
48+
IO.Readable.Seekable[] ios = new IO.Readable.Seekable[f.fragments.size()];
49+
int i = 0;
50+
for (RangeLong fragment : f.fragments)
51+
ios[i++] = new SizeNotKnown(new SubIO.Readable.Seekable(buffered, fragment.min, fragment.getLength(), "fragment " + i, false));
52+
LinkedIO.Readable.Seekable res = new LinkedIO.Readable.Seekable.DeterminedSize("linked IO", ios);
53+
res.addCloseListener(() -> {
54+
buffered.closeAsync();
55+
});
56+
return res;
57+
}
58+
59+
@Override
60+
protected boolean canSetPriority() {
61+
return !f.fragments.isEmpty() && f.nbBuf > 0;
62+
}
63+
64+
65+
public static class SizeNotKnown implements IO.Readable.Seekable {
66+
public SizeNotKnown(IO.Readable.Seekable io) {
67+
this.io = io;
68+
}
69+
70+
private IO.Readable.Seekable io;
71+
72+
@Override
73+
public boolean lockClose() {
74+
return io.lockClose();
75+
}
76+
77+
@Override
78+
public boolean isClosed() {
79+
return io.isClosed();
80+
}
81+
82+
@Override
83+
public void unlockClose() {
84+
io.unlockClose();
85+
}
86+
87+
@Override
88+
public void addCloseListener(Listener<CloseableListenable> listener) {
89+
io.addCloseListener(listener);
90+
}
91+
92+
@Override
93+
public void addCloseListener(Runnable listener) {
94+
io.addCloseListener(listener);
95+
}
96+
97+
@Override
98+
public ISynchronizationPoint<IOException> canStartReading() {
99+
return io.canStartReading();
100+
}
101+
102+
@Override
103+
public void close() throws Exception {
104+
io.close();
105+
}
106+
107+
@Override
108+
public ISynchronizationPoint<Exception> closeAsync() {
109+
return io.closeAsync();
110+
}
111+
112+
@Override
113+
public String getSourceDescription() {
114+
return io.getSourceDescription();
115+
}
116+
117+
@Override
118+
public void removeCloseListener(Listener<CloseableListenable> listener) {
119+
io.removeCloseListener(listener);
120+
}
121+
122+
@Override
123+
public IO getWrappedIO() {
124+
return io.getWrappedIO();
125+
}
126+
127+
@Override
128+
public void removeCloseListener(Runnable listener) {
129+
io.removeCloseListener(listener);
130+
}
131+
132+
@Override
133+
public byte getPriority() {
134+
return io.getPriority();
135+
}
136+
137+
@Override
138+
public void setPriority(byte priority) {
139+
io.setPriority(priority);
140+
}
141+
142+
@Override
143+
public TaskManager getTaskManager() {
144+
return io.getTaskManager();
145+
}
146+
147+
@Override
148+
public long getPosition() throws IOException {
149+
return io.getPosition();
150+
}
151+
152+
@Override
153+
public long seekSync(SeekType type, long move) throws IOException {
154+
return io.seekSync(type, move);
155+
}
156+
157+
@Override
158+
public AsyncWork<Long, IOException> seekAsync(SeekType type, long move,
159+
RunnableWithParameter<Pair<Long, IOException>> ondone) {
160+
return io.seekAsync(type, move, ondone);
161+
}
162+
163+
@Override
164+
public AsyncWork<Long, IOException> seekAsync(SeekType type, long move) {
165+
return io.seekAsync(type, move);
166+
}
167+
168+
@Override
169+
public int readSync(ByteBuffer buffer) throws IOException {
170+
return io.readSync(buffer);
171+
}
172+
173+
@Override
174+
public AsyncWork<Integer, IOException> readAsync(ByteBuffer buffer,
175+
RunnableWithParameter<Pair<Integer, IOException>> ondone) {
176+
return io.readAsync(buffer, ondone);
177+
}
178+
179+
@Override
180+
public AsyncWork<Integer, IOException> readAsync(ByteBuffer buffer) {
181+
return io.readAsync(buffer);
182+
}
183+
184+
@Override
185+
public int readFullySync(ByteBuffer buffer) throws IOException {
186+
return io.readFullySync(buffer);
187+
}
188+
189+
@Override
190+
public AsyncWork<Integer, IOException> readFullyAsync(ByteBuffer buffer,
191+
RunnableWithParameter<Pair<Integer, IOException>> ondone) {
192+
return io.readFullyAsync(buffer, ondone);
193+
}
194+
195+
@Override
196+
public AsyncWork<Integer, IOException> readFullyAsync(ByteBuffer buffer) {
197+
return io.readFullyAsync(buffer);
198+
}
199+
200+
@Override
201+
public long skipSync(long n) throws IOException {
202+
return io.skipSync(n);
203+
}
204+
205+
@Override
206+
public AsyncWork<Long, IOException> skipAsync(long n, RunnableWithParameter<Pair<Long, IOException>> ondone) {
207+
return io.skipAsync(n, ondone);
208+
}
209+
210+
@Override
211+
public AsyncWork<Long, IOException> skipAsync(long n) {
212+
return io.skipAsync(n);
213+
}
214+
215+
@Override
216+
public int readSync(long pos, ByteBuffer buffer) throws IOException {
217+
return io.readSync(pos, buffer);
218+
}
219+
220+
@Override
221+
public AsyncWork<Integer, IOException> readAsync(long pos, ByteBuffer buffer,
222+
RunnableWithParameter<Pair<Integer, IOException>> ondone) {
223+
return io.readAsync(pos, buffer, ondone);
224+
}
225+
226+
@Override
227+
public AsyncWork<Integer, IOException> readAsync(long pos, ByteBuffer buffer) {
228+
return io.readAsync(pos, buffer);
229+
}
230+
231+
@Override
232+
public int readFullySync(long pos, ByteBuffer buffer) throws IOException {
233+
return io.readFullySync(pos, buffer);
234+
}
235+
236+
@Override
237+
public AsyncWork<Integer, IOException> readFullyAsync(long pos, ByteBuffer buffer,
238+
RunnableWithParameter<Pair<Integer, IOException>> ondone) {
239+
return io.readFullyAsync(pos, buffer, ondone);
240+
}
241+
242+
@Override
243+
public AsyncWork<Integer, IOException> readFullyAsync(long pos, ByteBuffer buffer) {
244+
return io.readFullyAsync(pos, buffer);
245+
}
246+
247+
}
248+
249+
}

0 commit comments

Comments
 (0)