Skip to content

Commit 40ec868

Browse files
author
Aliaksandr.Shpak
committed
- A number of fixes
1 parent 2b67d10 commit 40ec868

File tree

9 files changed

+93
-51
lines changed

9 files changed

+93
-51
lines changed

build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ dependencies {
3535
compile 'org.yaml:snakeyaml:1.23' // Apache 2
3636

3737
testCompile "junit:junit:4.4"
38+
testCompile "org.projectlombok:lombok:1.16.12"
3839
}
3940

4041
task createProperties(dependsOn: processResources) {
@@ -72,6 +73,9 @@ sourceSets {
7273
scala {
7374
srcDirs = ['src/test']
7475
}
76+
java {
77+
srcDirs = ['src/test']
78+
}
7579
}
7680
}
7781

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
1-
version = 0.1.5
1+
version = 0.1.6
22
url = https://api.github.com/repos/alshpak/kafka_data_viewer/releases/latest
33
update_url = https://github.com/alshpak/kafka_data_viewer/releases

src/main/scala/devtools/kafka_data_viewer/KafkaDataViewer.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ class KafkaDataViewerAppPane(val layoutData: String = "",
3535
val connector = new KafkaConnector(
3636
host = connDef.kafkaHost.value)
3737

38+
//val connector = StubConnector.stubConnector()
39+
3840
ConnectionsSet(logging = connector.connectConsumer(), read = connector.connectConsumer(), master = connector.connectConsumer(), producer = connector.connectProducer())
3941
}
4042

@@ -79,7 +81,9 @@ class KafkaDataViewerAppPane(val layoutData: String = "",
7981

8082
for (connectResultOpt <- $(connectionDone); connectResult <- connectResultOpt) {
8183
connectResult match {
82-
case Right(connHndl) => connectOps << AddItems(Seq(connHndl)); selected << connHndl
84+
case Right(connHndl) =>
85+
connectOps << AddItems(Seq(connHndl))
86+
selected << connHndl
8387
case Left(error) => uiRenderer.alert(ErrorAlert, "Connection in cancelled; " + error)
8488
}
8589
}

src/main/scala/devtools/kafka_data_viewer/kafkaconn/MessageFormats.scala

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,16 @@ package devtools.kafka_data_viewer.kafkaconn
22

33
import java.io._
44
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
5+
import scala.language.postfixOps
56

6-
import devtools.kafka_data_viewer.kafkaconn.Connector.withClosable
77
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
88
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
99
import org.apache.avro.generic.GenericData
1010
import org.apache.avro.reflect.ReflectData
1111
import org.apache.avro.{Conversions, Schema}
1212
import tech.allegro.schema.json2avro.converter.JsonAvroConverter
1313

14-
import scala.language.postfixOps
14+
import devtools.kafka_data_viewer.kafkaconn.Connector.withClosable
1515

1616
object MessageFormats {
1717

@@ -34,7 +34,7 @@ object MessageFormats {
3434
}
3535

3636
case class AvroMessage(registry: String) extends MessageType {
37-
override def formatter(): MessageFormat = new AvroMessageFormat(registry)
37+
override def formatter(): MessageFormat = AvroRegistries(registry)
3838

3939
override def display: String = "Avro:" + registry
4040
}
@@ -78,28 +78,38 @@ object GZipMessageFormat extends MessageFormat {
7878

7979
}
8080

81-
class AvroMessageFormat(avroServer: String) extends MessageFormat {
81+
object AvroRegistries {
82+
83+
ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion())
8284

83-
private val schemaClient = Option(avroServer).map(host => {
84-
ReflectData.get().addLogicalTypeConversion(new Conversions.DecimalConversion())
85-
new CachedSchemaRegistryClient(host, 100)
86-
})
87-
private val registryDeser = schemaClient.map(client => new KafkaAvroDeserializer(client))
85+
private val registriesCache = scala.collection.mutable.HashMap[String, AvroMessageFormat]()
86+
87+
def apply(avroServer: String): AvroMessageFormat = {
88+
if (!registriesCache.contains(avroServer)) {
89+
registriesCache += avroServer -> new AvroMessageFormat(avroServer)
90+
}
91+
registriesCache(avroServer)
92+
}
93+
}
94+
95+
class AvroMessageFormat(avroServer: String) extends MessageFormat {
8896

97+
private val schemaClient = new CachedSchemaRegistryClient(avroServer, 100)
98+
private val registryDeser = new KafkaAvroDeserializer(schemaClient)
8999

90100
override def encode(topic: String, message: String): Array[Byte] = {
91-
val schemaMeta = schemaClient.get.getLatestSchemaMetadata(topic)
101+
val schemaMeta = schemaClient.getLatestSchemaMetadata(topic)
92102
val schema = new Schema.Parser().parse(schemaMeta.getSchema)
93103

94104
val converter = new JsonAvroConverter()
95105
val record = converter.convertToGenericDataRecord(message.getBytes, schema)
96-
val bytes = new KafkaAvroSerializer(schemaClient.get).serialize(topic, record)
106+
val bytes = new KafkaAvroSerializer(schemaClient).serialize(topic, record)
97107
bytes
98108
}
99109

100110
override def decode(topic: String, bytes: Array[Byte]): String =
101111
Option(bytes).map(bytes => try {
102-
val record = registryDeser.get.deserialize(topic, bytes).asInstanceOf[GenericData.Record]
112+
val record = registryDeser.deserialize(topic, bytes).asInstanceOf[GenericData.Record]
103113
val converter = new JsonAvroConverter()
104114
val json = new String(converter.convertToJson(record), "UTF8")
105115
json

src/main/scala/devtools/kafka_data_viewer/kafkaconn/StubConnector.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ object StubConnector {
4343

4444
override def close(): Unit = Unit
4545

46-
override def queryTopicPartitions(topic: String): Seq[Int] = ???
46+
override def queryTopicPartitions(topic: String): Seq[Int] = Seq(0)
4747
}
4848

4949
override def connectProducer(): ProducerConnection = new ProducerConnection {

src/main/scala/devtools/kafka_data_viewer/ui/KafkaTopicInfoPanes.scala

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,6 @@ import devtools.lib.rxext.{BehaviorSubject, Observable, Subject}
1818
import devtools.lib.rxui.UiImplicits._
1919
import devtools.lib.rxui._
2020

21-
/*
22-
class TypesRegistry(
23-
private val topicToType: BehaviorSubject[Seq[(String, MessageType)]],
24-
private val avroRegistires: BehaviorSubject[Seq[String]]) {
25-
26-
private var topicToTypeCache = Map[String, MessageType]()
27-
for (items <- topicToType) topicToTypeCache = items.toMap
28-
29-
def listRegistries(): Seq[String] = avroRegistires.value
30-
31-
def addRegistry(registry: String): Unit = avroRegistires << avroRegistires.value :+ registry
32-
33-
def encoder(topic: String): MessageType = topicToTypeCache.getOrElse(topic, StringMessage)
34-
35-
def changeType(topic: String, messageType: MessageType): Unit = {
36-
topicToType << topicToType.value.filterNot(_._1 == topic) :+ (topic -> messageType)
37-
}
38-
}
39-
*/
40-
4121
class TopicsInfoList(val topicsWithSizes: Observable[TopicsWithSizes], onRefresh: => Unit) {
4222
def requestRefresh(): Unit = onRefresh
4323
}
@@ -80,7 +60,7 @@ class RecordsOutputTablePane(val layoutData: String,
8060
(builtRecords <<< records.map(recsSeq => recsSeq.map(DecodingFunction.decode(encFunc)))) ($)
8161

8262
for ((_, recsSeq) <- $(refreshData.withLatestFrom(recordsCache))) {
83-
builtRecords << ResetLogOp()
63+
builtRecords << ResetLogOp()
8464
builtRecords << AppendLogOp(recsSeq.map(DecodingFunction.decode(encFunc)))
8565
}
8666

src/main/scala/devtools/kafka_data_viewer/ui/KafkaTopicProducerPanes.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,13 +138,12 @@ class ProduceMessagePane(val layoutData: String = "",
138138
.map(partitions => defaultPartitioner +: partitions.map(_.toString))
139139
.withCachedLatest()
140140

141-
private val partition = behaviorSubject[String](defaultPartitioner)
142-
(partition <<< partitionsList.map(_ => defaultPartitioner)) ($)
141+
private val partition = behaviorSubject[String](settings.partition.value.map(_.toString).getOrElse(defaultPartitioner))
143142

144143
settings.custom << (topicData == CustomTopic)
145144
(settings.topic <<< topicName) ($)
146145
(settings.msgType <<< selectedMessageType.value) ($)
147-
(settings.partition <<< partition.map(partitionName => if (partitionName == defaultPartitioner) None else partitionName.toInt)) ($)
146+
(settings.partition <<< partition.map(partitionName => if (partitionName == defaultPartitioner) None else Some(partitionName.toInt))) ($)
148147

149148
private val key = settings.key
150149
private val message = settings.value

src/main/scala/devtools/lib/rxui/FxRender.scala

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -250,9 +250,8 @@ object FxRender {
250250
col.setCellValueFactory((param: TableColumn.CellDataFeatures[T, String]) => {
251251
applying(new SimpleStringProperty()) { prop =>
252252
val rowModel: T = param.getValue
253-
val $row$ = new DisposeStore()
254-
rowObservables += rowModel -> $row$
255-
for (value <- $row$(colModel.value(param.getValue))) prop.setValue(value)
253+
val $row$ = rowObservables(rowModel)
254+
for (value <- $row$(colModel.value(rowModel))) prop.setValue(value)
256255
}
257256
})
258257
col.setSortable(colModel.onSort.isDefined)
@@ -268,19 +267,32 @@ object FxRender {
268267
}
269268

270269
for (itemsOp <- $(tableModel.items)) itemsOp match {
271-
case SetList(items) => tableView.getItems.setAll(items.asJavaCollection)
272-
case AddItems(items) => tableView.getItems.addAll(items.asJavaCollection)
273-
case InsertItems(index, items) => tableView.getItems.addAll(index, items.asJavaCollection)
274-
case RemoveItems(index, amount) => tableView.getItems.remove(index, index + amount)
275-
case RemoveItemObjs(items) => tableView.getItems.removeAll(items: _*)
270+
case SetList(items) =>
271+
tableView.getItems.clear()
272+
items.foreach(rowObservables += _ -> new DisposeStore())
273+
tableView.getItems.setAll(items.asJavaCollection)
274+
case AddItems(items) =>
275+
items.foreach(rowObservables += _ -> new DisposeStore())
276+
tableView.getItems.addAll(items.asJavaCollection)
277+
case InsertItems(index, items) =>
278+
items.foreach(rowObservables += _ -> new DisposeStore())
279+
tableView.getItems.addAll(index, items.asJavaCollection)
280+
case RemoveItems(index, amount) =>
281+
tableView.getItems.remove(index, index + amount)
282+
case RemoveItemObjs(items) =>
283+
tableView.getItems.removeAll(items.asJava)
276284
}
277285

278286
tableView.getItems.addListener(new ListChangeListener[T] {
279287
override def onChanged(c: ListChangeListener.Change[_ <: T]): Unit = {
280288
while (c.next()) {
281289
c.getRemoved.forEach { rowModel =>
282-
rowObservables(rowModel).dispose()
283-
rowObservables -= rowModel
290+
if (!rowObservables.contains(rowModel)) {
291+
println("ERROR: row does not exists " + rowModel)
292+
} else {
293+
rowObservables(rowModel).dispose()
294+
rowObservables -= rowModel
295+
}
284296
}
285297
}
286298
}
@@ -513,18 +525,31 @@ object FxRender {
513525
tabToRenderer.foreach { case (_, renderer) => renderer.dispose() }
514526
tabToRenderer.clear()
515527
pane.getTabs.clear()
516-
tabs.foreach { tabModel =>
528+
529+
def addTab(tabModel: UiTab, delayContent: Boolean): Unit = {
517530
val tab = new Tab()
518531
for (label <- $(tabModel.label)) tab.setText(label)
519532
for (content <- $(tabModel.content)) {
520533
if (tabToRenderer.contains(tab)) tabToRenderer(tab).dispose()
521534
val tabRenderer = renderers.renderer(content)
522-
tab.setContent(tabRenderer.render())
535+
536+
if (delayContent)
537+
new Thread(() => {
538+
Thread.sleep(100L)
539+
Platform.runLater(() => tab.setContent(tabRenderer.render()))
540+
}).start()
541+
else tab.setContent(tabRenderer.render())
542+
523543
tabToRenderer += tab -> tabRenderer
524544
}
525545
tab.setClosable(false)
526546
pane.getTabs.add(tab)
527547
}
548+
549+
if (tabs.nonEmpty) {
550+
addTab(tabs.head, delayContent = false)
551+
tabs.tail.foreach(tab => addTab(tab, delayContent = true))
552+
}
528553
}
529554
pane
530555
}

src/test/TestFxPerfApp.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import javafx.application.Application;
2+
import javafx.scene.layout.GridPane;
3+
import javafx.stage.Stage;
4+
import lombok.val;
5+
6+
public class TestFxPerfApp extends Application {
7+
8+
@Override
9+
public void start(Stage primaryStage) throws Exception {
10+
11+
val rootPane = new GridPane();
12+
13+
14+
}
15+
16+
public static void main(String[] args) {
17+
18+
launch();
19+
}
20+
}

0 commit comments

Comments
 (0)