Skip to content

Commit 1432e1f

Browse files
committed
Return ByteFlow directly from factory method
1 parent ca4fbeb commit 1432e1f

File tree

1 file changed

+15
-22
lines changed
  • flows/src/main/java/com/softwaremill/jox/flows

1 file changed

+15
-22
lines changed

flows/src/main/java/com/softwaremill/jox/flows/Flows.java

Lines changed: 15 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package com.softwaremill.jox.flows;
22

3-
import static com.softwaremill.jox.structured.Scopes.unsupervised;
3+
import com.softwaremill.jox.*;
4+
import com.softwaremill.jox.flows.Flow.ByteFlow;
5+
import com.softwaremill.jox.structured.Fork;
6+
import com.softwaremill.jox.structured.Scopes;
7+
import com.softwaremill.jox.structured.ThrowingConsumer;
48

59
import java.io.IOException;
610
import java.io.InputStream;
@@ -11,11 +15,7 @@
1115
import java.nio.file.Path;
1216
import java.nio.file.StandardOpenOption;
1317
import java.time.Duration;
14-
import java.util.ArrayList;
15-
import java.util.Iterator;
16-
import java.util.List;
17-
import java.util.Map;
18-
import java.util.Optional;
18+
import java.util.*;
1919
import java.util.concurrent.CompletableFuture;
2020
import java.util.concurrent.Flow.Publisher;
2121
import java.util.concurrent.Flow.Subscriber;
@@ -25,14 +25,7 @@
2525
import java.util.function.Function;
2626
import java.util.function.Supplier;
2727

28-
import com.softwaremill.jox.Channel;
29-
import com.softwaremill.jox.ChannelClosed;
30-
import com.softwaremill.jox.ChannelDone;
31-
import com.softwaremill.jox.ChannelError;
32-
import com.softwaremill.jox.Source;
33-
import com.softwaremill.jox.structured.Fork;
34-
import com.softwaremill.jox.structured.Scopes;
35-
import com.softwaremill.jox.structured.ThrowingConsumer;
28+
import static com.softwaremill.jox.structured.Scopes.unsupervised;
3629

3730
public final class Flows {
3831

@@ -86,15 +79,15 @@ public static <T> Flow<T> fromValues(T... ts) {
8679
* Creates a ByteFlow from given {@link ByteChunk}s. Each ByteChunk is emitted in order.
8780
* Flow can be run multiple times.
8881
*/
89-
public static Flow.ByteFlow fromByteChunks(ByteChunk... chunks) {
82+
public static ByteFlow fromByteChunks(ByteChunk... chunks) {
9083
return fromValues(chunks).toByteFlow();
9184
}
9285

9386
/**
9487
* Creates a ByteFlow from given byte[]. Each byte[] is emitted in order.
9588
* Flow can be run multiple times.
9689
*/
97-
public static Flow.ByteFlow fromByteArrays(byte[]... byteArrays) {
90+
public static ByteFlow fromByteArrays(byte[]... byteArrays) {
9891
return fromValues(byteArrays).toByteFlow();
9992
}
10093

@@ -492,14 +485,14 @@ public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, bo
492485
}
493486

494487
/**
495-
* Converts a {@link java.io.InputStream} into a `Flow<ByteChunk>`.
488+
* Converts a {@link java.io.InputStream} into {@link ByteFlow}.
496489
*
497490
* @param is
498491
* an `InputStream` to read bytes from.
499492
* @param chunkSize
500493
* maximum number of bytes to read from the underlying `InputStream` before emitting a new chunk.
501494
*/
502-
public static Flow<ByteChunk> fromInputStream(InputStream is, int chunkSize) {
495+
public static ByteFlow fromInputStream(InputStream is, int chunkSize) {
503496
return usingEmit(emit -> {
504497
try (is) {
505498
while (true) {
@@ -514,18 +507,18 @@ public static Flow<ByteChunk> fromInputStream(InputStream is, int chunkSize) {
514507
}
515508
}
516509
}
517-
});
510+
}).toByteFlow();
518511
}
519512

520513
/**
521-
* Creates a flow that emits {@link ByteChunk} read from a file.
514+
* Creates a {@link ByteFlow} read from a file.
522515
*
523516
* @param path
524517
* path the file to read from.
525518
* @param chunkSize
526519
* maximum number of bytes to read from the file before emitting a new chunk.
527520
*/
528-
public static Flow<ByteChunk> fromFile(Path path, int chunkSize) {
521+
public static ByteFlow fromFile(Path path, int chunkSize) {
529522
return usingEmit(emit -> {
530523
if (Files.isDirectory(path)) {
531524
throw new IOException("Path %s is a directory".formatted(path));
@@ -553,6 +546,6 @@ public static Flow<ByteChunk> fromFile(Path path, int chunkSize) {
553546
} finally {
554547
fileChannel.close();
555548
}
556-
});
549+
}).toByteFlow();
557550
}
558551
}

0 commit comments

Comments
 (0)