
Commit 9cb2ebc

Release 1.0.2

1 parent c488fc8

9 files changed: +122 -33 lines

README.md

Lines changed: 2 additions & 2 deletions
````diff
@@ -21,13 +21,13 @@ preserving developer-friendly stack traces, and without compromising performance
 To use Ox, add the following dependency, using either [sbt](https://www.scala-sbt.org):
 
 ```scala
-"com.softwaremill.ox" %% "core" % "1.0.1"
+"com.softwaremill.ox" %% "core" % "1.0.2"
 ```
 
 Or [scala-cli](https://scala-cli.virtuslab.org):
 
 ```scala
-//> using dep "com.softwaremill.ox::core:1.0.1"
+//> using dep "com.softwaremill.ox::core:1.0.2"
 ```
 
 Documentation is available at [https://ox.softwaremill.com](https://ox.softwaremill.com), ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox).
````

generated-doc/out/index.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -2,7 +2,7 @@
 
 Safe direct-style streaming, concurrency and resiliency for Scala on the JVM. Requires JDK 21+ & Scala 3.
 
-To start using Ox, add the `com.softwaremill.ox::core:1.0.1` [dependency](info/dependency.md) to your project.
+To start using Ox, add the `com.softwaremill.ox::core:1.0.2` [dependency](info/dependency.md) to your project.
 Then, take a look at the tour of Ox, or follow one of the topics listed in the menu to get to know Ox's API!
 
 In addition to this documentation, ScalaDocs can be browsed at [https://javadoc.io](https://www.javadoc.io/doc/com.softwaremill.ox).
````

generated-doc/out/info/dependency.md

Lines changed: 2 additions & 2 deletions
````diff
@@ -4,10 +4,10 @@ To use ox core in your project, add:
 
 ```scala
 // sbt dependency
-"com.softwaremill.ox" %% "core" % "1.0.1"
+"com.softwaremill.ox" %% "core" % "1.0.2"
 
 // scala-cli dependency
-//> using dep com.softwaremill.ox::core:1.0.1
+//> using dep com.softwaremill.ox::core:1.0.2
 ```
 
 Ox core depends only on the Java [jox](https://github.com/softwaremill/jox) project, where channels are implemented. There are no other direct or transitive dependencies.
````
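For context, a minimal sketch of the channel API that this note refers to (Ox's user-facing wrappers over jox; `supervised` and `fork` are the standard Ox scoping primitives used in the examples elsewhere in these docs):

```scala
import ox.*
import ox.channels.*

supervised:
  val c = Channel.rendezvous[Int]
  // the producer fork completes the channel when finished
  fork:
    c.send(1)
    c.done()
  c.receive() // 1
```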

generated-doc/out/integrations/cron4s.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -3,7 +3,7 @@
 Dependency:
 
 ```scala
-"com.softwaremill.ox" %% "cron" % "1.0.1"
+"com.softwaremill.ox" %% "cron" % "1.0.2"
 ```
 
 This module allows running schedules based on cron expressions from [cron4s](https://github.com/alonsodomin/cron4s).
````
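As a point of reference for the expressions this module consumes, here is a sketch of cron4s' own parsing API (this is the upstream cron4s library, not this Ox module's interface):

```scala
import cron4s.*
import cron4s.lib.javatime.*
import java.time.LocalDateTime

// cron4s uses 6 fields: seconds, minutes, hours, day-of-month, month, day-of-week
val cron = Cron.unsafeParse("10-35 2,4,6 * ? * *")
// with the javatime support in scope, compute the next trigger time
val next: Option[LocalDateTime] = cron.next(LocalDateTime.now())
```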

generated-doc/out/integrations/kafka.md

Lines changed: 65 additions & 23 deletions
````diff
@@ -3,13 +3,20 @@
 Dependency:
 
 ```scala
-"com.softwaremill.ox" %% "kafka" % "1.0.1"
+"com.softwaremill.ox" %% "kafka" % "1.0.2"
 ```
 
 `Flow`s which read from a Kafka topic, mapping stages and drains which publish to Kafka topics are available through
-the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects. In all cases either a manually constructed instance of a
-`KafkaProducer` / `KafkaConsumer` is needed, or `ProducerSettings` / `ConsumerSetttings` need to be provided with the
-bootstrap servers, consumer group id, key / value serializers, etc.
+the `KafkaFlow`, `KafkaStage` and `KafkaDrain` objects.
+
+In all cases, Kafka producers and consumers can be provided:
+* by manually creating (and closing) an instance of a `KafkaProducer` / `KafkaConsumer`
+* through a `ProducerSettings` / `ConsumerSettings`, with the bootstrap servers, consumer group id, key/value
+  serializers, etc. The lifetime is then managed by the flow operators.
+* through a thread-safe wrapper on a consumer (`ActorRef[KafkaConsumerWrapper[K, V]]`), for which the lifetime is bound
+  to the current concurrency scope
+
+## Reading from Kafka
 
 To read from a Kafka topic, use:
 
@@ -25,6 +32,8 @@ val source = KafkaFlow.subscribe(settings, topic)
   .runForeach { (msg: ReceivedMessage[String, String]) => ??? }
 ```
 
+## Publishing to Kafka
+
 To publish data to a Kafka topic:
 
 ```scala
@@ -40,6 +49,25 @@ Flow
   .pipe(KafkaDrain.runPublish(settings))
 ```
 
+To publish data as a mapping stage:
+
+```scala
+import ox.flow.Flow
+import ox.kafka.ProducerSettings
+import ox.kafka.KafkaStage.*
+import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+
+val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
+val metadatas: Flow[RecordMetadata] = Flow
+  .fromIterable(List("a", "b", "c"))
+  .map(msg => ProducerRecord[String, String]("my_topic", msg))
+  .mapPublish(settings)
+
+// process & run the metadatas flow further
+```
+
+## Reading & publishing to Kafka with offset commits
+
 Quite often, data to be published to a topic (`topic1`) is computed based on data received from another topic
 (`topic2`). In such a case, it's possible to commit messages from `topic2` after the messages to `topic1` are
 successfully published.
@@ -63,7 +91,7 @@ computed. For example:
 ```scala
 import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, ProducerSettings, SendPacket}
 import ox.kafka.ConsumerSettings.AutoOffsetReset
-import ox.pipe
+import ox.*
 import org.apache.kafka.clients.producer.ProducerRecord
 
 val consumerSettings = ConsumerSettings.default("my_group")
@@ -72,29 +100,43 @@ val producerSettings = ProducerSettings.default.bootstrapServers("localhost:9092
 val sourceTopic = "source_topic"
 val destTopic = "dest_topic"
 
-KafkaFlow
-  .subscribe(consumerSettings, sourceTopic)
-  .map(in => (in.value.toLong * 2, in))
-  .map((value, original) =>
-    SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
-  .pipe(KafkaDrain.runPublishAndCommit(producerSettings))
+supervised:
+  // the consumer is shared between the subscribe & offset stages
+  // its lifetime is bound to the current concurrency scope
+  val consumer = consumerSettings.toThreadSafeConsumerWrapper
+  KafkaFlow
+    .subscribe(consumer, sourceTopic)
+    .map(in => (in.value.toLong * 2, in))
+    .map((value, original) =>
+      SendPacket(ProducerRecord[String, String](destTopic, value.toString), original))
+    .pipe(KafkaDrain.runPublishAndCommit(producerSettings, consumer))
 ```
 
 The offsets are committed every second in a background process.
 
-To publish data as a mapping stage:
+## Reading from Kafka, processing data & committing offsets
+
+Offsets can also be committed after the data has been processed, without producing any records to write to a topic.
+For that, we can use the `runCommit` drain, or the `mapCommit` stage, both of which work with a `Flow[CommitPacket]`:
 
 ```scala
-import ox.flow.Flow
-import ox.kafka.ProducerSettings
-import ox.kafka.KafkaStage.*
-import org.apache.kafka.clients.producer.{ProducerRecord, RecordMetadata}
+import ox.kafka.{ConsumerSettings, KafkaDrain, KafkaFlow, CommitPacket}
+import ox.kafka.ConsumerSettings.AutoOffsetReset
+import ox.*
 
-val settings = ProducerSettings.default.bootstrapServers("localhost:9092")
-val metadatas: Flow[RecordMetadata] = Flow
-  .fromIterable(List("a", "b", "c"))
-  .map(msg => ProducerRecord[String, String]("my_topic", msg))
-  .mapPublish(settings)
+val consumerSettings = ConsumerSettings.default("my_group")
+  .bootstrapServers("localhost:9092").autoOffsetReset(AutoOffsetReset.Earliest)
+val sourceTopic = "source_topic"
 
-// process & run the metadatas flow further
-```
+supervised:
+  // the consumer is shared between the subscribe & offset stages
+  // its lifetime is bound to the current concurrency scope
+  val consumer = consumerSettings.toThreadSafeConsumerWrapper
+  KafkaFlow
+    .subscribe(consumer, sourceTopic)
+    .mapPar(10) { in =>
+      // process the message, e.g. send an HTTP request
+      CommitPacket(in)
+    }
+    .pipe(KafkaDrain.runCommit(consumer))
+```
````
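The first provisioning option in the list above, a manually created (and closed) `KafkaProducer`, uses the standard kafka-clients API. A sketch of constructing such an instance (plain Kafka code, independent of the Ox operators it would then be passed to):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

// when creating the producer manually, the caller is responsible for closing it
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val producer = new KafkaProducer[String, String](props, new StringSerializer(), new StringSerializer())
try producer.send(new ProducerRecord("my_topic", "key", "value")).get() // block until acked
finally producer.close()
```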

generated-doc/out/integrations/mdc-logback.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -3,7 +3,7 @@
 Dependency:
 
 ```scala
-"com.softwaremill.ox" %% "mdc-logback" % "1.0.1"
+"com.softwaremill.ox" %% "mdc-logback" % "1.0.2"
 ```
 
 Ox provides support for setting inheritable MDC (mapped diagnostic context) values, when using the [Logback](https://logback.qos.ch)
````

generated-doc/out/integrations/otel-context.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -3,7 +3,7 @@
 Dependency:
 
 ```scala
-"com.softwaremill.ox" %% "otel-context" % "1.0.1"
+"com.softwaremill.ox" %% "otel-context" % "1.0.2"
 ```
 
 When using the default OpenTelemetry context-propagation mechanisms, which rely on thread-local storage, the context
````

generated-doc/out/other/best-practices.md

Lines changed: 48 additions & 1 deletion
````diff
@@ -51,4 +51,51 @@ blocking methods within the forks.
 
 Virtual threads are normally not visible when using tools such as `jstack` or IntelliJ's debugger. To inspect their
 stack traces, you'll need to create a thread dump to a file using `jcmd [pid] Thread.dump_to_file [file]`, or use
-Intellij's thread dump utility, when paused in the debugger.
+IntelliJ's thread dump utility, when paused in the debugger.
+
+## Dealing with uninterruptible stdin
+
+Some I/O operations, like reading from stdin, block the thread on the `read` syscall and only unblock when data becomes
+available or the stream is closed; such a call is uninterruptible. The problem with stdin specifically is that it can't
+be easily closed, making it impossible to interrupt such operations directly. This pattern extends to other similar
+blocking operations that behave like stdin.
+
+The solution is to delegate the blocking operation to a separate thread and use a [channel](../streaming/channels.md)
+for communication. This thread cannot be managed by Ox, as Ox always attempts to run cleanup on application shutdown,
+which means interrupting all forks. Some blocking I/O, however, can't be interrupted on the JVM; the advised way of
+dealing with that is to just close the resource, which in turn makes read/write methods throw an `IOException`. In the
+case of stdin, closing it is usually not what you want to do.
+
+To work around that, you can sacrifice a thread: since receiving from a channel is interruptible, this makes the
+overall operation interruptible as well:
+
+```scala
+import ox.*, channels.*
+import scala.io.StdIn
+
+object stdinSupport:
+  private lazy val chan: Channel[String] =
+    val rvChan = Channel.rendezvous[String]
+
+    Thread
+      .ofVirtual()
+      .start(() => forever(rvChan.sendOrClosed(StdIn.readLine()).discard))
+
+    rvChan
+
+  def readLineInterruptibly: String =
+    try chan.receive()
+    catch
+      case iex: InterruptedException =>
+        // handle interruption appropriately
+        throw iex
+```
+
+This pattern allows you to use stdin (or similar blocking operations) with Ox's timeout and interruption mechanisms,
+such as `timeoutOption` or scope cancellation.
+
+Note that for better stdin performance, you can use `Channel.buffered` instead of a rendezvous channel, or even use
+`java.lang.System.in` directly and proxy raw data through the channel. Keep in mind that this solution leaks a thread
+that will remain blocked on stdin for the lifetime of the application. It's possible to avoid this trade-off by using
+libraries that employ JNI/JNA to access stdin, such as [JLine 3](https://jline.org/docs/intro), which can use raw mode
+with non-blocking or timeout-based reads, allowing the thread to be properly interrupted and cleaned up.
````
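To make the `timeoutOption` remark in the added text concrete, a usage sketch combining it with the `stdinSupport` helper from the diff above (assuming Ox's `timeoutOption(duration)(body)` signature):

```scala
import ox.*
import scala.concurrent.duration.*

// None if no line arrives within 5 seconds; the sacrificed thread keeps
// blocking on stdin, but this call itself is interrupted and returns
val maybeLine: Option[String] = timeoutOption(5.seconds)(stdinSupport.readLineInterruptibly)
```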

generated-doc/out/streaming/flows.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -171,7 +171,7 @@ To obtain a `org.reactivestreams.Publisher` instance, you'll need to add the fol
 bring the `toReactiveStreamsPublisher` method into scope:
 
 ```scala
-// sbt dependency: "com.softwaremill.ox" %% "flow-reactive-streams" % "1.0.1"
+// sbt dependency: "com.softwaremill.ox" %% "flow-reactive-streams" % "1.0.2"
 
 import ox.supervised
 import ox.flow.Flow
````
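A sketch of how this snippet typically continues, creating the publisher within a concurrency scope (the `ox.flow.reactive.*` import path is an assumption, as it isn't visible in the hunk):

```scala
import ox.supervised
import ox.flow.Flow
import ox.flow.reactive.* // assumed import for the toReactiveStreamsPublisher extension

supervised:
  val publisher: org.reactivestreams.Publisher[Int] =
    Flow.fromValues(1, 2, 3).toReactiveStreamsPublisher
  // hand the publisher to a reactive-streams consumer within this scope
```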
