Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 76 additions & 1 deletion google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,79 @@
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
</differences>
<!-- Ignore addition of messagingBackend() method to PublisherSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to AdminClientSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>int kafkaDefaultPartitions()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>short kafkaDefaultReplicationFactor()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to AdminClientSettings.Builder -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultPartitions(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultReplicationFactor(short)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaProperties(java.util.Map)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
</difference>
<!-- Ignore addition of new methods to SubscriberSettings -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
<method>java.util.Optional kafkaProperties()</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
</difference>
<!-- Ignore addition of new methods to SubscriberSettings.Builder -->
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setKafkaProperties(java.util.Map)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
</difference>
</differences>
29 changes: 27 additions & 2 deletions google-cloud-pubsublite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,28 @@
<version>0.8</version>
</dependency>

<!-- Apache Kafka Client (optional, for Managed Kafka backend) -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
<optional>true</optional>
</dependency>

<!-- Google Managed Kafka authentication handler -->
<dependency>
<groupId>com.google.cloud.hosted.kafka</groupId>
<artifactId>managed-kafka-auth-login-handler</artifactId>
<version>1.0.6</version>
<optional>true</optional>
<exclusions>
<exclusion>
<groupId>io.confluent</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

<!--test dependencies-->
<dependency>
<groupId>com.google.truth</groupId>
Expand Down Expand Up @@ -164,8 +186,11 @@
<ignoredUnusedDeclaredDependency>org.hamcrest:hamcrest</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>com.google.flogger:flogger-system-backend</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
<ignoredUnusedDeclaredDependency>com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler</ignoredUnusedDeclaredDependency>
</ignoredUnusedDeclaredDependencies>
<ignoredUsedUndeclaredDependencies>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependencies>
<ignoredUsedUndeclaredDependency>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependency>
</ignoredUsedUndeclaredDependencies>
</configuration>
</plugin>
</plugins>
Expand Down Expand Up @@ -211,4 +236,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Google LLC
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
import com.google.cloud.pubsublite.internal.AdminClientImpl;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.KafkaAdminClient;
import com.google.cloud.pubsublite.v1.AdminServiceClient;
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import java.util.Map;
import java.util.Optional;

/** Settings for construction a Pub/Sub Lite AdminClient. */
Expand All @@ -44,9 +47,24 @@ public abstract class AdminClientSettings {
/** A stub to use to connect. */
abstract Optional<AdminServiceClient> serviceClient();

/** The backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
public abstract MessagingBackend messagingBackend();

/** Kafka-specific properties for when using MANAGED_KAFKA backend. */
public abstract Optional<Map<String, Object>> kafkaProperties();

/** Default number of partitions for new Kafka topics. */
public abstract int kafkaDefaultPartitions();

/** Default replication factor for new Kafka topics. */
public abstract short kafkaDefaultReplicationFactor();

public static Builder newBuilder() {
return new AutoValue_AdminClientSettings.Builder()
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS);
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS)
.setMessagingBackend(MessagingBackend.PUBSUB_LITE)
.setKafkaDefaultPartitions(3)
.setKafkaDefaultReplicationFactor((short) 1);
}

@AutoValue.Builder
Expand All @@ -64,11 +82,37 @@ public abstract static class Builder {
/** A service client to use to connect. */
public abstract Builder setServiceClient(AdminServiceClient serviceClient);

/** Set the backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
public abstract Builder setMessagingBackend(MessagingBackend backend);

/** Set Kafka-specific properties for when using MANAGED_KAFKA backend. */
public abstract Builder setKafkaProperties(Map<String, Object> kafkaProperties);

/** Set default number of partitions for new Kafka topics. */
public abstract Builder setKafkaDefaultPartitions(int partitions);

/** Set default replication factor for new Kafka topics. */
public abstract Builder setKafkaDefaultReplicationFactor(short replicationFactor);

/** Build the settings object. */
public abstract AdminClientSettings build();
}

AdminClient instantiate() throws ApiException {
// For Kafka backend, use KafkaAdminClient
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
if (!kafkaProperties().isPresent()) {
throw new IllegalStateException(
"kafkaProperties must be set when using MANAGED_KAFKA backend");
}
return new KafkaAdminClient(
region(),
kafkaProperties().get(),
kafkaDefaultPartitions(),
kafkaDefaultReplicationFactor());
}

// For Pub/Sub Lite backend, use AdminClientImpl
AdminServiceClient serviceClient;
if (serviceClient().isPresent()) {
serviceClient = serviceClient().get();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub;

import java.net.InetAddress;
import java.util.Map;

/** Utility methods for Google Managed Kafka (GMK) integration. */
public class GmkUtils {

/**
* Validates GMK bootstrap server connectivity and configuration.
*
* @param kafkaProperties The Kafka properties containing bootstrap.servers
* @return Validation result with suggestions if issues are found
*/
public static ValidationResult validateGmkConfiguration(Map<String, Object> kafkaProperties) {
String bootstrapServers = (String) kafkaProperties.get("bootstrap.servers");

if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
return ValidationResult.error("bootstrap.servers property is missing or empty");
}

// Extract hostname from bootstrap servers
String[] servers = bootstrapServers.split(",");
for (String server : servers) {
String hostname = server.trim().split(":")[0];

// Check if it looks like a GMK hostname
if (hostname.contains("managedkafka") && hostname.contains(".cloud.goog")) {
try {
// Test DNS resolution
InetAddress.getByName(hostname);
return ValidationResult.success(
"GMK bootstrap server resolved successfully: " + hostname);
} catch (Exception e) {
return ValidationResult.error(
"Failed to resolve GMK hostname: "
+ hostname
+ ". Please verify:\n"
+ "1. The GMK cluster exists and is running\n"
+ "2. You have the correct project ID, region, and cluster name\n"
+ "3. Your network allows DNS resolution to *.cloud.goog domains\n"
+ "Use: gcloud managed-kafka clusters list --location=<region>"
+ " --project=<project>");
}
}
}

return ValidationResult.warning(
"Bootstrap servers don't appear to be GMK endpoints: " + bootstrapServers);
}

/**
* Constructs a GMK bootstrap server URL from cluster details.
*
* @param projectId GCP project ID
* @param region GCP region (e.g., "us-central1")
* @param clusterId GMK cluster ID
* @param port Port number
* @return Formatted bootstrap server URL
*/
public static String buildGmkBootstrapServer(
String projectId, String region, String clusterId, int port) {
return String.format(
"bootstrap.%s.%s.managedkafka.%s.cloud.goog:%d", clusterId, region, projectId, port);
}

/** Constructs a GMK bootstrap server URL with default port 9092. */
public static String buildGmkBootstrapServer(String projectId, String region, String clusterId) {
return buildGmkBootstrapServer(projectId, region, clusterId, 9092);
}

/** Result of configuration validation. */
public static class ValidationResult {
private final boolean success;
private final String message;
private final Level level;

private ValidationResult(boolean success, String message, Level level) {
this.success = success;
this.message = message;
this.level = level;
}

public static ValidationResult success(String message) {
return new ValidationResult(true, message, Level.SUCCESS);
}

public static ValidationResult error(String message) {
return new ValidationResult(false, message, Level.ERROR);
}

public static ValidationResult warning(String message) {
return new ValidationResult(false, message, Level.WARNING);
}

public boolean isSuccess() {
return success;
}

public String getMessage() {
return message;
}

public Level getLevel() {
return level;
}

public enum Level {
SUCCESS,
WARNING,
ERROR
}

@Override
public String toString() {
return level + ": " + message;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.cloudpubsub;

/** Specifies the messaging backend to use for Publisher and Subscriber clients. */
public enum MessagingBackend {
/**
* Use Google Cloud Pub/Sub Lite (default). This is the traditional backend with zonal storage and
* predictable pricing.
*/
PUBSUB_LITE,

/**
* Use Google Cloud Managed Service for Apache Kafka. Provides Kafka-compatible API with Google
* Cloud management.
*/
MANAGED_KAFKA
}
Loading
Loading