Skip to content

Commit 2cdeb73

Browse files
emil-baradamw
andauthored
Implement io flows methods (#78)
Co-authored-by: Adam Warski <adam@warski.org>
1 parent f3024b2 commit 2cdeb73

File tree

4 files changed

+544
-2
lines changed

4 files changed

+544
-2
lines changed
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package com.softwaremill.jox.flows;
2+
3+
import java.util.Iterator;
4+
import java.util.NoSuchElementException;
5+
6+
class ByteArrayIterator implements Iterator<Byte> {
7+
private final byte[] array;
8+
private int position = 0;
9+
10+
public ByteArrayIterator(byte[] array) {
11+
this.array = array;
12+
}
13+
14+
@Override
15+
public boolean hasNext() {
16+
return position < array.length;
17+
}
18+
19+
@Override
20+
public Byte next() {
21+
if (!hasNext()) {
22+
throw new NoSuchElementException();
23+
}
24+
return array[position++];
25+
}
26+
27+
public int available() {
28+
return array.length - position;
29+
}
30+
31+
public static ByteArrayIterator empty() {
32+
return new ByteArrayIterator(new byte[0]);
33+
}
34+
}

flows/src/main/java/com/softwaremill/jox/flows/Flow.java

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,17 @@
88
import static com.softwaremill.jox.structured.Scopes.unsupervised;
99
import static java.lang.Thread.sleep;
1010

11+
import java.io.IOException;
12+
import java.io.InputStream;
13+
import java.io.OutputStream;
1114
import java.nio.ByteBuffer;
1215
import java.nio.charset.Charset;
1316
import java.nio.charset.StandardCharsets;
17+
import java.nio.channels.FileChannel;
18+
import java.nio.channels.SeekableByteChannel;
19+
import java.nio.file.Files;
20+
import java.nio.file.Path;
21+
import java.nio.file.StandardOpenOption;
1422
import java.time.Duration;
1523
import java.util.ArrayList;
1624
import java.util.Arrays;
@@ -592,7 +600,6 @@ case ChannelError(Throwable cause):
592600
if (!buffer.isEmpty()) {
593601
sendBufferAndCleanupCost.call();
594602
// cancel existing timeout and start a new one
595-
if (timeoutFork != null) timeoutFork.cancelNow();
596603
timeoutFork = forkTimeout(scope, timerChannel, duration);
597604
}
598605
yield true;
@@ -1290,6 +1297,127 @@ public Flow<T> alsoToTap(Sink<T> other) {
12901297
});
12911298
}
12921299

1300+
/**
1301+
* Runs the flow into a {@link java.io.InputStream}.
1302+
* <p>
1303+
* Must be run within a concurrency scope, as under the hood the flow is run in the background.
1304+
* <p>
1305+
* Buffer capacity can be set via scoped value {@link Channel#BUFFER_SIZE}. If not specified in scope, {@link Channel#DEFAULT_BUFFER_SIZE} is used.
1306+
*/
1307+
public InputStream runToInputStream(UnsupervisedScope scope) {
1308+
Source<byte[]> ch = this
1309+
.map(t -> {
1310+
if (t instanceof byte[] bytes) {
1311+
return bytes;
1312+
} else {
1313+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
1314+
}
1315+
})
1316+
.runToChannel(scope);
1317+
1318+
return new InputStream() {
1319+
private ByteArrayIterator currentChunk = ByteArrayIterator.empty();
1320+
1321+
@Override
1322+
public int read() {
1323+
try {
1324+
if (!currentChunk.hasNext()) {
1325+
Object result = ch.receiveOrClosed();
1326+
if (result instanceof ChannelDone) {
1327+
return -1;
1328+
} else if (result instanceof ChannelError error) {
1329+
throw error.toException();
1330+
} else {
1331+
byte[] chunk = (byte[]) result;
1332+
currentChunk = new ByteArrayIterator(chunk);
1333+
}
1334+
}
1335+
return currentChunk.next() & 0xff; // Convert to unsigned
1336+
} catch (InterruptedException e) {
1337+
throw new RuntimeException(e);
1338+
}
1339+
}
1340+
1341+
@Override
1342+
public int available() {
1343+
return currentChunk.available();
1344+
}
1345+
};
1346+
}
1347+
1348+
/**
1349+
* Writes content of this flow to an {@link java.io.OutputStream}.
1350+
*
1351+
* @param outputStream
1352+
* Target `OutputStream` to write to. Will be closed after finishing the process or on error.
1353+
*/
1354+
public void runToOutputStream(OutputStream outputStream) throws Exception {
1355+
try {
1356+
last.run(t -> {
1357+
if (t instanceof byte[] chunk) {
1358+
outputStream.write(chunk);
1359+
} else {
1360+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
1361+
}
1362+
});
1363+
close(outputStream, null);
1364+
} catch (Exception e) {
1365+
close(outputStream, e);
1366+
throw e;
1367+
}
1368+
}
1369+
1370+
/** Writes content of this flow to a file.
1371+
*
1372+
* @param path
1373+
* Path to the target file. If not exists, it will be created.s
1374+
*/
1375+
public void runToFile(Path path) throws Exception {
1376+
if (Files.isDirectory(path)) {
1377+
throw new IOException("Path %s is a directory".formatted(path));
1378+
}
1379+
final SeekableByteChannel channel = getFileChannel(path);
1380+
try {
1381+
map(t -> {
1382+
if (t instanceof byte[] chunk) {
1383+
return chunk;
1384+
} else {
1385+
throw new IllegalArgumentException("requirement failed: method can be called only on flow containing byte[]");
1386+
}
1387+
}).runForeach(chunk -> {
1388+
try {
1389+
channel.write(ByteBuffer.wrap(chunk));
1390+
} catch (IOException e) {
1391+
throw new RuntimeException(e);
1392+
}
1393+
});
1394+
close(channel, null);
1395+
} catch (Exception t) {
1396+
close(channel, t);
1397+
throw t;
1398+
}
1399+
}
1400+
1401+
private SeekableByteChannel getFileChannel(Path path) throws IOException {
1402+
try {
1403+
return FileChannel.open(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
1404+
} catch (UnsupportedOperationException e) {
1405+
// Some file systems don't support file channels
1406+
return Files.newByteChannel(path, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
1407+
}
1408+
}
1409+
1410+
private void close(AutoCloseable closeable, Exception cause) throws Exception {
1411+
try {
1412+
closeable.close();
1413+
} catch (IOException e) {
1414+
if (cause != null) {
1415+
cause.addSuppressed(e);
1416+
}
1417+
throw cause != null ? cause : e;
1418+
}
1419+
}
1420+
12931421
/** Converts this {@link Flow} into a {@link Publisher}. The flow is run every time the publisher is subscribed to.
12941422
* <p>
12951423
* Must be run within a concurrency scope, as upon subscribing, a fork is created to run the publishing process. Hence, the scope should

flows/src/test/java/com/softwaremill/jox/flows/FlowGroupedTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ void groupedWithin_shouldGroupFirstBatchOfElementsDueToTimeoutAndSecondBatchDueT
175175
// when
176176
var elementsWithEmittedTimeOffset = Flows.fromSource(c)
177177
.groupedWithin(3, Duration.ofMillis(100))
178-
.map(s -> new AbstractMap.SimpleEntry<>(s, Duration.ofNanos(System.nanoTime() - start)))
178+
.map(s -> Map.entry(s, Duration.ofNanos(System.nanoTime() - start)))
179179
.runToList();
180180

181181
// then

0 commit comments

Comments
 (0)