Skip to content

Commit e8b9513

Browse files
committed
feat: add flow graph
1 parent 60e87d0 commit e8b9513

22 files changed

+483
-198
lines changed

build.gradle.kts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
plugins {
22
kotlin("jvm") version "2.0.0"
3+
id("org.jetbrains.kotlinx.atomicfu") version "0.25.0"
34
}
45

56
group = "dev.silenium.playground"
@@ -17,6 +18,8 @@ dependencies {
1718
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-jdk9:$coroutines")
1819
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutines")
1920

21+
implementation("org.jetbrains.kotlinx:atomicfu:0.25.0")
22+
2023
val kotest = "5.9.0"
2124
testImplementation("io.kotest:kotest-runner-junit5-jvm:$kotest")
2225
testImplementation("io.kotest:kotest-assertions-core-jvm:$kotest")

src/main/kotlin/Source.kt

Lines changed: 0 additions & 53 deletions
This file was deleted.
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package dev.silenium.libs.flows.api
2+
3+
import kotlinx.coroutines.Job
4+
import kotlinx.coroutines.flow.launchIn
5+
import kotlinx.coroutines.flow.onEach
6+
7+
interface FlowGraphBuilder : FlowGraph {
8+
infix fun <T, P> Source<T, P>.connectTo(sink: Sink<T, P>): Job =
9+
flow.onEach(sink::submit).launchIn(this@FlowGraphBuilder)
10+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package dev.silenium.libs.flows.api
2+
3+
import kotlinx.coroutines.CoroutineScope
4+
5+
interface FlowGraph : AutoCloseable, CoroutineScope {
6+
val elements: List<FlowGraphElement<*>>
7+
val size: Int
8+
9+
fun <T, P, E : Source<T, P>> source(
10+
source: E,
11+
name: String = "${source.javaClass.name}-${size}",
12+
): SourceFlowGraphElement<T, P, E>
13+
14+
fun <T, P, E : Sink<T, P>> sink(
15+
sink: E,
16+
name: String = "${sink.javaClass.name}-${size}",
17+
): SinkFlowGraphElement<T, P, E>
18+
19+
fun <IT, IP, OT, OP, E : Transformer<IT, IP, OT, OP>> transformer(
20+
transform: E,
21+
name: String = "${transform.javaClass.name}-${size}",
22+
): TransformerFlowGraphElement<IT, IP, OT, OP, E>
23+
24+
fun <E : Source<*, *>> source(name: String): FlowGraphElement<E>?
25+
fun <E : Sink<*, *>> sink(name: String): FlowGraphElement<E>?
26+
fun <E : Transformer<*, *, *, *>> transformer(name: String): FlowGraphElement<E>?
27+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package dev.silenium.libs.flows.api
2+
3+
import kotlin.reflect.KClass
4+
5+
interface FlowGraphElement<T : Any> : AutoCloseable {
6+
val name: String
7+
val type: KClass<T>
8+
val impl: T
9+
10+
override fun close()
11+
}
12+
13+
interface SourceFlowGraphElement<T, P, S : Source<T, P>> : FlowGraphElement<S>, Source<T, P>
14+
interface SinkFlowGraphElement<T, P, S : Sink<T, P>> : FlowGraphElement<S>, Sink<T, P>
15+
interface TransformerFlowGraphElement<IT, IP, OT, OP, T : Transformer<IT, IP, OT, OP>> : FlowGraphElement<T>, Transformer<IT, IP, OT, OP>
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package dev.silenium.libs.flows.api
2+
3+
data class FlowItem<T, P>(val pad: UInt, val metadata: P, val value: T) : ReferenceCounted<FlowItem<T, P>> {
4+
@Suppress("UNCHECKED_CAST")
5+
override fun clone(): Result<FlowItem<T, P>> {
6+
return when (value) {
7+
is ReferenceCounted<*> -> value.clone().map { FlowItem(pad, metadata, it as T) }
8+
else -> Result.success(this)
9+
}
10+
}
11+
12+
override fun close() {
13+
if (value is AutoCloseable) {
14+
(value as AutoCloseable).close()
15+
}
16+
}
17+
}
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
package dev.silenium.libs.flows.api
2+
13
interface ReferenceCounted<T : AutoCloseable> : AutoCloseable {
24
fun clone(): Result<T>
35
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package dev.silenium.libs.flows.api
2+
3+
import kotlinx.coroutines.flow.FlowCollector
4+
5+
interface Sink<T, P> : FlowCollector<FlowItem<T, P>>, AutoCloseable {
6+
val inputMetadata: Map<UInt, P?>
7+
8+
fun configure(pad: UInt, metadata: P): Result<Unit>
9+
10+
suspend fun submit(item: FlowItem<T, P>): Result<Unit>
11+
12+
override suspend fun emit(value: FlowItem<T, P>) {
13+
check(inputMetadata.containsKey(value.pad)) { "pad not configured" }
14+
check(inputMetadata[value.pad] == value.metadata) { "metadata mismatch" }
15+
16+
submit(value).getOrThrow()
17+
}
18+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package dev.silenium.libs.flows.api
2+
3+
import kotlinx.coroutines.flow.Flow
4+
5+
interface Source<T, P> : AutoCloseable {
6+
val outputMetadata: Map<UInt, P>
7+
val flow: Flow<FlowItem<T, P>>
8+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
package dev.silenium.libs.flows.api
2+
3+
interface Transformer<IT, IP, OT, OP> : Sink<IT, IP>, Source<OT, OP>

0 commit comments

Comments
 (0)