Skip to content

Commit 9bf0b2a

Browse files
committed
Return ByteFlow directly from factory method
1 parent ca4fbeb commit 9bf0b2a

File tree

1 file changed

+11
-10
lines changed
  • flows/src/main/java/com/softwaremill/jox/flows

1 file changed

+11
-10
lines changed

flows/src/main/java/com/softwaremill/jox/flows/Flows.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.softwaremill.jox.ChannelDone;
3131
import com.softwaremill.jox.ChannelError;
3232
import com.softwaremill.jox.Source;
33+
import com.softwaremill.jox.flows.Flow.ByteFlow;
3334
import com.softwaremill.jox.structured.Fork;
3435
import com.softwaremill.jox.structured.Scopes;
3536
import com.softwaremill.jox.structured.ThrowingConsumer;
@@ -86,15 +87,15 @@ public static <T> Flow<T> fromValues(T... ts) {
8687
* Creates a ByteFlow from given {@link ByteChunk}s. Each ByteChunk is emitted in order.
8788
* Flow can be run multiple times.
8889
*/
89-
public static Flow.ByteFlow fromByteChunks(ByteChunk... chunks) {
90+
public static ByteFlow fromByteChunks(ByteChunk... chunks) {
9091
return fromValues(chunks).toByteFlow();
9192
}
9293

9394
/**
9495
* Creates a ByteFlow from given byte[]. Each byte[] is emitted in order.
9596
* Flow can be run multiple times.
9697
*/
97-
public static Flow.ByteFlow fromByteArrays(byte[]... byteArrays) {
98+
public static ByteFlow fromByteArrays(byte[]... byteArrays) {
9899
return fromValues(byteArrays).toByteFlow();
99100
}
100101

@@ -492,15 +493,15 @@ public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, bo
492493
}
493494

494495
/**
495-
* Converts a {@link java.io.InputStream} into a `Flow<ByteChunk>`.
496+
* Converts a {@link java.io.InputStream} into {@link ByteFlow}.
496497
*
497498
* @param is
498499
* an `InputStream` to read bytes from.
499500
* @param chunkSize
500501
* maximum number of bytes to read from the underlying `InputStream` before emitting a new chunk.
501502
*/
502-
public static Flow<ByteChunk> fromInputStream(InputStream is, int chunkSize) {
503-
return usingEmit(emit -> {
503+
public static ByteFlow fromInputStream(InputStream is, int chunkSize) {
504+
return Flows.<ByteChunk>usingEmit(emit -> {
504505
try (is) {
505506
while (true) {
506507
byte[] buf = new byte[chunkSize];
@@ -514,19 +515,19 @@ public static Flow<ByteChunk> fromInputStream(InputStream is, int chunkSize) {
514515
}
515516
}
516517
}
517-
});
518+
}).toByteFlow();
518519
}
519520

520521
/**
521-
* Creates a flow that emits {@link ByteChunk} read from a file.
522+
* Creates a {@link ByteFlow} read from a file.
522523
*
523524
* @param path
524525
* path the file to read from.
525526
* @param chunkSize
526527
* maximum number of bytes to read from the file before emitting a new chunk.
527528
*/
528-
public static Flow<ByteChunk> fromFile(Path path, int chunkSize) {
529-
return usingEmit(emit -> {
529+
public static ByteFlow fromFile(Path path, int chunkSize) {
530+
return Flows.<ByteChunk>usingEmit(emit -> {
530531
if (Files.isDirectory(path)) {
531532
throw new IOException("Path %s is a directory".formatted(path));
532533
}
@@ -553,6 +554,6 @@ public static Flow<ByteChunk> fromFile(Path path, int chunkSize) {
553554
} finally {
554555
fileChannel.close();
555556
}
556-
});
557+
}).toByteFlow();
557558
}
558559
}

0 commit comments

Comments
 (0)