Skip to content

Commit 8d710be

Browse files
committed
fix OutputToInput
1 parent a5c29a0 commit 8d710be

File tree

2 files changed

+56
-31
lines changed

2 files changed

+56
-31
lines changed

net.lecousin.core/src/main/java/net/lecousin/framework/concurrent/async/Async.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public final void unblock() {
9898
Threading.debugListenerCall(listener, System.nanoTime() - start);
9999
}
100100
synchronized (this) {
101-
if (listenersInline.isEmpty()) {
101+
if (listenersInline == null || listenersInline.isEmpty()) {
102102
listenersInline = null;
103103
listeners = null;
104104
this.notifyAll();

net.lecousin.core/src/main/java/net/lecousin/framework/io/out2in/OutputToInput.java

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,15 @@ public <T extends IO.Writable.Seekable & IO.Readable.Seekable> OutputToInput(T i
3232
private IO io;
3333
private String sourceDescription;
3434
private boolean eof = false;
35-
private LockPoint<IOException> lock = new LockPoint<>();
35+
private Async<IOException> waitForData = new Async<>();
3636
private long writePos = 0;
3737
private long readPos = 0;
3838
private LockPoint<NoException> lockIO = new LockPoint<>();
3939

4040
@Override
4141
protected IAsync<IOException> closeUnderlyingResources() {
4242
eof = true;
43-
lock.error(new EOFException());
43+
waitForData.error(new EOFException());
4444
return io.closeAsync();
4545
}
4646

@@ -74,12 +74,12 @@ public IO getWrappedIO() {
7474
@Override
7575
public void endOfData() {
7676
eof = true;
77-
lock.error(new EOFException());
77+
waitForData.error(new EOFException());
7878
}
7979

8080
@Override
8181
public void signalErrorBeforeEndOfData(IOException error) {
82-
lock.error(error);
82+
waitForData.error(error);
8383
lockIO.unlock();
8484
}
8585

@@ -107,7 +107,7 @@ public int writeSync(ByteBuffer buffer) throws IOException {
107107
nb = ((IO.Writable.Seekable)io).writeSync(writePos, buffer);
108108
writePos += nb;
109109
lockIO.unlock();
110-
lock.unlock();
110+
waitForData.unblock();
111111
return nb;
112112
}
113113

@@ -120,15 +120,15 @@ public AsyncSupplier<Integer, IOException> writeAsync(ByteBuffer buffer, Consume
120120
if (param.getValue1() != null) {
121121
writePos += param.getValue1().intValue();
122122
lockIO.unlock();
123-
lock.unlock();
123+
waitForData.unblock();
124124
if (ondone != null) ondone.accept(param);
125125
} else {
126126
lockIO.unlock();
127127
if (ondone != null) ondone.accept(param);
128-
lock.error(param.getValue2());
128+
waitForData.error(param.getValue2());
129129
}
130130
});
131-
write.onCancel(lock::cancel);
131+
write.onCancel(waitForData::cancel);
132132
write.forward(result);
133133
return null;
134134
})).start();
@@ -138,21 +138,25 @@ public AsyncSupplier<Integer, IOException> writeAsync(ByteBuffer buffer, Consume
138138
@Override
139139
public IAsync<IOException> canStartReading() {
140140
if (eof) return new Async<>(true);
141-
if (lock.hasError()) return lock;
141+
if (waitForData.hasError()) return waitForData;
142142
if (readPos < writePos) return new Async<>(true);
143-
return lock;
143+
return waitForData;
144144
}
145145

146146
@Override
147147
@SuppressWarnings("squid:S2589") // eof may change in a concurrent operation
148148
public int readSync(long pos, ByteBuffer buffer) throws IOException {
149-
if (lock.hasError() && !eof)
150-
throw new OutputToInputTransferException(lock.getError());
149+
if (waitForData.hasError() && !eof)
150+
throw new OutputToInputTransferException(waitForData.getError());
151151
while (pos >= writePos) {
152152
if (eof) return -1;
153-
if (lock.hasError() && !eof)
154-
throw new OutputToInputTransferException(lock.getError());
155-
lock.lock();
153+
if (waitForData.hasError() && !eof)
154+
throw new OutputToInputTransferException(waitForData.getError());
155+
synchronized (waitForData) {
156+
if (pos >= writePos && waitForData.isDone())
157+
waitForData.reset();
158+
}
159+
waitForData.block(0);
156160
}
157161
int nb;
158162
lockIO.lock();
@@ -194,8 +198,8 @@ public AsyncSupplier<Integer, IOException> readAsync(ByteBuffer buffer, Consumer
194198
@Override
195199
@SuppressWarnings("squid:S2589") // may change in a concurrent operation
196200
public AsyncSupplier<Integer, IOException> readAsync(long pos, ByteBuffer buffer, Consumer<Pair<Integer,IOException>> ondone) {
197-
if (lock.hasError() && !eof) {
198-
IOException e = new OutputToInputTransferException(lock.getError());
201+
if (waitForData.hasError() && !eof) {
202+
IOException e = new OutputToInputTransferException(waitForData.getError());
199203
if (ondone != null) ondone.accept(new Pair<>(null, e));
200204
return new AsyncSupplier<>(null, e);
201205
}
@@ -205,7 +209,11 @@ public AsyncSupplier<Integer, IOException> readAsync(long pos, ByteBuffer buffer
205209
return new AsyncSupplier<>(Integer.valueOf(-1), null);
206210
}
207211
AsyncSupplier<Integer, IOException> result = new AsyncSupplier<>();
208-
lock.thenStart(operation(
212+
synchronized (waitForData) {
213+
if (pos >= writePos && waitForData.isDone())
214+
waitForData.reset();
215+
}
216+
waitForData.thenStart(operation(
209217
taskSyncToAsync("OutputToInput.readAsync", result, ondone, () -> Integer.valueOf(readSync(pos, buffer)))), true);
210218
return result;
211219
}
@@ -263,17 +271,21 @@ public long skipSync(long n) throws IOException {
263271
return n;
264272
}
265273
if (readPos + n > writePos) {
266-
if (lock.hasError() && !eof)
267-
throw new OutputToInputTransferException(lock.getError());
274+
if (waitForData.hasError() && !eof)
275+
throw new OutputToInputTransferException(waitForData.getError());
268276
while (readPos + n > writePos) {
269277
if (eof) {
270278
n = writePos - readPos;
271279
readPos = writePos;
272280
return n;
273281
}
274-
if (lock.hasError() && !eof)
275-
throw new OutputToInputTransferException(lock.getError());
276-
lock.lock();
282+
if (waitForData.hasError() && !eof)
283+
throw new OutputToInputTransferException(waitForData.getError());
284+
synchronized (waitForData) {
285+
if (readPos + n > writePos && waitForData.isDone())
286+
waitForData.reset();
287+
}
288+
waitForData.block(0);
277289
}
278290
}
279291
readPos += n;
@@ -300,7 +312,11 @@ public AsyncSupplier<Long, IOException> skipAsync(long n, Consumer<Pair<Long,IOE
300312
return new AsyncSupplier<>(Long.valueOf(m), null);
301313
}
302314
AsyncSupplier<Long, IOException> result = new AsyncSupplier<>();
303-
lock.thenStart(operation(taskSyncToAsync("OutputToInput.skipAsync", result, ondone, () -> Long.valueOf(skipSync(n)))), true);
315+
synchronized (waitForData) {
316+
if (readPos + n > writePos && waitForData.isDone())
317+
waitForData.reset();
318+
}
319+
waitForData.thenStart(operation(taskSyncToAsync("OutputToInput.skipAsync", result, ondone, () -> Long.valueOf(skipSync(n)))), true);
304320
return result;
305321
}
306322

@@ -320,19 +336,24 @@ public long seekSync(SeekType type, long move) throws IOException {
320336
skipSync(move);
321337
return readPos;
322338
default: //case FROM_END:
323-
while (!eof && !lock.hasError()) {
324-
lock.lock();
339+
while (!eof && !waitForData.hasError()) {
340+
synchronized (waitForData) {
341+
if (!eof && waitForData.isDone())
342+
waitForData.reset();
343+
}
344+
waitForData.block(0);
325345
}
326346
if (eof) {
327347
readPos = writePos;
328348
skipSync(-move);
329349
return readPos;
330350
}
331-
throw new OutputToInputTransferException(lock.getError());
351+
throw new OutputToInputTransferException(waitForData.getError());
332352
}
333353
}
334354

335355
@Override
356+
@SuppressWarnings("java:S3776") // complexity
336357
public AsyncSupplier<Long, IOException> seekAsync(SeekType type, long move, Consumer<Pair<Long,IOException>> ondone) {
337358
AsyncSupplier<Long, IOException> res = new AsyncSupplier<>();
338359
switch (type) {
@@ -350,8 +371,8 @@ public AsyncSupplier<Long, IOException> seekAsync(SeekType type, long move, Cons
350371
}, res);
351372
return res;
352373
case FROM_END:
353-
if (lock.hasError() && !eof)
354-
return IOUtil.error(new OutputToInputTransferException(lock.getError()), ondone);
374+
if (waitForData.hasError() && !eof)
375+
return IOUtil.error(new OutputToInputTransferException(waitForData.getError()), ondone);
355376
if (eof) {
356377
if (move <= 0)
357378
readPos = writePos;
@@ -361,7 +382,11 @@ public AsyncSupplier<Long, IOException> seekAsync(SeekType type, long move, Cons
361382
return IOUtil.success(Long.valueOf(readPos), ondone);
362383
}
363384
AsyncSupplier<Long, IOException> result = new AsyncSupplier<>();
364-
lock.thenStart(operation(Task.cpu("OutputToInput.seekAsync", io.getPriority(), t -> {
385+
synchronized (waitForData) {
386+
if (!eof && waitForData.isDone())
387+
waitForData.reset();
388+
}
389+
waitForData.thenStart(operation(Task.cpu("OutputToInput.seekAsync", io.getPriority(), t -> {
365390
try {
366391
Long nb = Long.valueOf(seekSync(type, move));
367392
if (ondone != null) ondone.accept(new Pair<>(nb, null));

0 commit comments

Comments
 (0)