Skip to content

Commit 4548b53

Browse files
committed
add tests on LinkedIO + fix it
1 parent b10a089 commit 4548b53

File tree

7 files changed

+265
-126
lines changed

7 files changed

+265
-126
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/io/LinkedIO.java

Lines changed: 163 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ public int writeSync(long pos, ByteBuffer buffer) throws IOException {
444444

445445
@Override
446446
public int writeSync(ByteBuffer buffer) throws IOException {
447-
return super.writeSync(pos, buffer);
447+
return super.writeSync(buffer);
448448
}
449449

450450
@Override
@@ -456,7 +456,7 @@ public AsyncWork<Integer, IOException> writeAsync(
456456

457457
@Override
458458
public AsyncWork<Integer, IOException> writeAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer, IOException>> ondone) {
459-
return super.writeAsync(pos, buffer, ondone);
459+
return super.writeAsync(buffer, ondone);
460460
}
461461
}
462462

@@ -984,10 +984,7 @@ protected int readSync(long pos, ByteBuffer buffer) throws IOException {
984984
}
985985
if (p + s.longValue() > pos) {
986986
IO.Readable.Seekable io = (IO.Readable.Seekable)ios.get(i);
987-
int nb = io.readSync(pos - p, buffer);
988-
ioIndex = i;
989-
posInIO = pos - p + nb;
990-
return nb;
987+
return io.readSync(pos - p, buffer);
991988
}
992989
p += s.longValue();
993990
i++;
@@ -1013,21 +1010,13 @@ protected AsyncWork<Integer, IOException> readAsync(
10131010
sizes.set(ii, seek.getResult());
10141011
readAsync(pos, buffer, ondone).listenInline(result);
10151012
}), result);
1016-
return result;
1013+
return operation(result);
10171014
}
10181015
sizes.set(i, s = seek.getResult());
10191016
}
10201017
if (p + s.longValue() > pos) {
10211018
IO.Readable.Seekable io = (IO.Readable.Seekable)ios.get(i);
1022-
ioIndex = i;
1023-
posInIO = pos - p;
1024-
AsyncWork<Integer, IOException> result = io.readAsync(pos - p, buffer, (res) -> {
1025-
if (res.getValue1() != null) {
1026-
posInIO += res.getValue1().intValue();
1027-
}
1028-
if (ondone != null) ondone.run(res);
1029-
});
1030-
return result;
1019+
return operation(io.readAsync(pos - p, buffer, ondone));
10311020
}
10321021
p += s.longValue();
10331022
i++;
@@ -1046,10 +1035,40 @@ protected AsyncWork<Integer, IOException> readFullyAsync(
10461035
return operation(IOUtil.readFullyAsync((IO.Readable.Seekable)this, pos, buffer, ondone));
10471036
}
10481037

1038+
@SuppressWarnings("resource")
1039+
protected int writeSync(ByteBuffer buffer) throws IOException {
1040+
int done = 0;
1041+
while (ioIndex < ios.size()) {
1042+
Long s = sizes.get(ioIndex);
1043+
IO.Writable.Seekable io = (IO.Writable.Seekable)ios.get(ioIndex);
1044+
if (s == null) {
1045+
s = Long.valueOf(io.seekSync(SeekType.FROM_END, 0));
1046+
sizes.set(ioIndex, s);
1047+
io.seekSync(SeekType.FROM_BEGINNING, posInIO);
1048+
}
1049+
if (posInIO < s.longValue()) {
1050+
int len = (int)(s.longValue() - posInIO);
1051+
int limit = buffer.limit();
1052+
if (buffer.remaining() > len)
1053+
buffer.limit(limit - (buffer.remaining() - len));
1054+
int nb = io.writeSync(buffer);
1055+
buffer.limit(limit);
1056+
posInIO += nb;
1057+
done += nb;
1058+
pos += nb;
1059+
if (!buffer.hasRemaining())
1060+
return done;
1061+
}
1062+
nextIOSync();
1063+
}
1064+
return done;
1065+
}
1066+
10491067
@SuppressWarnings("resource")
10501068
protected int writeSync(long pos, ByteBuffer buffer) throws IOException {
10511069
long p = 0;
10521070
int i = 0;
1071+
int done = 0;
10531072
while (i < ios.size()) {
10541073
Long s = sizes.get(i);
10551074
if (s == null) {
@@ -1059,15 +1078,96 @@ protected int writeSync(long pos, ByteBuffer buffer) throws IOException {
10591078
}
10601079
if (p + s.longValue() > pos) {
10611080
IO.Writable.Seekable io = (IO.Writable.Seekable)ios.get(i);
1081+
int len = (int)(s.longValue() - (pos - p));
1082+
int limit = buffer.limit();
1083+
if (buffer.remaining() > len)
1084+
buffer.limit(limit - (buffer.remaining() - len));
10621085
int nb = io.writeSync(pos - p, buffer);
1063-
ioIndex = i;
1064-
posInIO = pos - p + nb;
1065-
return nb;
1086+
buffer.limit(limit);
1087+
done += nb;
1088+
pos += nb;
1089+
if (!buffer.hasRemaining())
1090+
return done;
10661091
}
10671092
p += s.longValue();
10681093
i++;
10691094
}
1070-
return -1;
1095+
return done;
1096+
}
1097+
1098+
@SuppressWarnings("resource")
1099+
protected AsyncWork<Integer, IOException> writeAsync(ByteBuffer buffer, RunnableWithParameter<Pair<Integer, IOException>> ondone) {
1100+
AsyncWork<Integer, IOException> result = new AsyncWork<>();
1101+
writeAsync(buffer, 0, result, ondone);
1102+
return operation(result);
1103+
}
1104+
1105+
private void writeAsync(
1106+
ByteBuffer buffer, int done, AsyncWork<Integer, IOException> result, RunnableWithParameter<Pair<Integer, IOException>> ondone
1107+
) {
1108+
if (ioIndex == ios.size()) {
1109+
IOUtil.success(Integer.valueOf(done), result, ondone);
1110+
return;
1111+
}
1112+
IO.Writable.Seekable io = (IO.Writable.Seekable)ios.get(ioIndex);
1113+
Long s = sizes.get(ioIndex);
1114+
if (s == null) {
1115+
AsyncWork<Long, IOException> seek = io.seekAsync(SeekType.FROM_END, 0);
1116+
if (!seek.isUnblocked()) {
1117+
seek.listenAsync(new Task.Cpu.FromRunnable("LinkedIO.writeAsync", getPriority(), () -> {
1118+
if (seek.hasError())
1119+
IOUtil.error(seek.getError(), result, ondone);
1120+
else {
1121+
sizes.set(ioIndex, seek.getResult());
1122+
writeAsync(buffer, done, result, ondone);
1123+
}
1124+
}), true);
1125+
return;
1126+
}
1127+
sizes.set(ioIndex, s = seek.getResult());
1128+
}
1129+
if (posInIO == s.longValue()) {
1130+
nextIOAsync(() -> {
1131+
writeAsync(buffer, done, result, ondone);
1132+
}, result, ondone);
1133+
return;
1134+
}
1135+
int len = (int)(s.longValue() - posInIO);
1136+
int limit = buffer.limit();
1137+
if (buffer.remaining() > len)
1138+
buffer.limit(limit - (buffer.remaining() - len));
1139+
AsyncWork<Long, IOException> seek = io.seekAsync(SeekType.FROM_BEGINNING, posInIO);
1140+
if (seek.isUnblocked())
1141+
writeAsync(io, limit, buffer, done, result, ondone);
1142+
else
1143+
seek.listenInline(() -> {
1144+
if (seek.hasError())
1145+
IOUtil.error(seek.getError(), result, ondone);
1146+
else
1147+
writeAsync(io, limit, buffer, done, result, ondone);
1148+
});
1149+
}
1150+
1151+
private void writeAsync(
1152+
IO.Writable.Seekable io, int limit, ByteBuffer buffer, int done,
1153+
AsyncWork<Integer, IOException> result, RunnableWithParameter<Pair<Integer, IOException>> ondone
1154+
) {
1155+
AsyncWork<Integer, IOException> write = io.writeAsync(buffer);
1156+
write.listenAsync(new Task.Cpu.FromRunnable("LinkedIO.writeAsync", getPriority(), () -> {
1157+
buffer.limit(limit);
1158+
if (write.hasError()) {
1159+
IOUtil.error(write.getError(), result, ondone);
1160+
return;
1161+
}
1162+
int nb = write.getResult().intValue();
1163+
posInIO += nb;
1164+
pos += nb;
1165+
if (!buffer.hasRemaining() || ioIndex == ios.size() - 1) {
1166+
IOUtil.success(Integer.valueOf(done + nb), result, ondone);
1167+
return;
1168+
}
1169+
writeAsync(buffer, done + nb, result, ondone);
1170+
}), true);
10711171
}
10721172

10731173
@SuppressWarnings("resource")
@@ -1082,7 +1182,7 @@ protected AsyncWork<Integer, IOException> writeAsync(long pos, ByteBuffer buffer
10821182
if (!seek.isUnblocked()) {
10831183
AsyncWork<Integer, IOException> result = new AsyncWork<>();
10841184
int ii = i;
1085-
seek.listenAsync(new Task.Cpu.FromRunnable("LinkedIO.readAsync", getPriority(), () -> {
1185+
seek.listenAsync(new Task.Cpu.FromRunnable("LinkedIO.writeAsync", getPriority(), () -> {
10861186
sizes.set(ii, seek.getResult());
10871187
writeAsync(pos, buffer, ondone).listenInline(result);
10881188
}), result);
@@ -1091,15 +1191,8 @@ protected AsyncWork<Integer, IOException> writeAsync(long pos, ByteBuffer buffer
10911191
sizes.set(i, s = seek.getResult());
10921192
}
10931193
if (p + s.longValue() > pos) {
1094-
IO.Writable.Seekable io = (IO.Writable.Seekable)ios.get(i);
1095-
ioIndex = i;
1096-
posInIO = pos - p;
1097-
AsyncWork<Integer, IOException> result = io.writeAsync(pos - p, buffer, (res) -> {
1098-
if (res.getValue1() != null) {
1099-
posInIO += res.getValue1().intValue();
1100-
}
1101-
if (ondone != null) ondone.run(res);
1102-
});
1194+
AsyncWork<Integer, IOException> result = new AsyncWork<>();
1195+
writeAsync(i, p, pos, 0, buffer, result, ondone);
11031196
return result;
11041197
}
11051198
p += s.longValue();
@@ -1109,4 +1202,44 @@ protected AsyncWork<Integer, IOException> writeAsync(long pos, ByteBuffer buffer
11091202
return new AsyncWork<>(Integer.valueOf(-1), null);
11101203
}
11111204

1205+
private void writeAsync(
1206+
int i, long p, long pos, int done, ByteBuffer buffer, AsyncWork<Integer, IOException> result,
1207+
RunnableWithParameter<Pair<Integer, IOException>> ondone
1208+
) {
1209+
IO.Writable.Seekable io = (IO.Writable.Seekable)ios.get(i);
1210+
Long s = sizes.get(i);
1211+
if (s == null) {
1212+
AsyncWork<Long, IOException> seek = io.seekAsync(SeekType.FROM_END, 0);
1213+
if (!seek.isUnblocked()) {
1214+
seek.listenAsync(new Task.Cpu.FromRunnable("LinkedIO.writeAsync", getPriority(), () -> {
1215+
sizes.set(i, seek.getResult());
1216+
writeAsync(i, p, pos, done, buffer, result, ondone);
1217+
}), result);
1218+
return;
1219+
}
1220+
sizes.set(i, s = seek.getResult());
1221+
}
1222+
int len = (int)(s.longValue() - (pos - p));
1223+
int limit = buffer.limit();
1224+
if (buffer.remaining() > len)
1225+
buffer.limit(limit - (buffer.remaining() - len));
1226+
AsyncWork<Integer, IOException> write = io.writeAsync(pos - p, buffer);
1227+
long ioSize = s.longValue();
1228+
write.listenInline(() -> {
1229+
buffer.limit(limit);
1230+
if (write.hasError()) {
1231+
IOUtil.error(write.getError(), result, ondone);
1232+
return;
1233+
}
1234+
int nb = write.getResult().intValue();
1235+
if (!buffer.hasRemaining() || i == ios.size() - 1) {
1236+
IOUtil.success(Integer.valueOf(done + nb), result, ondone);
1237+
return;
1238+
}
1239+
new Task.Cpu.FromRunnable("LinkedIO.writeAsync", getPriority(), () -> {
1240+
writeAsync(i + 1, p + ioSize, p + ioSize, done + nb, buffer, result, ondone);
1241+
}).start();
1242+
});
1243+
}
1244+
11121245
}

0 commit comments

Comments
 (0)