Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,10 @@ import org.apache.pulsar.client.api.Message
*/
interface MessageHandling {
fun parseMessage(message: Message<ByteArray>)

fun skipMessage(message: Message<ByteArray>, propertyFilter: Pair<String, String>?): Boolean {
return propertyFilter?.let { filter ->
message.properties[filter.first] != filter.second
} ?: false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,16 @@ import java.time.Instant
*/
class MessageHandlingClassImpl(
private val selectedProtoClass: SingleSelection<PulsarMessageClassInfo>,
private val propertyFilter: Pair<String, String>?,
private val receivedMessages: SnapshotStateList<ReceivedMessages>,
private val setUserFeedback: (String) -> Unit
) : MessageHandling {

override fun parseMessage(message: Message<ByteArray>) {
try {
if (skipMessage(message, propertyFilter)) {
return
}
val proto = selectedProtoClass.selected ?: run {
setUserFeedback(NO_CLASS_SELECTED_DESERIALIZE)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,16 @@ import java.time.Instant
*/
class MessageHandlingImpl(
private val messageType: SingleSelection<PulsarMessage>,
private val propertyFilter: Pair<String, String>?,
private val receivedMessages: SnapshotStateList<ReceivedMessages>,
private val setUserFeedback: (String) -> Unit
) : MessageHandling {

override fun parseMessage(message: Message<ByteArray>) {
try {
if (skipMessage(message, propertyFilter)) {
return
}
val messageString = messageType.selected?.deserialize(message.data)
val publishTime = Instant.ofEpochMilli(message.publishTime)
receivedMessages.add(
Expand Down
15 changes: 1 addition & 14 deletions src/main/kotlin/com/toasttab/pulseman/pulsar/Pulsar.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import com.toasttab.pulseman.AppStrings.EXCEPTION
import com.toasttab.pulseman.AppStrings.FAILED_TO_CLOSE_PULSAR
import com.toasttab.pulseman.AppStrings.FAILED_TO_CREATE_CONSUMER
import com.toasttab.pulseman.AppStrings.FAILED_TO_CREATE_PRODUCER
import com.toasttab.pulseman.AppStrings.FAILED_TO_DESERIALIZE_PROPERTIES
import com.toasttab.pulseman.AppStrings.FAILED_TO_SETUP_PULSAR
import com.toasttab.pulseman.AppStrings.MESSAGE_SENT_ID
import com.toasttab.pulseman.AppStrings.NO_CLASS_GENERATED_TO_SEND
Expand Down Expand Up @@ -129,18 +128,6 @@ class Pulsar(
}
}

private fun properties(): Map<String, String> {
val propertiesJsonMap = pulsarSettings.propertySettings.propertyMap()
if (propertiesJsonMap.isNotBlank()) {
try {
return mapper.readValue(propertiesJsonMap, mapTypeRef)
} catch (ex: Exception) {
setUserFeedback("$FAILED_TO_DESERIALIZE_PROPERTIES=$propertiesJsonMap. $EXCEPTION=$ex")
}
}
return emptyMap()
}

fun sendMessage(message: ByteArray?): Boolean {
var wrongSettings = false
if (pulsarSettings.serviceUrl.value.isBlank()) {
Expand All @@ -165,7 +152,7 @@ class Pulsar(
?.newMessage()
?.value(message)
?.eventTime(System.currentTimeMillis())
?.properties(properties())
?.properties(pulsarSettings.propertySettings.propertyMap(setUserFeedback))
?.send()
?.let { messageId ->
setUserFeedback("$MESSAGE_SENT_ID $messageId $ON_TOPIC $topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ class DropdownSelector(
) {
private val expanded = mutableStateOf(false)

fun getUI(currentlySelected: String): @Composable () -> Unit {
fun getUI(currentlySelected: String?, noOptionSelected: String = ""): @Composable () -> Unit {
return {
dropdownSelectorUI(
expanded = expanded.value,
currentlySelected = currentlySelected,
noOptionSelected = noOptionSelected,
options = options,
onChangeExpanded = expanded::onChange,
onSelectedOption = onSelected
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

package com.toasttab.pulseman.state

import com.fasterxml.jackson.core.type.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.KotlinModule
import com.toasttab.pulseman.AppStrings
import com.toasttab.pulseman.entities.TabValuesV3
import com.toasttab.pulseman.thirdparty.rsyntaxtextarea.RSyntaxTextArea
import org.fife.ui.rsyntaxtextarea.SyntaxConstants
Expand All @@ -36,5 +40,22 @@ class PropertyConfiguration(

val sp = RTextScrollPane(textArea)

fun propertyMap(): String = textArea.text
fun propertyText(): String = textArea.text

fun propertyMap(setUserFeedback: (String) -> Unit): Map<String, String> {
val propertiesJsonMap = textArea.text
if (textArea.text.isNotBlank()) {
try {
return mapper.readValue(propertiesJsonMap, mapTypeRef)
} catch (ex: Exception) {
setUserFeedback("${AppStrings.FAILED_TO_DESERIALIZE_PROPERTIES}=$propertiesJsonMap. ${AppStrings.EXCEPTION}=$ex")
}
}
return emptyMap()
}

companion object {
private val mapper = ObjectMapper().registerModule(KotlinModule.Builder().build())
private val mapTypeRef = object : TypeReference<Map<String, String>>() {}
}
}
15 changes: 13 additions & 2 deletions src/main/kotlin/com/toasttab/pulseman/state/ReceiveMessage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,24 @@ class ReceiveMessage(
private val pulsarSettings: PulsarSettings,
private val receivedMessages: SnapshotStateList<ReceivedMessages>,
private val messageHandling: MessageHandling,
private val runTimeJarLoader: RunTimeJarLoader
private val runTimeJarLoader: RunTimeJarLoader,
private val propertyFilter: MutableState<Pair<String, String>?>
) {
val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())

private val subscribeState = mutableStateOf(ButtonState.WAITING)
private val clearState = mutableStateOf(ButtonState.WAITING)
private val closeState = mutableStateOf(ButtonState.WAITING)

private val propertyFilterSelectorUI = DropdownSelector(
options = pulsarSettings.propertySettings.propertyMap(setUserFeedback).keys.toList(),
onSelected = { selectedKey ->
pulsarSettings.propertySettings.propertyMap(setUserFeedback)[selectedKey]?.let { selectedValue ->
propertyFilter.value = Pair(selectedKey, selectedValue)
}
}
).getUI(currentlySelected = "")

private val pulsar: MutableState<Pulsar?> = mutableStateOf(null)
private var consumer: Consumer<ByteArray>? = null

Expand Down Expand Up @@ -117,7 +127,8 @@ class ReceiveMessage(
onClear = ::onClear,
onCloseConnection = ::onCloseConnection,
receivedMessages = receivedMessages,
scrollState = stateVertical
scrollState = stateVertical,
propertyFilterSelectorUI = propertyFilterSelectorUI
)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/kotlin/com/toasttab/pulseman/state/TabState.kt
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ class TabState(
serviceUrl = pulsarSettings.serviceUrl.value,
selectedAuthClass = authSelector.selectedAuthClass.selected?.cls?.name,
authJsonParameters = authSelector.authJsonParameters(),
propertyMap = propertySettings.propertyMap(),
propertyMap = propertySettings.propertyText(),
serializationFormat = serializationFormat.value,
protobufSettings = serializationState.protobufState.toProtobufTabValues(),
textSettings = serializationState.textState.toTextTabValues(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.toasttab.pulseman.state.protocol.protobuf

import androidx.compose.foundation.ExperimentalFoundationApi
import androidx.compose.runtime.Composable
import androidx.compose.runtime.MutableState
import androidx.compose.runtime.mutableStateListOf
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.snapshots.SnapshotStateList
Expand Down Expand Up @@ -81,9 +82,11 @@ class ProtobufState(
onChange = onChange
)

private val propertyFilter: MutableState<Pair<String, String>?> = mutableStateOf(null)
private val receivedMessages: SnapshotStateList<ReceivedMessages> = mutableStateListOf()
private val messageHandling = MessageHandlingClassImpl(
selectedProtoClass = protobufSelector.selectedClass,
propertyFilter = propertyFilter.value,
receivedMessages = receivedMessages,
setUserFeedback = setUserFeedback
)
Expand All @@ -93,7 +96,8 @@ class ProtobufState(
pulsarSettings = pulsarSettings,
receivedMessages = receivedMessages,
messageHandling = messageHandling,
runTimeJarLoader = pulsarMessageJars.runTimeJarLoader
runTimeJarLoader = pulsarMessageJars.runTimeJarLoader,
propertyFilter = propertyFilter
)

private val convertProtoBufMessage = ConvertProtobufMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.toasttab.pulseman.state.protocol.text

import androidx.compose.foundation.ExperimentalFoundationApi
import androidx.compose.runtime.Composable
import androidx.compose.runtime.MutableState
import androidx.compose.runtime.mutableStateListOf
import androidx.compose.runtime.mutableStateOf
import androidx.compose.runtime.snapshots.SnapshotStateList
Expand Down Expand Up @@ -59,10 +60,12 @@ class TextState(
onChange = onChange
)

private val propertyFilter: MutableState<Pair<String, String>?> = mutableStateOf(null)
private val receivedMessages: SnapshotStateList<ReceivedMessages> = mutableStateListOf()

private val messageHandling = MessageHandlingImpl(
messageType = serializationTypeSelector.selectedEncoding,
propertyFilter = propertyFilter.value,
receivedMessages = receivedMessages,
setUserFeedback = setUserFeedback
)
Expand All @@ -72,7 +75,8 @@ class TextState(
pulsarSettings = pulsarSettings,
receivedMessages = receivedMessages,
messageHandling = messageHandling,
runTimeJarLoader = runTimeJarLoader
runTimeJarLoader = runTimeJarLoader,
propertyFilter = propertyFilter
)

fun toTextTabValues() = TextTabValuesV3(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ import com.toasttab.pulseman.AppStrings
@Composable
fun dropdownSelectorUI(
expanded: Boolean,
currentlySelected: String,
currentlySelected: String?,
noOptionSelected: String = "",
options: List<String>,
onChangeExpanded: () -> Unit,
onSelectedOption: (String) -> Unit
Expand All @@ -58,7 +59,11 @@ fun dropdownSelectorUI(
.border(width = 0.8.dp, color = Color.White.copy(alpha = 0.5f), shape = RoundedCornerShape(8.dp))
) {
Row(modifier = Modifier.background(Color.Transparent).padding(8.dp, 8.dp)) {
Text(currentlySelected)
if (currentlySelected != null) {
Text(currentlySelected)
} else {
Text(noOptionSelected)
}
Icon(Icons.Filled.ArrowDropDown, contentDescription = AppStrings.CHOOSE_OPTION)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ fun receiveMessageUI(
onClear: () -> Unit,
onCloseConnection: () -> Unit,
receivedMessages: List<ReceivedMessages>,
scrollState: ScrollState
scrollState: ScrollState,
propertyFilterSelectorUI: @Composable () -> Unit
) {
Column {
Row {
Expand Down Expand Up @@ -109,6 +110,8 @@ fun receiveMessageUI(
) {
onCloseConnection()
}

propertyFilterSelectorUI()
}
Box(modifier = Modifier.fillMaxSize()) {
Column(modifier = Modifier.verticalScroll(scrollState)) {
Expand Down
Loading