Skip to content

Commit f5c5a7f

Browse files
committed
few improvements on pending IO operations while closing it
1 parent d0dd37c commit f5c5a7f

File tree

5 files changed

+283
-253
lines changed

5 files changed

+283
-253
lines changed
Lines changed: 126 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -1,121 +1,126 @@
1-
package net.lecousin.framework.concurrent.tasks.drives;
2-
3-
import java.io.IOException;
4-
import java.nio.ByteBuffer;
5-
import java.util.ArrayList;
6-
7-
import net.lecousin.framework.collections.sort.RedBlackTreeInteger;
8-
import net.lecousin.framework.concurrent.Task;
9-
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
10-
import net.lecousin.framework.event.Listener;
11-
import net.lecousin.framework.exception.NoException;
12-
import net.lecousin.framework.util.Pair;
13-
import net.lecousin.framework.util.RunnableWithParameter;
14-
15-
/**
16-
* Task to read some bytes from a file.
17-
*/
18-
class ReadFileTask extends Task.OnFile<Integer,IOException> {
19-
20-
/** Constructor. */
21-
public ReadFileTask(
22-
FileAccess file, long pos, ByteBuffer buffer, boolean fully,
23-
byte priority, RunnableWithParameter<Pair<Integer,IOException>> ondone
24-
) {
25-
super(file.manager, "Read from file " + file.path + " at " + pos, priority, ondone);
26-
this.file = file;
27-
this.pos = pos;
28-
this.buffer = buffer;
29-
this.fully = fully;
30-
file.openTask.ondone(this, false);
31-
}
32-
33-
private FileAccess file;
34-
private long pos;
35-
private ByteBuffer buffer;
36-
private boolean fully;
37-
private int nbRead = 0;
38-
39-
private RedBlackTreeInteger<SynchronizationPoint<NoException>> waiting = null;
40-
private ArrayList<Listener<Integer>> onprogress = null;
41-
42-
public int getCurrentNbRead() { return nbRead; }
43-
44-
public int waitNbRead(int nbRead) throws IOException {
45-
if (this.nbRead >= nbRead) return this.nbRead;
46-
if (getError() != null) throw getError();
47-
if (isDone()) return this.nbRead;
48-
SynchronizationPoint<NoException> sp = new SynchronizationPoint<NoException>();
49-
synchronized (buffer) {
50-
if (getError() != null) throw getError();
51-
if (isDone()) return this.nbRead;
52-
if (waiting == null) waiting = new RedBlackTreeInteger<>();
53-
waiting.add(nbRead, sp);
54-
}
55-
sp.block(0);
56-
if (getError() != null) throw getError();
57-
return this.nbRead;
58-
}
59-
60-
public void onprogress(Listener<Integer> listener) {
61-
synchronized (buffer) {
62-
if (nbRead > 0) listener.fire(Integer.valueOf(nbRead));
63-
if (onprogress == null) onprogress = new ArrayList<>(5);
64-
onprogress.add(listener);
65-
}
66-
}
67-
68-
@Override
69-
public Integer run() throws IOException {
70-
try {
71-
if (!file.openTask.isSuccessful())
72-
throw file.openTask.getError();
73-
nbRead = 0;
74-
if (pos >= 0)
75-
try { file.channel.position(pos); }
76-
catch (IOException e) {
77-
throw new IOException("Unable to seek to position " + pos + " in file " + file.path, e);
78-
}
79-
if (!fully) {
80-
nbRead = file.channel.read(buffer);
81-
callListeners();
82-
} else {
83-
nbRead = 0;
84-
while (buffer.remaining() > 0) {
85-
int nb = file.channel.read(buffer);
86-
if (nb <= 0) break;
87-
nbRead += nb;
88-
callListeners();
89-
}
90-
}
91-
return Integer.valueOf(nbRead);
92-
} finally {
93-
synchronized (buffer) {
94-
if (waiting != null) {
95-
for (SynchronizationPoint<NoException> sp : waiting) sp.unblock();
96-
waiting = null;
97-
}
98-
}
99-
}
100-
}
101-
102-
private void callListeners() {
103-
synchronized (buffer) {
104-
if (onprogress != null)
105-
for (int i = onprogress.size() - 1; i >= 0; --i)
106-
onprogress.get(i).fire(Integer.valueOf(nbRead));
107-
if (waiting != null) {
108-
do {
109-
RedBlackTreeInteger.Node<SynchronizationPoint<NoException>> min = waiting.getMin();
110-
if (min.getValue() <= nbRead) {
111-
min.getElement().unblock();
112-
waiting.removeMin();
113-
} else
114-
break;
115-
} while (!waiting.isEmpty());
116-
if (waiting.isEmpty()) waiting = null;
117-
}
118-
}
119-
}
120-
121-
}
1+
package net.lecousin.framework.concurrent.tasks.drives;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.ClosedChannelException;
6+
import java.util.ArrayList;
7+
8+
import net.lecousin.framework.collections.sort.RedBlackTreeInteger;
9+
import net.lecousin.framework.concurrent.CancelException;
10+
import net.lecousin.framework.concurrent.Task;
11+
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
12+
import net.lecousin.framework.event.Listener;
13+
import net.lecousin.framework.exception.NoException;
14+
import net.lecousin.framework.util.Pair;
15+
import net.lecousin.framework.util.RunnableWithParameter;
16+
17+
/**
18+
* Task to read some bytes from a file.
19+
*/
20+
class ReadFileTask extends Task.OnFile<Integer,IOException> {
21+
22+
/** Constructor. */
23+
public ReadFileTask(
24+
FileAccess file, long pos, ByteBuffer buffer, boolean fully,
25+
byte priority, RunnableWithParameter<Pair<Integer,IOException>> ondone
26+
) {
27+
super(file.manager, "Read from file " + file.path + (pos >= 0 ? " at " + pos : ""), priority, ondone);
28+
this.file = file;
29+
this.pos = pos;
30+
this.buffer = buffer;
31+
this.fully = fully;
32+
file.openTask.ondone(this, false);
33+
}
34+
35+
private FileAccess file;
36+
private long pos;
37+
private ByteBuffer buffer;
38+
private boolean fully;
39+
private int nbRead = 0;
40+
41+
private RedBlackTreeInteger<SynchronizationPoint<NoException>> waiting = null;
42+
private ArrayList<Listener<Integer>> onprogress = null;
43+
44+
public int getCurrentNbRead() { return nbRead; }
45+
46+
public int waitNbRead(int nbRead) throws IOException {
47+
if (this.nbRead >= nbRead) return this.nbRead;
48+
if (getError() != null) throw getError();
49+
if (isDone()) return this.nbRead;
50+
SynchronizationPoint<NoException> sp = new SynchronizationPoint<NoException>();
51+
synchronized (buffer) {
52+
if (getError() != null) throw getError();
53+
if (isDone()) return this.nbRead;
54+
if (waiting == null) waiting = new RedBlackTreeInteger<>();
55+
waiting.add(nbRead, sp);
56+
}
57+
sp.block(0);
58+
if (getError() != null) throw getError();
59+
return this.nbRead;
60+
}
61+
62+
public void onprogress(Listener<Integer> listener) {
63+
synchronized (buffer) {
64+
if (nbRead > 0) listener.fire(Integer.valueOf(nbRead));
65+
if (onprogress == null) onprogress = new ArrayList<>(5);
66+
onprogress.add(listener);
67+
}
68+
}
69+
70+
@Override
71+
public Integer run() throws IOException, CancelException {
72+
try {
73+
if (!file.openTask.isSuccessful())
74+
throw file.openTask.getError();
75+
nbRead = 0;
76+
if (pos >= 0)
77+
try { file.channel.position(pos); }
78+
catch (ClosedChannelException e) { throw new CancelException("File has been closed"); }
79+
catch (IOException e) {
80+
throw new IOException("Unable to seek to position " + pos + " in file " + file.path, e);
81+
}
82+
if (!fully) {
83+
nbRead = file.channel.read(buffer);
84+
callListeners();
85+
} else {
86+
nbRead = 0;
87+
while (buffer.remaining() > 0) {
88+
int nb;
89+
try { nb = file.channel.read(buffer); }
90+
catch (ClosedChannelException e) { throw new CancelException("File has been closed"); }
91+
if (nb <= 0) break;
92+
nbRead += nb;
93+
callListeners();
94+
}
95+
}
96+
return Integer.valueOf(nbRead);
97+
} finally {
98+
synchronized (buffer) {
99+
if (waiting != null) {
100+
for (SynchronizationPoint<NoException> sp : waiting) sp.unblock();
101+
waiting = null;
102+
}
103+
}
104+
}
105+
}
106+
107+
private void callListeners() {
108+
synchronized (buffer) {
109+
if (onprogress != null)
110+
for (int i = onprogress.size() - 1; i >= 0; --i)
111+
onprogress.get(i).fire(Integer.valueOf(nbRead));
112+
if (waiting != null) {
113+
do {
114+
RedBlackTreeInteger.Node<SynchronizationPoint<NoException>> min = waiting.getMin();
115+
if (min.getValue() <= nbRead) {
116+
min.getElement().unblock();
117+
waiting.removeMin();
118+
} else
119+
break;
120+
} while (!waiting.isEmpty());
121+
if (waiting.isEmpty()) waiting = null;
122+
}
123+
}
124+
}
125+
126+
}

0 commit comments

Comments
 (0)