Skip to content

Commit d981d02

Browse files
committed
feat: add kafka publisher implementation
1 parent 6b798dc commit d981d02

File tree

7 files changed

+608
-5
lines changed

7 files changed

+608
-5
lines changed

google-cloud-pubsublite/pom.xml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@
119119
<version>0.8</version>
120120
</dependency>
121121

122+
<!-- Apache Kafka Client (optional, for Managed Kafka backend) -->
123+
<dependency>
124+
<groupId>org.apache.kafka</groupId>
125+
<artifactId>kafka-clients</artifactId>
126+
<version>3.7.0</version>
127+
<optional>true</optional>
128+
</dependency>
129+
122130
<!--test dependencies-->
123131
<dependency>
124132
<groupId>com.google.truth</groupId>
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.cloudpubsub;
18+
19+
/** Identifies which messaging backend a Publisher or Subscriber client should talk to. */
public enum MessagingBackend {
  /**
   * Google Cloud Pub/Sub Lite, the default backend. The traditional option, providing zonal
   * storage and predictable pricing.
   */
  PUBSUB_LITE,

  /**
   * Google Cloud Managed Service for Apache Kafka. Exposes a Kafka-compatible API while Google
   * Cloud handles cluster management.
   */
  MANAGED_KAFKA
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java

Lines changed: 36 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import com.google.cloud.pubsublite.MessageTransformer;
3535
import com.google.cloud.pubsublite.Partition;
3636
import com.google.cloud.pubsublite.TopicPath;
37+
import com.google.cloud.pubsublite.cloudpubsub.internal.KafkaPartitionPublisherFactory;
3738
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
3839
import com.google.cloud.pubsublite.internal.CheckedApiException;
3940
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
@@ -52,6 +53,7 @@
5253
import com.google.pubsub.v1.PubsubMessage;
5354
import io.grpc.CallOptions;
5455
import java.time.Duration;
56+
import java.util.Map;
5557
import java.util.Optional;
5658

5759
/**
@@ -70,7 +72,7 @@ public abstract class PublisherSettings {
7072
// Required parameters.
7173

7274
/** The topic path to publish to. */
73-
abstract TopicPath topicPath();
75+
public abstract TopicPath topicPath();
7476

7577
// Optional parameters.
7678
/** A KeyExtractor for getting the routing key from a message. */
@@ -80,16 +82,16 @@ public abstract class PublisherSettings {
8082
abstract Optional<MessageTransformer<PubsubMessage, Message>> messageTransformer();
8183

8284
/** Batching settings for this publisher to use. Apply per-partition. */
83-
abstract BatchingSettings batchingSettings();
85+
public abstract BatchingSettings batchingSettings();
8486

8587
/**
8688
* Whether idempotence is enabled, where the server will ensure that unique messages within a
8789
* single publisher session are stored only once. Default true.
8890
*/
89-
abstract boolean enableIdempotence();
91+
public abstract boolean enableIdempotence();
9092

9193
/** Whether request compression is enabled. Default true. */
92-
abstract boolean enableCompression();
94+
public abstract boolean enableCompression();
9395

9496
/** A provider for credentials. */
9597
abstract CredentialsProvider credentialsProvider();
@@ -111,6 +113,17 @@ public abstract class PublisherSettings {
111113
// For testing.
112114
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder();
113115

116+
/** The messaging backend to use. Defaults to PUBSUB_LITE for backward compatibility. */
117+
public abstract MessagingBackend messagingBackend();
118+
119+
/**
120+
* Kafka-specific configuration properties. Only used when messagingBackend is MANAGED_KAFKA.
121+
* Common properties include: - "bootstrap.servers": Kafka broker addresses - "compression.type":
122+
* Compression algorithm (e.g., "snappy", "gzip") - "max.in.flight.requests.per.connection":
123+
* Pipelining configuration
124+
*/
125+
public abstract Optional<Map<String, Object>> kafkaProperties();
126+
114127
/** Get a new builder for a PublisherSettings. */
115128
public static Builder newBuilder() {
116129
return new AutoValue_PublisherSettings.Builder()
@@ -120,7 +133,8 @@ public static Builder newBuilder() {
120133
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
121134
.setEnableIdempotence(true)
122135
.setEnableCompression(true)
123-
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
136+
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder())
137+
.setMessagingBackend(MessagingBackend.PUBSUB_LITE);
124138
}
125139

126140
@AutoValue.Builder
@@ -169,6 +183,12 @@ public abstract Builder setMessageTransformer(
169183
abstract Builder setUnderlyingBuilder(
170184
SinglePartitionPublisherBuilder.Builder underlyingBuilder);
171185

186+
/** Sets the messaging backend. Defaults to PUBSUB_LITE. */
187+
public abstract Builder setMessagingBackend(MessagingBackend backend);
188+
189+
/** Sets Kafka-specific properties. Only used when backend is MANAGED_KAFKA. */
190+
public abstract Builder setKafkaProperties(Map<String, Object> properties);
191+
172192
public abstract PublisherSettings build();
173193
}
174194

@@ -185,6 +205,12 @@ private PublisherServiceClient newServiceClient() throws ApiException {
185205
}
186206

187207
private PartitionPublisherFactory getPartitionPublisherFactory() {
208+
// Check backend and return appropriate factory
209+
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
210+
return new KafkaPartitionPublisherFactory(this);
211+
}
212+
213+
// Existing Pub/Sub Lite implementation
188214
PublisherServiceClient client = newServiceClient();
189215
ByteString publisherClientId = UuidBuilder.toByteString(UuidBuilder.generate());
190216
return new PartitionPublisherFactory() {
@@ -241,6 +267,11 @@ private AdminClient getAdminClient() throws ApiException {
241267

242268
@SuppressWarnings("CheckReturnValue")
243269
Publisher instantiate() throws ApiException {
270+
// For Kafka backend, use simpler publisher that doesn't need partition watching
271+
if (messagingBackend() == MessagingBackend.MANAGED_KAFKA) {
272+
return new com.google.cloud.pubsublite.cloudpubsub.internal.KafkaPublisher(this);
273+
}
274+
244275
if (batchingSettings().getFlowControlSettings().getMaxOutstandingElementCount() != null
245276
|| batchingSettings().getFlowControlSettings().getMaxOutstandingRequestBytes() != null) {
246277
throw new CheckedApiException(
Lines changed: 160 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,160 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.cloudpubsub.internal;
18+
19+
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
39+
/** Adapts a Kafka producer to the internal Publisher interface for a specific partition. */
40+
public class KafkaPartitionPublisher extends ProxyService implements Publisher<MessageMetadata> {
41+
42+
private final KafkaProducer<byte[], byte[]> producer;
43+
private final String topicName;
44+
private final Partition partition;
45+
private final PublisherSettings settings;
46+
private final ConcurrentLinkedQueue<SettableApiFuture<MessageMetadata>> pendingFutures;
47+
48+
public KafkaPartitionPublisher(
49+
KafkaProducer<byte[], byte[]> producer,
50+
String topicName,
51+
Partition partition,
52+
PublisherSettings settings) {
53+
this.producer = producer;
54+
this.topicName = topicName;
55+
this.partition = partition;
56+
this.settings = settings;
57+
this.pendingFutures = new ConcurrentLinkedQueue<>();
58+
}
59+
60+
@Override
61+
public ApiFuture<MessageMetadata> publish(PubSubMessage message) {
62+
if (state() == State.FAILED) {
63+
return ApiFutures.immediateFailedFuture(
64+
new CheckedApiException("Publisher has failed", Code.FAILED_PRECONDITION).underlying);
65+
}
66+
67+
try {
68+
// Convert to Kafka ProducerRecord
69+
ProducerRecord<byte[], byte[]> record = convertToKafkaRecord(message);
70+
71+
// Create future for response
72+
SettableApiFuture<MessageMetadata> future = SettableApiFuture.create();
73+
pendingFutures.add(future);
74+
75+
// Send to Kafka
76+
producer.send(
77+
record,
78+
(metadata, exception) -> {
79+
pendingFutures.remove(future);
80+
81+
if (exception != null) {
82+
CheckedApiException apiException = new CheckedApiException(exception, Code.INTERNAL);
83+
future.setException(apiException.underlying);
84+
85+
// If this is a permanent error, fail the publisher
86+
if (isPermanentError(exception)) {
87+
onPermanentError(apiException);
88+
}
89+
} else {
90+
// Convert Kafka metadata to MessageMetadata
91+
MessageMetadata messageMetadata =
92+
MessageMetadata.of(
93+
Partition.of(metadata.partition()), Offset.of(metadata.offset()));
94+
future.set(messageMetadata);
95+
}
96+
});
97+
98+
return future;
99+
100+
} catch (Exception e) {
101+
CheckedApiException apiException = new CheckedApiException(e, Code.INTERNAL);
102+
onPermanentError(apiException);
103+
return ApiFutures.immediateFailedFuture(apiException.underlying);
104+
}
105+
}
106+
107+
@Override
108+
public void flush() {
109+
producer.flush();
110+
}
111+
112+
@Override
113+
public void cancelOutstandingPublishes() {
114+
CheckedApiException exception =
115+
new CheckedApiException("Publisher is shutting down", Code.CANCELLED);
116+
117+
pendingFutures.forEach(future -> future.setException(exception.underlying));
118+
pendingFutures.clear();
119+
}
120+
121+
// Note: doStart() and doStop() are handled by ProxyService
122+
123+
private ProducerRecord<byte[], byte[]> convertToKafkaRecord(PubSubMessage message) {
124+
// Extract key - use ordering key if available
125+
byte[] key = message.getKey().isEmpty() ? null : message.getKey().toByteArray();
126+
127+
// Create record with explicit partition
128+
ProducerRecord<byte[], byte[]> record =
129+
new ProducerRecord<byte[], byte[]>(
130+
topicName,
131+
Integer.valueOf((int) partition.value()), // Use explicit partition
132+
key,
133+
message.getData().toByteArray());
134+
135+
// Convert attributes to headers
136+
List<Header> headers = new ArrayList<>();
137+
message.getAttributesMap().forEach((k, v) -> headers.add(new RecordHeader(k, v.toByteArray())));
138+
139+
// Add event time as header if present
140+
if (message.hasEventTime()) {
141+
headers.add(
142+
new RecordHeader(
143+
"pubsublite.event_time",
144+
String.valueOf(message.getEventTime().getSeconds()).getBytes()));
145+
}
146+
147+
headers.forEach(record.headers()::add);
148+
149+
return record;
150+
}
151+
152+
private boolean isPermanentError(Exception e) {
153+
// Determine if error is permanent and should fail the publisher
154+
String message = e.getMessage();
155+
return message != null
156+
&& (message.contains("InvalidTopicException")
157+
|| message.contains("AuthorizationException")
158+
|| message.contains("SecurityDisabledException"));
159+
}
160+
}

0 commit comments

Comments
 (0)