Skip to content

Commit 92ae351

Browse files
committed
feat: add subscriber implementation as well as admin,cursor, and topicstats adjustments
1 parent d981d02 commit 92ae351

File tree

15 files changed

+2219
-19
lines changed

15 files changed

+2219
-19
lines changed

google-cloud-pubsublite/clirr-ignored-differences.xml

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,79 @@
5656
<className>com/google/cloud/pubsublite/internal/**</className>
5757
<method>*</method>
5858
</difference>
59-
</differences>
59+
<!-- Ignore addition of messagingBackend() method to PublisherSettings -->
60+
<difference>
61+
<differenceType>7013</differenceType>
62+
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
63+
<method>java.util.Optional kafkaProperties()</method>
64+
</difference>
65+
<difference>
66+
<differenceType>7013</differenceType>
67+
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings</className>
68+
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
69+
</difference>
70+
<!-- Ignore addition of new methods to AdminClientSettings -->
71+
<difference>
72+
<differenceType>7013</differenceType>
73+
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
74+
<method>int kafkaDefaultPartitions()</method>
75+
</difference>
76+
<difference>
77+
<differenceType>7013</differenceType>
78+
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
79+
<method>short kafkaDefaultReplicationFactor()</method>
80+
</difference>
81+
<difference>
82+
<differenceType>7013</differenceType>
83+
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
84+
<method>java.util.Optional kafkaProperties()</method>
85+
</difference>
86+
<difference>
87+
<differenceType>7013</differenceType>
88+
<className>com/google/cloud/pubsublite/AdminClientSettings</className>
89+
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
90+
</difference>
91+
<!-- Ignore addition of new methods to AdminClientSettings.Builder -->
92+
<difference>
93+
<differenceType>7013</differenceType>
94+
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
95+
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultPartitions(int)</method>
96+
</difference>
97+
<difference>
98+
<differenceType>7013</differenceType>
99+
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
100+
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaDefaultReplicationFactor(short)</method>
101+
</difference>
102+
<difference>
103+
<differenceType>7013</differenceType>
104+
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
105+
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setKafkaProperties(java.util.Map)</method>
106+
</difference>
107+
<difference>
108+
<differenceType>7013</differenceType>
109+
<className>com/google/cloud/pubsublite/AdminClientSettings$Builder</className>
110+
<method>com.google.cloud.pubsublite.AdminClientSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
111+
</difference>
112+
<!-- Ignore addition of new methods to SubscriberSettings -->
113+
<difference>
114+
<differenceType>7013</differenceType>
115+
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
116+
<method>java.util.Optional kafkaProperties()</method>
117+
</difference>
118+
<difference>
119+
<differenceType>7013</differenceType>
120+
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings</className>
121+
<method>com.google.cloud.pubsublite.cloudpubsub.MessagingBackend messagingBackend()</method>
122+
</difference>
123+
<!-- Ignore addition of new methods to SubscriberSettings.Builder -->
124+
<difference>
125+
<differenceType>7013</differenceType>
126+
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
127+
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setKafkaProperties(java.util.Map)</method>
128+
</difference>
129+
<difference>
130+
<differenceType>7013</differenceType>
131+
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
132+
<method>com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings$Builder setMessagingBackend(com.google.cloud.pubsublite.cloudpubsub.MessagingBackend)</method>
133+
</difference>
134+
</differences>

google-cloud-pubsublite/pom.xml

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,41 @@
126126
<version>3.7.0</version>
127127
<optional>true</optional>
128128
</dependency>
129-
129+
130+
<!-- Google Managed Kafka authentication handler -->
131+
<dependency>
132+
<groupId>com.google.cloud.hosted.kafka</groupId>
133+
<artifactId>managed-kafka-auth-login-handler</artifactId>
134+
<version>1.0.6</version>
135+
<optional>true</optional>
136+
<exclusions>
137+
<exclusion>
138+
<groupId>io.confluent</groupId>
139+
<artifactId>*</artifactId>
140+
</exclusion>
141+
</exclusions>
142+
</dependency>
143+
144+
<!-- Test dependencies for Kafka integration tests -->
145+
<dependency>
146+
<groupId>org.testcontainers</groupId>
147+
<artifactId>testcontainers</artifactId>
148+
<version>1.19.3</version>
149+
<scope>test</scope>
150+
</dependency>
151+
<dependency>
152+
<groupId>org.testcontainers</groupId>
153+
<artifactId>kafka</artifactId>
154+
<version>1.19.3</version>
155+
<scope>test</scope>
156+
</dependency>
157+
<dependency>
158+
<groupId>org.testcontainers</groupId>
159+
<artifactId>junit-jupiter</artifactId>
160+
<version>1.19.3</version>
161+
<scope>test</scope>
162+
</dependency>
163+
130164
<!--test dependencies-->
131165
<dependency>
132166
<groupId>com.google.truth</groupId>
@@ -172,8 +206,12 @@
172206
<ignoredUnusedDeclaredDependency>org.hamcrest:hamcrest</ignoredUnusedDeclaredDependency>
173207
<ignoredUnusedDeclaredDependency>com.google.flogger:flogger-system-backend</ignoredUnusedDeclaredDependency>
174208
<ignoredUnusedDeclaredDependency>javax.annotation:javax.annotation-api</ignoredUnusedDeclaredDependency>
209+
<ignoredUnusedDeclaredDependency>com.google.cloud.hosted.kafka:managed-kafka-auth-login-handler</ignoredUnusedDeclaredDependency>
175210
</ignoredUnusedDeclaredDependencies>
176-
<ignoredUsedUndeclaredDependencies>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependencies>
211+
<ignoredUsedUndeclaredDependencies>
212+
<ignoredUsedUndeclaredDependency>org.hamcrest:hamcrest-core</ignoredUsedUndeclaredDependency>
213+
<ignoredUsedUndeclaredDependency>org.testcontainers:testcontainers</ignoredUsedUndeclaredDependency>
214+
</ignoredUsedUndeclaredDependencies>
177215
</configuration>
178216
</plugin>
179217
</plugins>
@@ -219,4 +257,4 @@
219257
</plugin>
220258
</plugins>
221259
</build>
222-
</project>
260+
</project>

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClientSettings.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@
2121
import com.google.api.gax.retrying.RetrySettings;
2222
import com.google.api.gax.rpc.ApiException;
2323
import com.google.auto.value.AutoValue;
24+
import com.google.cloud.pubsublite.cloudpubsub.MessagingBackend;
2425
import com.google.cloud.pubsublite.internal.AdminClientImpl;
2526
import com.google.cloud.pubsublite.internal.ExtractStatus;
27+
import com.google.cloud.pubsublite.internal.KafkaAdminClient;
2628
import com.google.cloud.pubsublite.v1.AdminServiceClient;
2729
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
30+
import java.util.Map;
2831
import java.util.Optional;
2932

3033
/** Settings for construction a Pub/Sub Lite AdminClient. */
@@ -44,9 +47,24 @@ public abstract class AdminClientSettings {
4447
/** A stub to use to connect. */
4548
abstract Optional<AdminServiceClient> serviceClient();
4649

50+
/** The backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
51+
public abstract MessagingBackend messagingBackend();
52+
53+
/** Kafka-specific properties for when using MANAGED_KAFKA backend. */
54+
public abstract Optional<Map<String, Object>> kafkaProperties();
55+
56+
/** Default number of partitions for new Kafka topics. */
57+
public abstract int kafkaDefaultPartitions();
58+
59+
/** Default replication factor for new Kafka topics. */
60+
public abstract short kafkaDefaultReplicationFactor();
61+
4762
public static Builder newBuilder() {
4863
return new AutoValue_AdminClientSettings.Builder()
49-
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS);
64+
.setRetrySettings(Constants.DEFAULT_RETRY_SETTINGS)
65+
.setMessagingBackend(MessagingBackend.PUBSUB_LITE)
66+
.setKafkaDefaultPartitions(3)
67+
.setKafkaDefaultReplicationFactor((short) 1);
5068
}
5169

5270
@AutoValue.Builder
@@ -64,11 +82,37 @@ public abstract static class Builder {
6482
/** A service client to use to connect. */
6583
public abstract Builder setServiceClient(AdminServiceClient serviceClient);
6684

85+
/** Set the backend messaging system to use (e.g., PUBSUB_LITE or MANAGED_KAFKA). */
86+
public abstract Builder setMessagingBackend(MessagingBackend backend);
87+
88+
/** Set Kafka-specific properties for when using MANAGED_KAFKA backend. */
89+
public abstract Builder setKafkaProperties(Map<String, Object> kafkaProperties);
90+
91+
/** Set default number of partitions for new Kafka topics. */
92+
public abstract Builder setKafkaDefaultPartitions(int partitions);
93+
94+
/** Set default replication factor for new Kafka topics. */
95+
public abstract Builder setKafkaDefaultReplicationFactor(short replicationFactor);
96+
6797
/** Build the settings object. */
6898
public abstract AdminClientSettings build();
6999
}
70100

71101
AdminClient instantiate() throws ApiException {
102+
// For Kafka backend, use KafkaAdminClient
103+
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
104+
if (!kafkaProperties().isPresent()) {
105+
throw new IllegalStateException(
106+
"kafkaProperties must be set when using MANAGED_KAFKA backend");
107+
}
108+
return new KafkaAdminClient(
109+
region(),
110+
kafkaProperties().get(),
111+
kafkaDefaultPartitions(),
112+
kafkaDefaultReplicationFactor());
113+
}
114+
115+
// For Pub/Sub Lite backend, use AdminClientImpl
72116
AdminServiceClient serviceClient;
73117
if (serviceClient().isPresent()) {
74118
serviceClient = serviceClient().get();
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.cloudpubsub;
18+
19+
import java.net.InetAddress;
20+
import java.util.Map;
21+
22+
/** Utility methods for Google Managed Kafka (GMK) integration. */
23+
public class GmkUtils {
24+
25+
/**
26+
* Validates GMK bootstrap server connectivity and configuration.
27+
*
28+
* @param kafkaProperties The Kafka properties containing bootstrap.servers
29+
* @return Validation result with suggestions if issues are found
30+
*/
31+
public static ValidationResult validateGmkConfiguration(Map<String, Object> kafkaProperties) {
32+
String bootstrapServers = (String) kafkaProperties.get("bootstrap.servers");
33+
34+
if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
35+
return ValidationResult.error("bootstrap.servers property is missing or empty");
36+
}
37+
38+
// Extract hostname from bootstrap servers
39+
String[] servers = bootstrapServers.split(",");
40+
for (String server : servers) {
41+
String hostname = server.trim().split(":")[0];
42+
43+
// Check if it looks like a GMK hostname
44+
if (hostname.contains("managedkafka") && hostname.contains(".cloud.goog")) {
45+
try {
46+
// Test DNS resolution
47+
InetAddress.getByName(hostname);
48+
return ValidationResult.success(
49+
"GMK bootstrap server resolved successfully: " + hostname);
50+
} catch (Exception e) {
51+
return ValidationResult.error(
52+
"Failed to resolve GMK hostname: "
53+
+ hostname
54+
+ ". Please verify:\n"
55+
+ "1. The GMK cluster exists and is running\n"
56+
+ "2. You have the correct project ID, region, and cluster name\n"
57+
+ "3. Your network allows DNS resolution to *.cloud.goog domains\n"
58+
+ "Use: gcloud managed-kafka clusters list --location=<region>"
59+
+ " --project=<project>");
60+
}
61+
}
62+
}
63+
64+
return ValidationResult.warning(
65+
"Bootstrap servers don't appear to be GMK endpoints: " + bootstrapServers);
66+
}
67+
68+
/**
69+
* Constructs a GMK bootstrap server URL from cluster details.
70+
*
71+
* @param projectId GCP project ID
72+
* @param region GCP region (e.g., "us-central1")
73+
* @param clusterId GMK cluster ID
74+
* @param port Port number
75+
* @return Formatted bootstrap server URL
76+
*/
77+
public static String buildGmkBootstrapServer(
78+
String projectId, String region, String clusterId, int port) {
79+
return String.format(
80+
"bootstrap.%s.%s.managedkafka.%s.cloud.goog:%d", clusterId, region, projectId, port);
81+
}
82+
83+
/** Constructs a GMK bootstrap server URL with default port 9092. */
84+
public static String buildGmkBootstrapServer(String projectId, String region, String clusterId) {
85+
return buildGmkBootstrapServer(projectId, region, clusterId, 9092);
86+
}
87+
88+
/** Result of configuration validation. */
89+
public static class ValidationResult {
90+
private final boolean success;
91+
private final String message;
92+
private final Level level;
93+
94+
private ValidationResult(boolean success, String message, Level level) {
95+
this.success = success;
96+
this.message = message;
97+
this.level = level;
98+
}
99+
100+
public static ValidationResult success(String message) {
101+
return new ValidationResult(true, message, Level.SUCCESS);
102+
}
103+
104+
public static ValidationResult error(String message) {
105+
return new ValidationResult(false, message, Level.ERROR);
106+
}
107+
108+
public static ValidationResult warning(String message) {
109+
return new ValidationResult(false, message, Level.WARNING);
110+
}
111+
112+
public boolean isSuccess() {
113+
return success;
114+
}
115+
116+
public String getMessage() {
117+
return message;
118+
}
119+
120+
public Level getLevel() {
121+
return level;
122+
}
123+
124+
public enum Level {
125+
SUCCESS,
126+
WARNING,
127+
ERROR
128+
}
129+
130+
@Override
131+
public String toString() {
132+
return level + ": " + message;
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)