Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 2 additions & 15 deletions build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
import com.google.protobuf.gradle.id

plugins {
application
id("com.google.protobuf") version "0.9.4"
id("com.ncorti.ktfmt.gradle") version "0.24.0"
id("org.pkl-lang") version "0.30.2"
kotlin("kapt") version "2.3.0"
Expand Down Expand Up @@ -35,7 +32,8 @@ dependencies {
implementation("org.jgrapht:jgrapht-core:1.5.2")
implementation("org.jgrapht:jgrapht-io:1.5.2")

implementation("com.google.protobuf:protobuf-java:4.32.0")
implementation("org.apache.fory:fory-core:0.15.0")
implementation("org.apache.fory:fory-kotlin:0.15.0")

implementation("io.etcd:jetcd-core:0.8.6")
implementation("org.eclipse.zenoh:zenoh-kotlin:1.7.2")
Expand Down Expand Up @@ -121,14 +119,3 @@ pkl {
}
}
}

protobuf {
generateProtoTasks {
all().forEach { task ->
task.builtins {
id("python")
id("cpp")
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package at.ac.uibk.dps.cirrina.execution.`object`
import at.ac.uibk.dps.cirrina.EnvironmentVariables
import at.ac.uibk.dps.cirrina.csm.Csml
import at.ac.uibk.dps.cirrina.execution.graph.EventGraph
import at.ac.uibk.dps.cirrina.execution.util.EventExchange
import at.ac.uibk.dps.cirrina.execution.util.Serializer
import io.zenoh.Config
import io.zenoh.Session
import io.zenoh.Zenoh
Expand Down Expand Up @@ -68,7 +68,7 @@ class EventHandler() : AutoCloseable {
fun emit(event: Event) {
val key = event.toKey() ?: return
val publisher = publishers[key] ?: error("no publisher for topic '${key}'")
val payload = ZBytes.from(EventExchange.toBytes(event))
val payload = ZBytes.from(Serializer.serialize(event))

publisher.put(payload).onFailure { error("failed to send event '$event'") }
}
Expand Down Expand Up @@ -104,7 +104,7 @@ class EventHandler() : AutoCloseable {
private fun handleIncoming(sample: Sample) {
runCatching {
val bytes = sample.payload.toBytes()
val event = EventExchange.fromBytes(bytes)
val event = Serializer.deserialize<Event>(bytes)

propagate(event)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package at.ac.uibk.dps.cirrina.execution.provider

import at.ac.uibk.dps.cirrina.execution.`object`.Context
import at.ac.uibk.dps.cirrina.execution.`object`.ContextVariable
import at.ac.uibk.dps.cirrina.execution.util.ValueExchange
import at.ac.uibk.dps.cirrina.execution.util.Serializer
import io.etcd.jetcd.ByteSequence
import io.etcd.jetcd.Client
import io.etcd.jetcd.op.Cmp
Expand Down Expand Up @@ -36,7 +36,7 @@ class ContextEtcd(endpoints: List<String>) : Context {

override fun create(name: String, value: Any?): Int {
val key = name.toByteSequence()
val bytes = value.toBytes()
val bytes = value?.toBytes() ?: byteArrayOf()

val txn =
client.kvClient
Expand All @@ -52,7 +52,7 @@ class ContextEtcd(endpoints: List<String>) : Context {

override fun assign(name: String, value: Any?): Int {
val key = name.toByteSequence()
val bytes = value.toBytes()
val bytes = value?.toBytes() ?: byteArrayOf()

val txn =
client.kvClient
Expand Down Expand Up @@ -87,9 +87,9 @@ class ContextEtcd(endpoints: List<String>) : Context {

private fun String.toByteSequence() = ByteSequence.from(this, StandardCharsets.UTF_8)

private fun Any?.toBytes(): ByteArray = ValueExchange.toBytes(this)
private fun Any.toBytes(): ByteArray = Serializer.serialize(this)

private fun ByteArray.fromBytes(): Any? = ValueExchange.fromBytes(this)
private fun ByteArray.fromBytes(): Any = Serializer.deserialize(this)

override fun close() {
client.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import at.ac.uibk.dps.cirrina.csm.Csml.HttpMethod
import at.ac.uibk.dps.cirrina.csm.Csml.HttpServiceImplementationBinding
import at.ac.uibk.dps.cirrina.csm.Csml.ServiceImplementationBinding
import at.ac.uibk.dps.cirrina.execution.`object`.ContextVariable
import at.ac.uibk.dps.cirrina.execution.`object`.exchange.ContextVariableProtos
import at.ac.uibk.dps.cirrina.execution.util.ContextVariableExchange
import at.ac.uibk.dps.cirrina.execution.util.Serializer
import com.google.protobuf.InvalidProtocolBufferException
import java.net.HttpURLConnection
import java.net.URI
Expand Down Expand Up @@ -55,7 +54,7 @@ class HttpServiceImplementation(
override suspend fun invoke(input: List<ContextVariable>): List<ContextVariable> {
require(input.none { it.isLazy }) { "all variables must be evaluated before conversion" }

val payload = serializeInput(input)
val payload = Serializer.serialize(input)
val uri = URI(scheme, null, host, port, endPoint, null, null)

val request =
Expand All @@ -71,15 +70,6 @@ class HttpServiceImplementation(
return handleResponse(response)
}

private fun serializeInput(input: List<ContextVariable>): ByteArray {
if (input.isEmpty()) return byteArrayOf()

return ContextVariableProtos.ContextVariables.newBuilder()
.addAllData(input.map { ContextVariableExchange.toProto(it) })
.build()
.toByteArray()
}

private fun handleResponse(response: HttpResponse<ByteArray>): List<ContextVariable> {
val statusCode = response.statusCode()

Expand All @@ -91,9 +81,7 @@ class HttpServiceImplementation(
if (body == null || body.isEmpty()) return emptyList()

return try {
ContextVariableProtos.ContextVariables.parseFrom(body).dataList.map {
ContextVariableExchange.fromProto(it)
}
Serializer.deserialize(body)
} catch (_: InvalidProtocolBufferException) {
error("unexpected http service response format")
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package at.ac.uibk.dps.cirrina.execution.util

import at.ac.uibk.dps.cirrina.csm.Csml
import at.ac.uibk.dps.cirrina.execution.`object`.ContextVariable
import at.ac.uibk.dps.cirrina.execution.`object`.Event
import org.apache.fory.Fory
import org.apache.fory.ThreadSafeFory
import org.apache.fory.config.Language

object Serializer {
private val fory: ThreadSafeFory =
Fory.builder()
.withLanguage(Language.JAVA)
.withAsyncCompilation(true)
.buildThreadSafeFory()
.apply {
register(Event::class.java)
register(Csml.EventChannel::class.java)
register(ContextVariable::class.java)

ensureSerializersCompiled()
}

fun serialize(obj: Any): ByteArray {
if (obj is Event && obj.data.any { it.isLazy }) {
error("event '${obj.topic}' has unevaluated data")
}
return fory.serialize(obj)
}

@Suppress("UNCHECKED_CAST")
fun <T> deserialize(data: ByteArray): T {
return fory.deserialize(data) as T
}
}

This file was deleted.

Loading
Loading