Skip to content

Commit 6073810

Browse files
docs: Add a basic readme for kafka client (#320)
1 parent 77c59d8 commit 6073810

File tree

1 file changed

+90
-0
lines changed

1 file changed

+90
-0
lines changed

pubsublite-kafka-shim/README.md

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
# Instructions for Pub/Sub Lite Kafka usage.
2+
3+
1. Add the following to your POM file to download the Pub/Sub Lite Kafka shim.
4+
```xml
5+
<dependency>
6+
<groupId>org.apache.kafka</groupId>
7+
<artifactId>kafka-clients</artifactId>
8+
<version>2.6.0</version>
9+
</dependency>
10+
<dependency>
11+
<groupId>com.google.cloud</groupId>
12+
<artifactId>pubsublite-kafka-shim</artifactId>
13+
<version>TODO: Make a release</version>
14+
</dependency>
15+
```
16+
17+
1. Create a topic using `gcloud pubsub lite-topics create`
18+
1. Write some messages using:
19+
20+
```java
21+
import com.google.cloud.pubsublite.kafka.ProducerSettings;
22+
import org.apache.kafka.clients.producer.*;
23+
import com.google.cloud.pubsublite.*;
24+
25+
...
26+
27+
private final static String ZONE = "us-central1-b";
28+
private final static Long PROJECT_NUM = 123L;
29+
30+
...
31+
32+
TopicPath topic = TopicPath.newBuilder()
33+
.setLocation(CloudZone.parse(ZONE))
34+
.setProject(ProjectNumber.of(PROJECT_NUM))
35+
.setName(TopicName.of("my-topic")).build();
36+
37+
ProducerSettings settings = ProducerSettings.newBuilder()
38+
.setTopicPath(topic)
39+
.build();
40+
41+
try (Producer<byte[], byte[]> producer = settings.instantiate()) {
42+
Future<RecordMetadata> sent = producer.send(new ProducerRecord(
43+
topic.toString(), // Required to be the same topic.
44+
"key".getBytes(),
45+
"value".getBytes()
46+
));
47+
RecordMetadata meta = sent.get();
48+
}
49+
```
50+
1. Create a subscription using `gcloud pubsub lite-subscriptions create`
51+
1. Read some messages using:
52+
53+
```java
54+
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
55+
import org.apache.kafka.clients.consumer.*;
56+
import com.google.cloud.pubsublite.*;
57+
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
58+
59+
...
60+
61+
private final static String ZONE = "us-central1-b";
62+
private final static Long PROJECT_NUM = 123L;
63+
64+
...
65+
66+
SubscriptionPath subscription = SubscriptionPath.newBuilder()
67+
.setLocation(CloudZone.parse(ZONE))
68+
.setProject(ProjectNumber.of(PROJECT_NUM))
69+
.setName(SubscriptionName.of("my-sub"))
70+
.build();
71+
72+
ConsumerSettings settings = ConsumerSettings.newBuilder()
73+
.setSubscriptionPath(subscription)
74+
.setPerPartitionFlowControlSettings(FlowControlSettings.builder()
75+
.setBytesOutstanding(10_000_000) // 10 MB
76+
.setMessagesOutstanding(Long.MAX_VALUE)
77+
.build())
78+
.setAutocommit(true);
79+
80+
try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
81+
while (true) {
82+
ConsumerRecords<byte[], byte[]> records = consumer.poll(Long.MAX_VALUE);
83+
for (ConsumerRecord<byte[], byte[]> record : records) {
84+
System.out.println(record.offset() +:+ record.value());
85+
}
86+
}
87+
} catch (WakeupException e) {
88+
// ignored
89+
}
90+
```

0 commit comments

Comments
 (0)