1+ custom_content : |
2+ #### Creating a topic
3+
4+ With Pub/Sub Lite you can create topics. A topic is a named resource to which messages are sent by
5+ publishers. Add the following imports at the top of your file:
6+
7+ ```java
8+ import com.google.cloud.pubsublite.*;
9+ import com.google.cloud.pubsublite.proto.Topic;
10+ import com.google.cloud.pubsublite.proto.Topic.*;
11+ import com.google.protobuf.util.Durations;
12+ ```
13+ Then, to create the topic, use the following code:
14+
15+ ```java
16+ CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
17+ CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
18+ ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
19+ TopicName topicName = TopicName.of(TOPIC_NAME);
20+
21+ TopicPath topicPath =
22+ TopicPaths.newBuilder()
23+ .setZone(zone)
24+ .setProjectNumber(projectNum)
25+ .setTopicName(topicName)
26+ .build();
27+
28+ Topic topic =
29+ Topic.newBuilder()
30+ .setPartitionConfig(
31+ PartitionConfig.newBuilder()
32+ // Set publishing throughput to 1 times the standard partition
33+ // throughput of 4 MiB per sec. This must be in the range [1,4]. A
34+ // topic with `scale` of 2 and count of 10 is charged for 20 partitions.
35+ .setScale(1)
36+ .setCount(PARTITIONS))
37+ .setRetentionConfig(
38+ RetentionConfig.newBuilder()
39+ // How long messages are retained.
40+ .setPeriod(Durations.fromDays(1))
41+ // Set storage per partition to 100 GiB. This must be 30 GiB-10 TiB.
42+ // If the number of bytes stored in any of the topic's partitions grows
43+ // beyond this value, older messages will be dropped to make room for
44+ // newer ones, regardless of the value of `period`.
45+ .setPerPartitionBytes(100 * 1024 * 1024 * 1024L))
46+ .setName(topicPath.value())
47+ .build();
48+
49+ AdminClientSettings adminClientSettings =
50+ AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
51+
52+ try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
53+
54+ Topic response = adminClient.createTopic(topic).get();
55+
56+ System.out.println(response.getAllFields() + "created successfully.");
57+ }
58+ ```
59+
60+ #### Publishing messages
61+
62+ With Pub/Sub Lite, you can publish messages to a topic. Add the following import at the top of your file:
63+
64+ ```java
65+ import com.google.api.core.*;
66+ import com.google.cloud.pubsublite.*;
67+ import com.google.cloud.pubsublite.cloudpubsub.*;
68+ import com.google.protobuf.ByteString;
69+ import com.google.pubsub.v1.PubsubMessage;
70+ import java.util.*;
71+ ```
72+ Then, to publish messages asynchronously, use the following code:
73+
74+ ```java
75+ public class PublisherExample {
76+ private static final int MESSAGE_COUNT = 10;
77+
78+ // Load the project number from a commandline flag.
79+ private static final long PROJECT_NUMBER = 123L;
80+ // Load the zone from a commandline flag.
81+ private static final String ZONE = "us-central1-b";
82+ // Load the topic name from a commandline flag.
83+ private static final String TOPIC_NAME = "my-new-topic";
84+
85+ public static List<ApiFuture<String>> runPublisher(Publisher publisher) throws Exception {
86+ List<ApiFuture<String>> futures = new ArrayList<>();
87+ for (int i = 0; i < MESSAGE_COUNT; i++) {
88+ String message = "message-" + i;
89+
90+ // Convert the message to a byte string.
91+ ByteString data = ByteString.copyFromUtf8(message);
92+ PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
93+
94+ // Schedule a message to be published. Messages are automatically batched.
95+ ApiFuture<String> future = publisher.publish(pubsubMessage);
96+ futures.add(future);
97+ }
98+ return futures;
99+ }
100+
101+ // Publish messages to a topic.
102+ public static void main(String[] args) throws Exception {
103+ PublisherSettings settings =
104+ PublisherSettings.newBuilder()
105+ .setTopicPath(
106+ TopicPaths.newBuilder()
107+ .setProjectNumber(ProjectNumber.of(PROJECT_NUMBER))
108+ .setZone(CloudZone.parse(ZONE))
109+ .setTopicName(TopicName.of(TOPIC_NAME))
110+ .build())
111+ .build();
112+ Publisher publisher = Publisher.create(settings);
113+ publisher.startAsync().awaitRunning();
114+ List<ApiFuture<String>> futureAckIds = runPublisher(publisher);
115+ publisher.stopAsync().awaitTerminated();
116+
117+ List<String> ackIds = ApiFutures.allAsList(futureAckIds).get();
118+ ArrayList<PublishMetadata> metadata = new ArrayList<>();
119+ for (String id : ackIds) {
120+ metadata.add(PublishMetadata.decode(id));
121+ }
122+ for (PublishMetadata one : metadata) {
123+ System.out.println(one);
124+ }
125+ }
126+ }
127+ ```
128+
129+ #### Creating a subscription
130+
131+ With Pub/Sub Lite you can create subscriptions. A subscription represents the stream of messages from a
132+ single, specific topic. Add the following imports at the top of your file:
133+
134+ ```java
135+ import com.google.cloud.pubsublite.*;
136+ import com.google.cloud.pubsublite.proto.Subscription;
137+ import com.google.cloud.pubsublite.proto.Subscription.*;
138+ import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.*;
139+ ```
140+ Then, to create the subscription, use the following code:
141+
142+ ```java
143+ CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
144+ CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
145+ ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
146+ TopicName topicName = TopicName.of(TOPIC_NAME);
147+ SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
148+
149+ TopicPath topicPath =
150+ TopicPaths.newBuilder()
151+ .setZone(zone)
152+ .setProjectNumber(projectNum)
153+ .setTopicName(topicName)
154+ .build();
155+
156+ SubscriptionPath subscriptionPath =
157+ SubscriptionPaths.newBuilder()
158+ .setZone(zone)
159+ .setProjectNumber(projectNum)
160+ .setSubscriptionName(subscriptionName)
161+ .build();
162+
163+ Subscription subscription =
164+ Subscription.newBuilder()
165+ .setDeliveryConfig(
166+ // The server does not wait for a published message to be successfully
167+ // written to storage before delivering it to subscribers. As such, a
168+ // subscriber may receive a message for which the write to storage failed.
169+ // If the subscriber re-reads the offset of that message later on, there
170+ // may be a gap at that offset.
171+ DeliveryConfig.newBuilder()
172+ .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY))
173+ .setName(subscriptionPath.value())
174+ .setTopic(topicPath.value())
175+ .build();
176+
177+ AdminClientSettings adminClientSettings =
178+ AdminClientSettings.newBuilder().setRegion(cloudRegion).build();
179+
180+ try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
181+
182+ Subscription response = adminClient.createSubscription(subscription).get();
183+
184+ System.out.println(response.getAllFields() + "created successfully.");
185+ }
186+ ```
187+
188+ #### Receiving messages
189+
190+ With Pub/Sub Lite you can receive messages from a subscription. Add the
191+ following imports at the top of your file:
192+
193+ ```java
194+ import com.google.cloud.pubsub.v1.AckReplyConsumer;
195+ import com.google.cloud.pubsub.v1.MessageReceiver;
196+ import com.google.cloud.pubsublite.*;
197+ import com.google.cloud.pubsublite.cloudpubsub.*;
198+ import com.google.common.util.concurrent.MoreExecutors;
199+ import com.google.pubsub.v1.PubsubMessage;
200+ import java.util.*;
201+ ```
202+ Then, to pull messages asynchronously, use the following code:
203+
204+ ```java
205+ CloudRegion cloudRegion = CloudRegion.of(CLOUD_REGION);
206+ CloudZone zone = CloudZone.of(cloudRegion, ZONE_ID);
207+ ProjectNumber projectNum = ProjectNumber.of(PROJECT_NUMBER);
208+ SubscriptionName subscriptionName = SubscriptionName.of(SUBSCRIPTION_NAME);
209+
210+ SubscriptionPath subscriptionPath =
211+ SubscriptionPaths.newBuilder()
212+ .setZone(zone)
213+ .setProjectNumber(projectNum)
214+ .setSubscriptionName(subscriptionName)
215+ .build();
216+
217+ FlowControlSettings flowControlSettings =
218+ FlowControlSettings.builder()
219+ // Set outstanding bytes to 10 MiB per partition.
220+ .setBytesOutstanding(10 * 1024 * 1024L)
221+ .setMessagesOutstanding(Long.MAX_VALUE)
222+ .build();
223+
224+ List<Partition> partitions = new ArrayList<>();
225+ for (Integer num : PARTITION_NOS) {
226+ partitions.add(Partition.of(num));
227+ }
228+
229+ MessageReceiver receiver =
230+ new MessageReceiver() {
231+ @Override
232+ public void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
233+ System.out.println("Id : " + message.getMessageId());
234+ System.out.println("Data : " + message.getData().toStringUtf8());
235+ consumer.ack();
236+ }
237+ };
238+
239+ SubscriberSettings subscriberSettings =
240+ SubscriberSettings.newBuilder()
241+ .setSubscriptionPath(subscriptionPath)
242+ .setPerPartitionFlowControlSettings(flowControlSettings)
243+ .setPartitions(partitions)
244+ .setReceiver(receiver)
245+ .build();
246+
247+ Subscriber subscriber = Subscriber.create(subscriberSettings);
248+
249+ // Start the subscriber. Upon successful starting, its state will become RUNNING.
250+ subscriber.startAsync().awaitRunning();
251+
252+ System.out.println("Listening to messages on " + subscriptionPath.value() + " ...");
253+
254+ try {
255+ System.out.println(subscriber.state());
256+ // Wait 30 seconds for the subscriber to reach TERMINATED state. If it encounters
257+ // unrecoverable errors before then, its state will change to FAILED and
258+ // an IllegalStateException will be thrown.
259+ subscriber.awaitTerminated(30, TimeUnit.SECONDS);
260+ } catch (TimeoutException t) {
261+ // Shut down the subscriber. This will change the state of the
262+ // subscriber to TERMINATED.
263+ subscriber.stopAsync();
264+ System.out.println(subscriber.state());
265+ }
266+ ```
267+ about : |
268+ [Google Pub/Sub Lite][product-docs] is designed to provide reliable,
269+ many-to-many, asynchronous messaging between applications. Publisher
270+ applications can send messages to a topic and other applications can
271+ subscribe to that topic to receive the messages. By decoupling senders and
272+ receivers, Google Cloud Pub/Sub allows developers to communicate between
273+ independently written applications.
274+
275+ Compared to Google Pub/Sub, Pub/Sub Lite provides partitioned zonal data
276+ storage with predefined capacity. Both products present a similar API, but
277+ Pub/Sub Lite has more usage caveats.
278+
279+ See the [Google Pub/Sub Lite docs](https://cloud.google.com/pubsub/quickstart-console#before-you-begin) for more details on how to activate
280+ Pub/Sub Lite for your project, as well as guidance on how to choose between
281+ Cloud Pub/Sub and Pub/Sub Lite.
0 commit comments