Skip to content

Commit 6a3eea9

Browse files
authored
Update README.md (#86)
1 parent ef1702e commit 6a3eea9

File tree

1 file changed

+283
-1
lines changed

1 file changed

+283
-1
lines changed

README.md

Lines changed: 283 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,26 @@ class Demo2 {
119119
}
120120
```
121121

122+
There is also a possibility to pass `Channel`'s buffer size via `ScopedValue`.
123+
The channel must be created via `Channel.withScopedBufferSize()` to get the value.
124+
125+
If no value is in the scope, the default buffer size `Channel.DEFAULT_BUFFER_SIZE` is used
126+
127+
```java
128+
import com.softwaremill.jox.Channel;
129+
130+
public class Demo {
131+
132+
public static void main(String[] args) {
133+
ScopedValue.where(Channel.BUFFER_SIZE, 10)
134+
.run(() -> {
135+
Channel.withScopedBufferSize(); // creates channel with buffer size = 10
136+
});
137+
Channel.withScopedBufferSize(); // no value in the scope, so default (16) buffer size is used
138+
}
139+
}
140+
```
141+
122142
Unlimited channels can be created with `Channel.newUnlimitedChannel()`. Such channels will never block on send().
123143

124144
#### Closing a channel
@@ -496,7 +516,7 @@ public class Demo {
496516
// result = 5
497517
```
498518

499-
## Streaming
519+
## (Hot) Streaming
500520

501521
### Dependency
502522

@@ -579,6 +599,268 @@ public class Demo {
579599
// result = [10, 30, 50, 70, 90]
580600
```
581601

602+
## (Lazy) Streaming - Flows
603+
604+
### Dependency
605+
606+
Maven:
607+
608+
```xml
609+
610+
<dependency>
611+
<groupId>com.softwaremill.jox</groupId>
612+
<artifactId>flows</artifactId>
613+
<version>tbd</version>
614+
</dependency>
615+
```
616+
617+
Gradle:
618+
619+
```groovy
620+
implementation 'com.softwaremill.jox:flows:tbd'
621+
```
622+
623+
### Usage
624+
625+
A `Flow<T>` describes an asynchronous data transformation pipeline. When run, it emits elements of type `T`.
626+
627+
Flows are lazy, evaluation (and any effects) happen only when the flow is run. Flows might be finite or infinite; in the latter case running a flow never ends normally; it might be interrupted, though. Finally, any exceptions that occur when evaluating the flow's logic will be thrown when running the flow, after any cleanup logic completes.
628+
629+
### Creating Flows
630+
631+
There are number of methods in the `Flows` class which allows to create a `Flow`.
632+
633+
```java
634+
import java.time.Duration;
635+
import com.softwaremill.jox.flows.Flows;
636+
637+
public class Demo {
638+
639+
public static void main(String[] args) {
640+
Flows.fromValues(1, 2, 3); // a finite flow
641+
Flows.tick(Duration.ofSeconds(1), "x"); // an infinite flow emitting "x" every second
642+
Flows.iterate(0, i -> i + 1); // an infinite flow iterating from 0
643+
}
644+
}
645+
```
646+
Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the elements are emitted and any effects that are part of the flow's stages happen.
647+
648+
649+
Flows can also be created using `Channel` `Source`s:
650+
651+
```java
652+
import java.util.concurrent.ExecutionException;
653+
654+
import com.softwaremill.jox.Channel;
655+
import com.softwaremill.jox.flows.Flows;
656+
import com.softwaremill.jox.structured.Scopes;
657+
658+
public class Demo {
659+
660+
public static void main(String[] args) throws ExecutionException, InterruptedException {
661+
Channel<Integer> ch = new Channel<>(16);
662+
Scopes.supervised(scope -> {
663+
scope.fork(() -> {
664+
ch.send(1);
665+
ch.send(15);
666+
ch.send(-2);
667+
ch.done();
668+
return null;
669+
});
670+
671+
Flows.fromSource(ch); // TODO: transform the flow further & run
672+
return null;
673+
});
674+
}
675+
}
676+
```
677+
678+
Finally, flows can be created by providing arbitrary element-emitting logic:
679+
680+
```java
681+
import com.softwaremill.jox.flows.Flows;
682+
683+
public class Demo {
684+
685+
public static void main(String[] args) {
686+
Flows.usingEmit(emit -> {
687+
emit.apply(21);
688+
for (int i = 0; i < 5; i++) {
689+
emit.apply(i);
690+
}
691+
emit.apply(37);
692+
});
693+
}
694+
}
695+
```
696+
697+
The `FlowEmit` instance is used to emit elements by the flow, that is process them further, as defined by the downstream pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's a processing error.
698+
699+
As part of the callback, you can create `Scope`, fork background computations or run other flows asynchronously. However, take care **not** to share the `FlowEmit` instance across threads. That is, instances of `FlowEmit` are thread-unsafe and should only be used on the calling thread.
700+
The lifetime of `FlowEmit` should not extend over the duration of the invocation of `usingEmit`.
701+
702+
Any asynchronous communication should be best done with `Channel`s. You can then manually forward any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.
703+
704+
### Transforming flows: basics
705+
706+
Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages:
707+
708+
```java
709+
import java.util.Map;
710+
import com.softwaremill.jox.flows.Flows;
711+
712+
public class Demo {
713+
714+
public static void main(String[] args) {
715+
Flows.fromValues(1, 2, 3, 5, 6)
716+
.map(i -> i * 2)
717+
.filter(i -> i % 2 == 0)
718+
.take(3)
719+
.zip(Flows.repeat("a number"))
720+
.interleave(Flows.repeat(Map.entry(0, "also a number")), 1, false);
721+
}
722+
}
723+
```
724+
725+
You can also define arbitrary element-emitting logic, using each incoming element using `.mapUsingEmit`, similarly to `Flows.usingEmit` above.
726+
727+
### Running flows
728+
729+
Flows have to be run, for any processing to happen. This can be done with one of the `.run...` methods. For example:
730+
731+
```java
732+
import java.time.Duration;
733+
734+
import com.softwaremill.jox.flows.Flows;
735+
736+
public class Demo {
737+
738+
public static void main(String[] args) throws Exception {
739+
Flows.fromValues(1, 2, 3).runToList(); // List(1, 2, 3)
740+
Flows.fromValues(1, 2, 3).runForeach(System.out::println);
741+
Flows.tick(Duration.ofSeconds(1), "x").runDrain(); // never finishes
742+
}
743+
}
744+
```
745+
746+
Running a flow is a blocking operation. Unless asynchronous boundaries are present (explicit or implicit, more on this below), the entire processing happens on the calling thread. For example such a pipeline:
747+
748+
```java
749+
import com.softwaremill.jox.flows.Flows;
750+
751+
public class Demo {
752+
753+
public static void main(String[] args) throws Exception {
754+
Flows.fromValues(1, 2, 3, 5, 6)
755+
.map(i -> i * 2)
756+
.filter(i -> i % 2 == 0)
757+
.runToList();
758+
}
759+
}
760+
```
761+
Processes the elements one-by-one on the thread that is invoking the run method.
762+
763+
764+
### Transforming flows: concurrency
765+
766+
A number of flow transformations introduces asynchronous boundaries. For example, `.mapPar(int parallelism, Function<T,U> mappingFunction)` describes a flow,
767+
which runs the pipeline defined so far in the background, emitting elements to a `channel`. Another `fork` reads these elements and runs up to `parallelism` invocations of `mappingFunction` concurrently. Mapped elements are then emitted by the returned flow.
768+
769+
Behind the scenes, a new concurrency `Scope` is created along with a number of forks. In case of any exceptions, everything is cleaned up before the flow propagates the exceptions. The `.mapPar` logic ensures that any exceptions from the preceding pipeline are propagated through the channel.
770+
771+
Some other stages which introduce concurrency include `.merge`, `.interleave`, `.groupedWithin` and `I/O` stages. The created channels serve as buffers between the pipeline stages, and their capacity is defined by the `ScopedValue` `Channel.BUFFER_SIZE` in the scope, or default `Channel.DEFAULT_BUFFER_SIZE` is used.
772+
773+
Explicit asynchronous boundaries can be inserted using `.buffer()`. This might be useful if producing the next element to emit, and consuming the previous should run concurrently; or if the processing times of the consumer varies, and the producer should buffer up elements.
774+
775+
### Interoperability with channels
776+
777+
Flows can be created from channels, and run to channels. For example:
778+
779+
```java
780+
import java.util.Arrays;
781+
782+
import com.softwaremill.jox.Channel;
783+
import com.softwaremill.jox.Source;
784+
import com.softwaremill.jox.flows.Flows;
785+
import com.softwaremill.jox.structured.Scopes;
786+
787+
public class Demo {
788+
789+
public static void main(String[] args) throws Exception {
790+
Source<String> ch = getSource(args); // provide a source
791+
Scopes.supervised(scope -> {
792+
Source<String> output = ScopedValue.getWhere(Channel.BUFFER_SIZE, 5, () -> Flows.fromSource(ch)
793+
.mapConcat(v -> Arrays.asList(v.split(" ")))
794+
.filter(v -> v.startsWith("example"))
795+
.runToChannel(scope));
796+
});
797+
}
798+
}
799+
```
800+
801+
The method above needs to be run within a concurrency scope, as `.runToChannel()` creates a background fork which runs the pipeline described by the flow, and emits its elements onto the returned channel.
802+
803+
### Text transformations and I/O operations
804+
805+
For smooth operations on `byte[]`, we've created a wrapper class `ByteChunk`. And for smooth type handling we created a dedicated `ByteFlow`, a subtype of `Flow<ByteChunk>`.
806+
To be able to utilize text and I/O operations, you need to create or transform into `ByteFlow`. It can be created via `Flows.fromByteArray` or `Flows.fromByteChunk`.
807+
`Flow` containing `byte[]` or `ByteChunk` can be transformed by using `toByteFlow()` method. Any other flow can be transformed by using `toByteFlow()` with mapping function.
808+
809+
#### Text operations
810+
* `encodeUtf8` encodes a `Flow<String>` into a `ByteFlow`
811+
* `linesUtf8` decodes a `ByteFlow` into a `Flow<String>`. Assumes that the input represents text with line breaks. The `String` elements emitted by resulting `Flow<String>` represent text lines.
812+
* `decodeStringUtf8` to decode a `ByteFlow` into a `Flow<String>`, without handling line breaks, just processing input bytes as UTF-8 characters, even if a multi-byte character is divided into two chunks.
813+
814+
#### I/O Operations
815+
* `runToInputStream(UnsupervisedScope scope)` runs given flow asynchronously into returned `InputStream`
816+
* `runToOutputStream(OutputStream outputStream)` runs given flow into provided `OutputStream`
817+
* `runToFile(Path path)` runs given flow into file. If file does not exist, it's created.
818+
819+
It is also possible to create Flow from `inputStream` or `path` using `Flows` factory methods.
820+
821+
### Logging
822+
823+
Jox does not have any integrations with logging libraries, but it provides a simple way to log elements emitted by flows using the `.tap` method:
824+
825+
```java
826+
import com.softwaremill.jox.flows.Flows;
827+
828+
public class Demo {
829+
830+
public static void main(String[] args) throws Exception {
831+
Flows.fromValues(1, 2, 3)
832+
.tap(n -> System.out.printf("Received: %d%n", n))
833+
.runToList();
834+
}
835+
}
836+
```
837+
838+
### Reactive streams interoperability
839+
840+
#### Flow -> Publisher
841+
842+
A `Flow` can be converted to a `java.util.concurrent.Flow.Publisher` using the `.toPublisher` method.
843+
844+
This needs to be run within an concurrency `Scope`, as upon subscribing, a fork is created to run the publishing
845+
process. Hence, the scope should remain active as long as the publisher is used.
846+
847+
Internally, elements emitted by the flow are buffered, using a buffer of capacity given by the `Channel.BUFFER_SIZE` in
848+
scope.
849+
850+
To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the `reactive-streams` dependency and
851+
use `org.reactivestreams.FlowAdapters`.
852+
853+
854+
#### Publisher -> Flow
855+
856+
A `java.util.concurrent.Flow.Publisher` can be converted to a `Flow` using `Flow.fromPublisher`.
857+
858+
Internally, elements published to the subscription are buffered, using a buffer of capacity given by the
859+
`Channel.BUFFER_SIZE` in scope. That's also how many elements will be at most requested from the publisher at a time.
860+
861+
To convert a `org.reactivestreams.Publisher` instance, you'll need the same dependency as above and use `org.reactivestreams.FlowAdapters`.
862+
863+
582864
## Feedback
583865

584866
Is what we are looking for!

0 commit comments

Comments
 (0)