3030import com .softwaremill .jox .ChannelDone ;
3131import com .softwaremill .jox .ChannelError ;
3232import com .softwaremill .jox .Source ;
33+ import com .softwaremill .jox .flows .Flow .ByteFlow ;
3334import com .softwaremill .jox .structured .Fork ;
3435import com .softwaremill .jox .structured .Scopes ;
3536import com .softwaremill .jox .structured .ThrowingConsumer ;
@@ -86,15 +87,15 @@ public static <T> Flow<T> fromValues(T... ts) {
8687 * Creates a ByteFlow from given {@link ByteChunk}s. Each ByteChunk is emitted in order.
8788 * Flow can be run multiple times.
8889 */
89- public static Flow . ByteFlow fromByteChunks (ByteChunk ... chunks ) {
90+ public static ByteFlow fromByteChunks (ByteChunk ... chunks ) {
9091 return fromValues (chunks ).toByteFlow ();
9192 }
9293
9394 /**
9495 * Creates a ByteFlow from given byte[]. Each byte[] is emitted in order.
9596 * Flow can be run multiple times.
9697 */
97- public static Flow . ByteFlow fromByteArrays (byte []... byteArrays ) {
98+ public static ByteFlow fromByteArrays (byte []... byteArrays ) {
9899 return fromValues (byteArrays ).toByteFlow ();
99100 }
100101
@@ -492,15 +493,15 @@ public static <T> Flow<T> interleaveAll(List<Flow<T>> flows, int segmentSize, bo
492493 }
493494
494495 /**
495- * Converts a {@link java.io.InputStream} into a `Flow<ByteChunk>` .
496+ * Converts a {@link java.io.InputStream} into {@link ByteFlow} .
496497 *
497498 * @param is
498499 * an `InputStream` to read bytes from.
499500 * @param chunkSize
500501 * maximum number of bytes to read from the underlying `InputStream` before emitting a new chunk.
501502 */
502- public static Flow < ByteChunk > fromInputStream (InputStream is , int chunkSize ) {
503- return usingEmit (emit -> {
503+ public static ByteFlow fromInputStream (InputStream is , int chunkSize ) {
504+ return Flows .< ByteChunk > usingEmit (emit -> {
504505 try (is ) {
505506 while (true ) {
506507 byte [] buf = new byte [chunkSize ];
@@ -514,19 +515,19 @@ public static Flow<ByteChunk> fromInputStream(InputStream is, int chunkSize) {
514515 }
515516 }
516517 }
517- });
518+ }). toByteFlow () ;
518519 }
519520
520521 /**
521- * Creates a flow that emits {@link ByteChunk } read from a file.
522+ * Creates a {@link ByteFlow } read from a file.
522523 *
523524 * @param path
524525 * path the file to read from.
525526 * @param chunkSize
526527 * maximum number of bytes to read from the file before emitting a new chunk.
527528 */
528- public static Flow < ByteChunk > fromFile (Path path , int chunkSize ) {
529- return usingEmit (emit -> {
529+ public static ByteFlow fromFile (Path path , int chunkSize ) {
530+ return Flows .< ByteChunk > usingEmit (emit -> {
530531 if (Files .isDirectory (path )) {
531532 throw new IOException ("Path %s is a directory" .formatted (path ));
532533 }
@@ -553,6 +554,6 @@ public static Flow<ByteChunk> fromFile(Path path, int chunkSize) {
553554 } finally {
554555 fileChannel .close ();
555556 }
556- });
557+ }). toByteFlow () ;
557558 }
558559}
0 commit comments