This is a test client for validating the Google Managed Kafka publishing functionality using the modified Pub/Sub Lite Java client.
- Java 11 or higher (check with `java -version`)
- Maven 3.6 or higher (check with `mvn -version`)
- Modified Pub/Sub Lite library: the modified `java-pubsublite` library must be built and installed locally. Navigate to the `java-pubsublite-main` directory and run:

  ```bash
  cd ../java-pubsublite-main
  mvn clean install -DskipTests
  ```

- Google Managed Kafka cluster (or local Kafka for testing):
  - Kafka broker(s) accessible and running
  - Topic created (or auto-creation enabled)
  - Appropriate credentials configured if using authentication
Edit `src/main/java/com/google/cloud/kafka/test/KafkaPublisherTest.java` and update these variables:

```java
private static final String BOOTSTRAP_SERVERS = "your-kafka-broker:9092";
private static final String TOPIC_NAME = "your-topic-name";
private static final long PROJECT_NUMBER = 123456789L; // Your GCP project number
private static final String CLOUD_REGION = "us-central1";
private static final int MESSAGE_COUNT = 10; // Number of test messages
```

Build the client:

```bash
mvn clean package
```

Run it:

```bash
mvn exec:java
```

Or using the JAR directly:

```bash
java -cp target/kafka-publisher-test-client-1.0.0.jar com.google.cloud.kafka.test.KafkaPublisherTest
```

The minimum required configuration:

- `BOOTSTRAP_SERVERS`: Kafka broker address(es)
- `TOPIC_NAME`: name of the Kafka topic to publish to
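If editing the source is inconvenient, the same values could instead be read from the environment. A minimal sketch of that approach, assuming a `BOOTSTRAP_SERVERS` environment variable and a local fallback (neither is part of the actual test client):

```java
public class ConfigSketch {
    // Hypothetical helper: prefer an environment variable, fall back to a
    // local default. Illustrative only; KafkaPublisherTest hard-codes these.
    static String bootstrapServers() {
        return System.getenv().getOrDefault("BOOTSTRAP_SERVERS", "localhost:9092");
    }

    public static void main(String[] args) {
        System.out.println(bootstrapServers());
    }
}
```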
You can configure additional Kafka producer properties in the `kafkaProperties` map:

```java
kafkaProperties.put("acks", "all");              // Wait for all replicas
kafkaProperties.put("compression.type", "gzip"); // Enable compression
kafkaProperties.put("max.in.flight.requests.per.connection", 5);
kafkaProperties.put("retries", 3);
kafkaProperties.put("batch.size", 16384);
kafkaProperties.put("linger.ms", 10);
```

For Google Managed Kafka with authentication, add:

```java
kafkaProperties.put("security.protocol", "SASL_SSL");
kafkaProperties.put("sasl.mechanism", "OAUTHBEARER");
kafkaProperties.put("sasl.jaas.config", "...");
kafkaProperties.put("sasl.login.callback.handler.class",
    "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler");
```
1. Start Kafka with Docker Compose:

   ```bash
   docker-compose up -d
   ```

   Use this `docker-compose.yml`:

   ```yaml
   version: '3'
   services:
     zookeeper:
       image: confluentinc/cp-zookeeper:latest
       environment:
         ZOOKEEPER_CLIENT_PORT: 2181
         ZOOKEEPER_TICK_TIME: 2000
       ports:
         - "2181:2181"
     kafka:
       image: confluentinc/cp-kafka:latest
       depends_on:
         - zookeeper
       ports:
         - "9092:9092"
       environment:
         KAFKA_BROKER_ID: 1
         KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
         KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
         KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
         KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
   ```

2. Create a topic (optional if auto-creation is enabled):

   ```bash
   docker exec -it <kafka-container-id> kafka-topics --create \
     --topic test-topic \
     --bootstrap-server localhost:9092 \
     --partitions 3 \
     --replication-factor 1
   ```

3. Run the test client with the default configuration (`localhost:9092`).

4. Verify messages were published:

   ```bash
   docker exec -it <kafka-container-id> kafka-console-consumer \
     --bootstrap-server localhost:9092 \
     --topic test-topic \
     --from-beginning
   ```
When running successfully, you should see output like:

```text
=== Google Managed Kafka Publisher Test ===
Configuration:
  Bootstrap Servers: localhost:9092
  Topic: test-topic
  Messages: 10

Creating publisher...
Starting publisher...
✓ Publisher started successfully

Publishing messages...
[0] Published: Test message #0 - Timestamp: 2024-10-09T...
[1] Published: Test message #1 - Timestamp: 2024-10-09T...
...

Waiting for acknowledgements...

=== Results ===
✓ Successfully published 10 messages

Message IDs (partition:offset):
  [0] 0:123
  [1] 1:456
  ...

Testing flush...
✓ Flush completed

Shutting down publisher...
✓ Publisher shut down

✅ Test completed successfully!
```
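The message IDs reported above are `partition:offset` pairs. A small sketch of splitting one back into its parts; the parser itself is illustrative, not something the client exposes:

```java
public class MessageIdSketch {
    // A published message ID such as "0:123" encodes partition 0, offset 123.
    static long[] parsePartitionOffset(String messageId) {
        String[] parts = messageId.split(":", 2);
        return new long[] { Long.parseLong(parts[0]), Long.parseLong(parts[1]) };
    }

    public static void main(String[] args) {
        long[] po = parsePartitionOffset("1:456");
        System.out.println("partition=" + po[0] + " offset=" + po[1]);
    }
}
```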
Error: Connection refused
Solution: Verify Kafka broker is running and accessible at the configured address.
Error: Authentication failed
Solution: Check your SASL/SSL configuration and credentials.
Error: Topic 'test-topic' not found
Solution: Create the topic manually or enable auto-creation on your Kafka cluster.
Error: Timeout expired after 30000ms
Solution:
- Increase `request.timeout.ms` in the Kafka properties
- Check network connectivity to Kafka brokers
- Verify broker configuration
Error: Could not find artifact com.google.cloud:google-cloud-pubsublite
Solution: Build and install the modified Pub/Sub Lite library locally:
```bash
cd ../java-pubsublite-main
mvn clean install -DskipTests
```
- Check the message count in Kafka:

  ```bash
  kafka-run-class kafka.tools.GetOffsetShell \
    --broker-list localhost:9092 \
    --topic test-topic
  ```

- Consume and verify messages:

  ```bash
  kafka-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic test-topic \
    --from-beginning \
    --property print.headers=true \
    --property print.timestamp=true
  ```

- Check message attributes (headers):
  - `message-index`: message sequence number
  - `timestamp`: message creation time
  - `source`: should be "kafka-publisher-test-client"
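The attribute check above can be automated by asserting on the consumed headers. A stdlib-only sketch, assuming the headers have been collected into a `Map<String, String>` (the helper and that representation are assumptions, not part of the client):

```java
import java.util.Map;

public class HeaderCheckSketch {
    // Returns true only when all three attributes listed above are present
    // and "source" carries the expected client identifier.
    static boolean hasExpectedHeaders(Map<String, String> headers) {
        return headers.containsKey("message-index")
            && headers.containsKey("timestamp")
            && "kafka-publisher-test-client".equals(headers.get("source"));
    }

    public static void main(String[] args) {
        Map<String, String> headers = Map.of(
            "message-index", "0",
            "timestamp", "2024-10-09T12:00:00Z",
            "source", "kafka-publisher-test-client");
        System.out.println(hasExpectedHeaders(headers));
    }
}
```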
Edit the `createTestMessage()` method to customize the message format:

```java
private static String createTestMessage(int index) {
  return String.format("{\"id\": %d, \"data\": \"custom\"}", index);
}
```

Modify the message creation to include custom headers:

```java
PubsubMessage pubsubMessage =
    PubsubMessage.newBuilder()
        .setData(ByteString.copyFromUtf8(messageData))
        .putAttributes("custom-header", "custom-value")
        .build();
```

Increase `MESSAGE_COUNT` to test throughput:

```java
private static final int MESSAGE_COUNT = 10000;
private static final boolean VERBOSE = false; // Disable verbose output
```

The test client uses:
- `PublisherSettings.MessagingBackend.MANAGED_KAFKA`: routes to the Kafka implementation
- `KafkaPublisher`: wraps the Apache Kafka `Producer`
- `PubsubMessage` → `ProducerRecord` conversion: maps the Pub/Sub message format to Kafka

Message flow:

```
PubsubMessage → KafkaPublisher → KafkaProducer → Kafka Broker
```
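The `PubsubMessage` → `ProducerRecord` step can be pictured with simplified stand-ins. This stdlib-only sketch mirrors the mapping (message data becomes the record value, attributes become headers) without depending on the Kafka or protobuf classes, so the types here are illustrative rather than the client's real ones:

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class ConversionSketch {
    // Simplified stand-in for a Kafka ProducerRecord: topic, value, headers.
    static final class SimpleRecord {
        final String topic;
        final byte[] value;
        final Map<String, byte[]> headers;

        SimpleRecord(String topic, byte[] value, Map<String, byte[]> headers) {
            this.topic = topic;
            this.value = value;
            this.headers = headers;
        }
    }

    // Mirrors the conversion described above: data -> record value,
    // each Pub/Sub attribute -> a Kafka header (UTF-8 bytes).
    static SimpleRecord toRecord(String topic, byte[] data, Map<String, String> attributes) {
        Map<String, byte[]> headers = new LinkedHashMap<>();
        attributes.forEach((k, v) -> headers.put(k, v.getBytes(StandardCharsets.UTF_8)));
        return new SimpleRecord(topic, data, headers);
    }

    public static void main(String[] args) {
        SimpleRecord r = toRecord(
            "test-topic",
            "Test message #0".getBytes(StandardCharsets.UTF_8),
            Map.of("source", "kafka-publisher-test-client"));
        System.out.println(r.topic + " headers=" + r.headers.size());
    }
}
```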
Copyright 2024 Google LLC
Licensed under the Apache License, Version 2.0.