Skip to content

Commit 98dad87

Browse files
committed
fix BufferedIO + allow to configure LCCore before to start it
1 parent e1dc877 commit 98dad87

File tree

9 files changed

+53
-70
lines changed

9 files changed

+53
-70
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/application/Application.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ public static ISynchronizationPoint<Exception> start(
280280
app.loggerFactory = new LoggerFactory(app);
281281

282282
// init LCCore with this application
283-
LCCore.init(app);
283+
LCCore.start(app);
284284

285285
JoinPoint<Exception> loading = new JoinPoint<>();
286286

net.lecousin.core/src/main/java/net/lecousin/framework/application/LCCore.java

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,29 @@ public static interface Environment {
6464
}
6565

6666
private static Environment instance = null;
67+
private static boolean started = false;
6768

6869
/** Return the environment. */
6970
public static Environment get() {
7071
return instance;
7172
}
7273

73-
/** Initialization. */
74-
public static synchronized void init(Environment env) {
74+
/** Set the environment. */
75+
public static void set(Environment env) throws IllegalStateException {
7576
if (instance != null) throw new IllegalStateException("LCCore environment already exists");
7677
instance = env;
77-
78+
}
79+
80+
/** Return true if the environment is already started. */
81+
public static boolean isStarted() {
82+
return started;
83+
}
84+
85+
/** Initialization. */
86+
public static synchronized void start() {
87+
if (started) throw new IllegalStateException("LCCore environment already started");
88+
started = true;
89+
7890
// init logging system if not specified
7991
if (System.getProperty("org.apache.commons.logging.Log") == null) {
8092
try {
@@ -102,13 +114,14 @@ public Void run() {
102114
}.start();
103115
}
104116

105-
static synchronized void init(Application app) {
117+
static synchronized void start(Application app) {
106118
if (instance == null) {
107119
StandaloneLCCore env = new StandaloneLCCore();
108-
env.add(app);
109-
init(env);
110-
} else
111-
instance.add(app);
120+
set(env);
121+
}
122+
instance.add(app);
123+
if (!isStarted())
124+
start();
112125
}
113126

114127
public static Application getApplication() {

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/BufferedIO.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ protected BufferedIO(
6363
this.bufferSize = bufferSize;
6464
this.size = size;
6565
this.startReadSecondBufferWhenFirstBufferFullyRead = startReadSecondBufferWhenFirstBufferFullyRead;
66-
int nbBuffers = (int)((size - firstBufferSize) / bufferSize) + 2;
66+
int nbBuffers = size <= firstBufferSize ? 1 : (int)((size - firstBufferSize) / bufferSize) + 2;
6767
if (nbBuffers <= 10)
6868
buffers = new ArrayList<>(nbBuffers);
6969
else {
@@ -128,6 +128,7 @@ public void run() {
128128
return;
129129
}
130130
buffer.len = loading.getResult().intValue();
131+
if (buffer.len < 0) buffer.len = 0;
131132
buffer.lastRead = System.currentTimeMillis();
132133
manager.newBuffer(buffer);
133134
buffer.loading.unblock();
@@ -1032,12 +1033,17 @@ public void run() {
10321033
while (firstIndex > buffers.size())
10331034
buffers.add(new Buffer(this));
10341035
while (firstIndex <= lastIndex) {
1035-
Buffer b = new Buffer(this);
1036-
b.buffer = new byte[firstIndex > 0 ? bufferSize : firstBufferSize];
1036+
Buffer b;
1037+
if (firstIndex >= buffers.size()) {
1038+
b = new Buffer(this);
1039+
newBuffers.add(b);
1040+
buffers.add(b);
1041+
} else
1042+
b = buffers.get(firstIndex);
1043+
if (b.buffer == null)
1044+
b.buffer = new byte[firstIndex > 0 ? bufferSize : firstBufferSize];
10371045
b.len = firstIndex < lastIndex ? b.buffer.length : (int)(newSize - getBufferStart(firstIndex));
10381046
b.lastRead = System.currentTimeMillis();
1039-
newBuffers.add(b);
1040-
buffers.add(b);
10411047
firstIndex++;
10421048
}
10431049
}
@@ -1259,7 +1265,7 @@ protected int writeSync(long pos, ByteBuffer buf, int alreadyDone) throws IOExce
12591265
if (p < size && (bufferIndex != 0 || startReadSecondBufferWhenFirstBufferFullyRead)) {
12601266
int nextIndex = getBufferIndex(p);
12611267
if (nextIndex != bufferIndex) loadBuffer(nextIndex);
1262-
} else
1268+
} else if (p > size)
12631269
size = p;
12641270
if (buf.remaining() == 0) return alreadyDone + len;
12651271
return writeSync(p, buf, alreadyDone + len);
@@ -1308,7 +1314,7 @@ protected void write(byte[] buf, int offset, int length) throws IOException {
13081314
if (pos < size && (bufferIndex != 0 || startReadSecondBufferWhenFirstBufferFullyRead)) {
13091315
int nextIndex = getBufferIndex(pos);
13101316
if (nextIndex != bufferIndex) loadBuffer(nextIndex);
1311-
} else
1317+
} else if (pos > size)
13121318
size = pos;
13131319
} while (length > 0);
13141320
}
@@ -1351,7 +1357,7 @@ protected void write(byte b) throws IOException {
13511357
if (pos < size && (bufferIndex != 0 || startReadSecondBufferWhenFirstBufferFullyRead)) {
13521358
int nextIndex = getBufferIndex(pos);
13531359
if (nextIndex != bufferIndex) loadBuffer(nextIndex);
1354-
} else
1360+
} else if (pos > size)
13551361
size = pos;
13561362
}
13571363
}

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/BufferingManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ private BufferingManager() {
3434
background.start();
3535
}
3636

37-
static synchronized BufferingManager get() {
37+
public static synchronized BufferingManager get() {
3838
Application app = LCCore.getApplication();
3939
BufferingManager bm = app.getInstance(BufferingManager.class);
4040
if (bm == null) {

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/PreBufferedReadable.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,9 @@ public void run() {
291291
read += buffer.remaining();
292292
if (size > 0 && read == size)
293293
endReached = true;
294+
if (endReached && size > 0 && read < size) {
295+
error = new IOException("Unexpected end after " + read + " bytes read, known size is " + size);
296+
}
294297
if (dataReady != null) {
295298
SynchronizationPoint<NoException> dr = dataReady;
296299
dataReady = null;
@@ -499,6 +502,8 @@ public long skipSync(long n) throws IOException {
499502
if (reusableBuffers == null) {
500503
// we reach the end
501504
endReached = true;
505+
if (size > 0 && read < size)
506+
error = new IOException("Unexpected end after " + read + " bytes read, known size is " + size);
502507
if (dataReady != null) dataReady.unblock();
503508
return skipped;
504509
}
@@ -509,7 +514,9 @@ public long skipSync(long n) throws IOException {
509514
if (endReached) return skipped;
510515
if (nextReadTask == null && reusableBuffers != null) {
511516
// we cancelled all buffers, let's do a move
512-
skipped += src.skipSync(n);
517+
long n2 = src.skipSync(n);
518+
skipped += n2;
519+
read += n2;
513520
// restart reading
514521
stopReading = false;
515522
moveNextBuffer(true);
@@ -629,6 +636,8 @@ public void run() {
629636
if (!endReached && !reusableBuffers.isEmpty() && !stopReading)
630637
nextRead();
631638
}
639+
if (endReached && size > 0 && read < size)
640+
error = new IOException("Unexpected end after " + read + " bytes read, known size is " + size);
632641
if (dataReady != null) {
633642
sp = dataReady;
634643
dataReady = null;

net.lecousin.core/src/main/java/net/lecousin/framework/io/buffering/ReadableToSeekable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ public ReadableToSeekable(IO.Readable io, int bufferSize) throws IOException {
5454
file.deleteOnExit();
5555
@SuppressWarnings("resource")
5656
FileIO.ReadWrite fio = new FileIO.ReadWrite(file, io.getPriority());
57-
buffered = new BufferedIO.ReadWrite(fio, 512, bufferSize, 0L, true);
57+
buffered = new BufferedIO.ReadWrite(fio, 512, bufferSize, 0L, false);
5858
readNextBuffer();
5959
}
6060

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestReadable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,6 @@
44
import java.io.IOException;
55
import java.nio.ByteBuffer;
66

7-
import org.junit.Assert;
8-
import org.junit.Assume;
9-
import org.junit.Test;
10-
117
import net.lecousin.framework.collections.ArrayUtil;
128
import net.lecousin.framework.concurrent.synch.AsyncWork;
139
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
@@ -21,6 +17,10 @@
2117
import net.lecousin.framework.util.Pair;
2218
import net.lecousin.framework.util.RunnableWithParameter;
2319

20+
import org.junit.Assert;
21+
import org.junit.Assume;
22+
import org.junit.Test;
23+
2424
public abstract class TestReadable extends TestIO.UsingGeneratedTestFiles {
2525

2626
protected TestReadable(File testFile, byte[] testBuf, int nbBuf) {

net.lecousin.core/src/test/java/net/lecousin/framework/core/test/io/TestReadableSeekable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public void run() {
122122
return;
123123
}
124124
if (read.get().getResult().intValue() != 1) {
125-
sp.error(new Exception("Unexpected end of stream at " + (offset.get()*testBuf.length+j.get())));
125+
sp.error(new Exception("Unexpected end of stream at " + (offset.get()*testBuf.length+j.get()) + ": 1 byte expected, " + read.get().getResult().intValue() + " read"));
126126
return;
127127
}
128128
if (b[0] != testBuf[j.get()]) {

net.lecousin.core/src/test/java/net/lecousin/framework/core/tests/io/TestReadableToSeekableReadableSeekableUsingOutputToInput.java

Lines changed: 0 additions & 45 deletions
This file was deleted.

0 commit comments

Comments
 (0)