diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1776807 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +# Build stage +FROM maven:3.9.11-ibm-semeru-17-noble AS build + +WORKDIR /app +COPY pom.xml . +COPY package.json . +COPY src ./src +COPY ui ./ui + +# Build the JAR (this will run frontend-maven-plugin too) +RUN mvn clean package -DskipTests + +# Runtime stage +FROM ibm-semeru-runtimes:open-17-jdk + +WORKDIR /app +COPY kafka.properties kafka.properties +COPY --from=build /app/target/demo-all.jar app.jar + +EXPOSE 8080 + +ENTRYPOINT ["java", "-jar", "app.jar"] diff --git a/docker-compose.yml b/docker-compose.yml index efd84ae..40ba105 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,29 +1,50 @@ -version: '2' services: - zookeeper: - image: strimzi/zookeeper - command: [ - "sh", "-c", - "bin/zookeeper-server-start.sh config/zookeeper.properties" - ] + broker: + image: apache/kafka:4.0.0 + container_name: broker + hostname: broker ports: - - "2181:2181" + - "9092:9092" environment: - LOG_DIR: /tmp/logs + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_LISTENERS: PLAINTEXT://:9092,PLAINTEXT_HOST://:29092,CONTROLLER://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:9093 + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_CONFIG_DIR: /var/lib/kafka-config + KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" + healthcheck: + test: ["CMD", "/opt/kafka/bin/kafka-broker-api-versions.sh", "--bootstrap-server", "localhost:9092"] + interval: 5s + timeout: 10s + retries: 3 + start_period: 3s + + create-topics: + image: apache/kafka:4.0.0 + container_name: create-topics + depends_on: + broker: + condition: service_healthy + entrypoint: ["/bin/sh", "-c"] + command: | + 
/opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic __consumer_offsets --bootstrap-server broker:9092 --partitions 1 --replication-factor 1 + /opt/kafka/bin/kafka-topics.sh --create --if-not-exists --topic demo --bootstrap-server broker:9092 --partitions 1 --replication-factor 1 + restart: "no" - kafka: - image: strimzi/kafka - command: [ - "sh", "-c", - "bin/kafka-server-start.sh config/server.properties --override listeners=$${KAFKA_LISTENERS} --override advertised.listeners=$${KAFKA_ADVERTISED_LISTENERS} --override zookeeper.connect=$${KAFKA_ZOOKEEPER_CONNECT}" - ] + + app: + build: . + container_name: demo-app depends_on: - - zookeeper + broker: + condition: service_healthy + create-topics: + condition: service_completed_successfully ports: - - "9092:9092" - environment: - LOG_DIR: "/tmp/logs" - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + - "8080:8080" + restart: always diff --git a/kafka.properties b/kafka.properties index 5905f0f..13e2b0b 100644 --- a/kafka.properties +++ b/kafka.properties @@ -1,4 +1,4 @@ -bootstrap.servers=localhost:9092 +bootstrap.servers=broker:9092 ## Optional topic configuration - otherwise default value will be chosen # topic= diff --git a/src/main/java/kafka/vertx/demo/PeriodicProducer.java b/src/main/java/kafka/vertx/demo/PeriodicProducer.java index a862a69..fc7f075 100644 --- a/src/main/java/kafka/vertx/demo/PeriodicProducer.java +++ b/src/main/java/kafka/vertx/demo/PeriodicProducer.java @@ -7,7 +7,6 @@ import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; -import io.vertx.core.TimeoutStream; import io.vertx.core.eventbus.Message; import io.vertx.core.json.JsonObject; import io.vertx.kafka.client.producer.KafkaProducer; @@ -16,54 +15,84 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; +import java.time.Duration; +import java.util.Map; +import 
java.util.stream.Collectors; public class PeriodicProducer extends AbstractVerticle { private static final Logger logger = LoggerFactory.getLogger(PeriodicProducer.class); - private String customMessage; + private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis(); + + private KafkaProducer kafkaProducer; + private static final long TIMER_NOT_SET = -1L; + private long timerId = TIMER_NOT_SET; + private String customMessage = "Hello World"; @Override public void start(Promise startPromise) { - String propertiesPath = System.getProperty(Main.PROPERTIES_PATH_ENV_NAME, Main.DEFAULT_PROPERTIES_PATH); + String propertiesPath = System.getProperty( + Main.PROPERTIES_PATH_ENV_NAME, + Main.DEFAULT_PROPERTIES_PATH + ); + Main.loadKafkaConfig(vertx, propertiesPath) .onSuccess(config -> { - HashMap props = config.mapTo(HashMap.class); - setup(props); + setup(config); startPromise.complete(); }) .onFailure(startPromise::fail); } - private void setup(HashMap props) { - // Don't retry and only wait 10 secs for partition info as this is a demo app + private void setup(JsonObject config) { + // Convert JsonObject config -> Map + Map props = config.getMap() + .entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + e -> String.valueOf(e.getValue()) + )); + props.put(ProducerConfig.RETRIES_CONFIG, "0"); props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000"); - KafkaProducer kafkaProducer = KafkaProducer.create(vertx, props); - kafkaProducer.exceptionHandler(err -> logger.debug("Kafka error: {}", err)); + kafkaProducer = KafkaProducer.create(vertx, props); + kafkaProducer.exceptionHandler(err -> logger.error("Kafka producer error", err)); - TimeoutStream timerStream = vertx.periodicStream(2000); - timerStream.handler(tick -> produceKafkaRecord(kafkaProducer, props.get(Main.TOPIC_KEY))); - timerStream.pause(); + vertx.eventBus() + .consumer(Main.PERIODIC_PRODUCER_ADDRESS, + msg -> handleCommand(props, msg)); - 
vertx.eventBus().consumer(Main.PERIODIC_PRODUCER_ADDRESS, message -> handleCommand(timerStream, message)); logger.info("🚀 PeriodicProducer started"); } - private void handleCommand(TimeoutStream timerStream, Message message) { + private void handleCommand(Map props, Message message) { String command = message.body().getString(WebSocketServer.ACTION, "none"); - if (WebSocketServer.START_ACTION.equals(command)) { - logger.info("Producing Kafka records"); - customMessage = message.body().getString("custom", "Hello World"); - timerStream.resume(); - } else if (WebSocketServer.STOP_ACTION.equals(command)) { - logger.info("Stopping producing Kafka records"); - timerStream.pause(); + switch (command) { + case WebSocketServer.START_ACTION: + customMessage = message.body().getString("custom", "Hello World"); + if (timerId == TIMER_NOT_SET) { + timerId = vertx.setPeriodic(PRODUCE_INTERVAL_MS, + id -> produceKafkaRecord(props.get(Main.TOPIC_KEY))); + logger.info("Producing Kafka records with message template: {}", customMessage); + } + break; + + case WebSocketServer.STOP_ACTION: + if (timerId != TIMER_NOT_SET) { + vertx.cancelTimer(timerId); + timerId = TIMER_NOT_SET; + logger.info("Stopped producing Kafka records"); + } + break; + + default: + logger.warn("Unknown command received: {}", command); } } - private void produceKafkaRecord(KafkaProducer kafkaProducer, String topic) { + private void produceKafkaRecord(String topic) { String payload = customMessage; KafkaProducerRecord record = KafkaProducerRecord.create(topic, payload); logger.debug("Producing record to topic {} with payload {}", topic, payload); @@ -84,4 +113,12 @@ private void produceKafkaRecord(KafkaProducer kafkaProducer, Str vertx.eventBus().send(Main.PERIODIC_PRODUCER_BROADCAST, new JsonObject().put("status", "ERROR")); }); } + + @Override + public void stop() { + if (kafkaProducer != null) { + kafkaProducer.close() + .onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? 
"success" : ar.cause())); + } + } } diff --git a/src/main/java/kafka/vertx/demo/WebSocketServer.java b/src/main/java/kafka/vertx/demo/WebSocketServer.java index fe36443..9c02c09 100644 --- a/src/main/java/kafka/vertx/demo/WebSocketServer.java +++ b/src/main/java/kafka/vertx/demo/WebSocketServer.java @@ -17,14 +17,12 @@ import io.vertx.ext.web.Router; import io.vertx.ext.web.handler.StaticHandler; import io.vertx.ext.web.templ.thymeleaf.ThymeleafTemplateEngine; -import io.vertx.kafka.client.common.TopicPartition; import io.vertx.kafka.client.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; -import java.util.Set; public class WebSocketServer extends AbstractVerticle { @@ -70,7 +68,7 @@ private Future createRouterAndStartServer(JsonObject config) { JsonObject props = new JsonObject(); String topic = config.getString("topic"); - + props.put("topic", topic); props.put("producerPath", PRODUCE_PATH); props.put("consumerPath", CONSUME_PATH); @@ -94,7 +92,7 @@ private Future startWebSocket(Router router) { return vertx.createHttpServer(new HttpServerOptions().setRegisterWebSocketWriteHandlers(true)) .requestHandler(router) .webSocketHandler(this::handleWebSocket) - .listen(8080) + .listen(8080, "0.0.0.0") .onSuccess(ok -> logger.info("🚀 WebSocketServer started")) .onFailure(err -> logger.error("❌ WebSocketServer failed to listen", err)); } @@ -140,11 +138,8 @@ private void handleProduceSocket(ServerWebSocket webSocket) { private void handleConsumeSocket(ServerWebSocket webSocket) { KafkaConsumer kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig); - kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err)); - String topic = kafkaConfig.get(Main.TOPIC_KEY); - TopicPartition topicPartition = new TopicPartition().setTopic(topic); kafkaConsumer.handler(record -> { JsonObject payload = new JsonObject() @@ -157,16 +152,21 @@ private void 
handleConsumeSocket(ServerWebSocket webSocket) { vertx.eventBus().send(webSocket.textHandlerID(), payload.encode()); }); + kafkaConsumer.subscribe(topic) + .onSuccess(v -> { + logger.info("Subscribed to {}", topic); + }) + .onFailure(err -> logger.error("Could not subscribe to {}", topic, err)); + webSocket.handler(buffer -> { String action = buffer.toJsonObject().getString(ACTION, "none"); + if (START_ACTION.equals(action)) { - kafkaConsumer.subscription() - .compose(sub -> (sub.size() > 0) ? kafkaConsumer.resume(topicPartition) : kafkaConsumer.subscribe(topic)) - .onSuccess(ok -> logger.info("Subscribed to {}", topic)) - .onFailure(err -> logger.error("Could not subscribe to {}", topic, err)); + kafkaConsumer.resume(); + logger.info("Consumer resumed"); } else if (STOP_ACTION.equals(action)) { - kafkaConsumer.pause(topicPartition) - .onFailure(err -> logger.error("Cannot pause consumer", err)); + kafkaConsumer.pause(); + logger.info("Consumer paused"); } });