Skip to content

Commit 858b565

Browse files
authored
samples: various sample and doc improvements we had queued up (googleapis#1987)
* samples: update publishing samples to clarify that topic objects should be cached; also fix a usage of publish() * samples: convert publishWithRetrySettings to use veneer * samples: update publishWithRetrySettings with new defaults; add comment about including all items * docs: clarify what subscriber batching means * samples: update EOD sample with endpoint * samples: add comments about ordered publishing as well
1 parent 6e2c28a commit 858b565

24 files changed

+155
-120
lines changed

samples/listenForMessagesWithExactlyOnceDelivery.js

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2022 Google LLC
1+
// Copyright 2022-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -38,8 +38,11 @@
3838
// Imports the Google Cloud client library
3939
const {PubSub} = require('@google-cloud/pubsub');
4040

41-
// Creates a client; cache this for further use
42-
const pubSubClient = new PubSub();
41+
// Pub/Sub's exactly once delivery guarantee only applies when subscribers connect to the service in the same region.
42+
// For list of locational endpoints for Pub/Sub, see https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
43+
const pubSubClient = new PubSub({
44+
apiEndpoint: 'us-west1-pubsub.googleapis.com:443',
45+
});
4346

4447
async function listenForMessagesWithExactlyOnceDelivery(
4548
subscriptionNameOrId,

samples/publishAvroRecords.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2021 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -46,8 +46,10 @@ const fs = require('fs');
4646
const pubSubClient = new PubSub();
4747

4848
async function publishAvroRecords(topicNameOrId) {
49-
// Get the topic metadata to learn about its schema encoding.
49+
// Cache topic objects (publishers) and reuse them.
5050
const topic = pubSubClient.topic(topicNameOrId);
51+
52+
// Get the topic metadata to learn about its schema encoding.
5153
const [topicMetadata] = await topic.getMetadata();
5254
const topicSchemaMetadata = topicMetadata.schemaSettings;
5355

samples/publishBatchedMessages.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2023 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -53,6 +53,7 @@ async function publishBatchedMessages(
5353
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
5454
const dataBuffer = Buffer.from(data);
5555

56+
// Cache topic objects (publishers) and reuse them.
5657
const publishOptions = {
5758
batching: {
5859
maxMessages: maxMessages,

samples/publishMessage.js

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2023 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -47,10 +47,11 @@ async function publishMessage(topicNameOrId, data) {
4747
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
4848
const dataBuffer = Buffer.from(data);
4949

50+
// Cache topic objects (publishers) and reuse them.
51+
const topic = pubSubClient.topic(topicNameOrId);
52+
5053
try {
51-
const messageId = await pubSubClient
52-
.topic(topicNameOrId)
53-
.publishMessage({data: dataBuffer});
54+
const messageId = topic.publishMessage({data: dataBuffer});
5455
console.log(`Message ${messageId} published.`);
5556
} catch (error) {
5657
console.error(`Received error while publishing: ${error.message}`);

samples/publishMessageWithCustomAttributes.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2023 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -52,9 +52,13 @@ async function publishMessageWithCustomAttributes(topicNameOrId, data) {
5252
username: 'gcp',
5353
};
5454

55-
const messageId = await pubSubClient
56-
.topic(topicNameOrId)
57-
.publishMessage({data: dataBuffer, attributes: customAttributes});
55+
// Cache topic objects (publishers) and reuse them.
56+
const topic = pubSubClient.topic(topicNameOrId);
57+
58+
const messageId = topic.publishMessage({
59+
data: dataBuffer,
60+
attributes: customAttributes,
61+
});
5862
console.log(`Message ${messageId} published.`);
5963
}
6064
// [END pubsub_publish_custom_attributes]

samples/publishOrderedMessage.js

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2023 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -61,14 +61,18 @@ async function publishOrderedMessage(topicNameOrId, data, orderingKey) {
6161
orderingKey: orderingKey,
6262
};
6363

64+
// Cache topic objects (publishers) and reuse them.
65+
//
66+
// Pub/Sub's ordered delivery guarantee only applies when publishes for an ordering
67+
// key are in the same region. For list of locational endpoints for Pub/Sub, see:
68+
// https://cloud.google.com/pubsub/docs/reference/service_apis_overview#list_of_locational_endpoints
6469
const publishOptions = {
6570
messageOrdering: true,
6671
};
72+
const topic = pubSubClient.topic(topicNameOrId, publishOptions);
6773

6874
// Publishes the message
69-
const messageId = await pubSubClient
70-
.topic(topicNameOrId, publishOptions)
71-
.publishMessage(message);
75+
const messageId = topic.publishMessage(message);
7276

7377
console.log(`Message ${messageId} published.`);
7478

samples/publishProtobufMessages.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2021 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -45,8 +45,10 @@ const protobuf = require('protobufjs');
4545
const pubSubClient = new PubSub();
4646

4747
async function publishProtobufMessages(topicNameOrId) {
48-
// Get the topic metadata to learn about its schema.
48+
// Cache topic objects (publishers) and reuse them.
4949
const topic = pubSubClient.topic(topicNameOrId);
50+
51+
// Get the topic metadata to learn about its schema.
5052
const [topicMetadata] = await topic.getMetadata();
5153
const topicSchemaMetadata = topicMetadata.schemaSettings;
5254

@@ -87,7 +89,7 @@ async function publishProtobufMessages(topicNameOrId) {
8789
return;
8890
}
8991

90-
const messageId = await topic.publish(dataBuffer);
92+
const messageId = await topic.publishMessage({data: dataBuffer});
9193
console.log(`Protobuf message ${messageId} published.`);
9294
}
9395
// [END pubsub_publish_proto_messages]

samples/publishWithFlowControl.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2021 Google LLC
1+
// Copyright 2021-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -50,7 +50,7 @@ async function publishWithFlowControl(topicNameOrId) {
5050
},
5151
};
5252

53-
// Get a publisher.
53+
// Get a publisher. Cache topic objects (publishers) and reuse them.
5454
const topic = pubSubClient.topic(topicNameOrId, options);
5555

5656
// For flow controlled publishing, we'll use a publisher flow controller

samples/publishWithOpenTelemetryTracing.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,10 @@ async function publishMessage(topicNameOrId, data) {
8686
// Publishes the message as a string, e.g. "Hello, world!"
8787
// or JSON.stringify(someObject)
8888
const dataBuffer = Buffer.from(data);
89+
90+
// Cache topic objects (publishers) and reuse them.
8991
const publisher = pubSubClient.topic(topicNameOrId);
92+
9093
const messageId = await publisher.publishMessage({data: dataBuffer});
9194
console.log(`Message ${messageId} published.`);
9295

samples/publishWithRetrySettings.js

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright 2019-2023 Google LLC
1+
// Copyright 2019-2024 Google LLC
22
//
33
// Licensed under the Apache License, Version 2.0 (the "License");
44
// you may not use this file except in compliance with the License.
@@ -39,34 +39,20 @@
3939

4040
// Imports the Google Cloud client library. v1 is for the lower level
4141
// proto access.
42-
const {v1} = require('@google-cloud/pubsub');
42+
const {PubSub} = require('@google-cloud/pubsub');
4343

44-
// Creates a publisher client.
45-
const publisherClient = new v1.PublisherClient({
46-
// optional auth parameters
47-
});
48-
async function publishWithRetrySettings(projectId, topicNameOrId, data) {
49-
const formattedTopic = publisherClient.projectTopicPath(
50-
projectId,
51-
topicNameOrId
52-
);
53-
54-
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
55-
const dataBuffer = Buffer.from(data);
56-
const messagesElement = {
57-
data: dataBuffer,
58-
};
59-
const messages = [messagesElement];
60-
61-
// Build the request
62-
const request = {
63-
topic: formattedTopic,
64-
messages: messages,
65-
};
44+
async function publishWithRetrySettings(topicNameOrId, data) {
45+
const pubsubClient = new PubSub();
6646

6747
// Retry settings control how the publisher handles retryable failures. Default values are shown.
6848
// The `retryCodes` array determines which grpc errors will trigger an automatic retry.
6949
// The `backoffSettings` object lets you specify the behaviour of retries over time.
50+
//
51+
// Reference this document to see the current defaults for publishing:
52+
// https://github.com/googleapis/nodejs-pubsub/blob/6e2c28a9298a49dc1b194ce747ff5258c8df6deb/src/v1/publisher_client_config.json#L59
53+
//
54+
// Please note that _all_ items must be included when passing these settings to topic().
55+
// Otherwise, unpredictable (incorrect) defaults may be assumed.
7056
const retrySettings = {
7157
retryCodes: [
7258
10, // 'ABORTED'
@@ -83,36 +69,42 @@ async function publishWithRetrySettings(projectId, topicNameOrId, data) {
8369
initialRetryDelayMillis: 100,
8470
// The multiplier by which to increase the delay time between the completion
8571
// of failed requests, and the initiation of the subsequent retrying request.
86-
retryDelayMultiplier: 1.3,
72+
retryDelayMultiplier: 4,
8773
// The maximum delay time, in milliseconds, between requests.
8874
// When this value is reached, retryDelayMultiplier will no longer be used to increase delay time.
8975
maxRetryDelayMillis: 60000,
9076
// The initial timeout parameter to the request.
91-
initialRpcTimeoutMillis: 5000,
77+
initialRpcTimeoutMillis: 60000,
9278
// The multiplier by which to increase the timeout parameter between failed requests.
9379
rpcTimeoutMultiplier: 1.0,
9480
// The maximum timeout parameter, in milliseconds, for a request. When this value is reached,
9581
// rpcTimeoutMultiplier will no longer be used to increase the timeout.
96-
maxRpcTimeoutMillis: 600000,
82+
maxRpcTimeoutMillis: 60000,
9783
// The total time, in milliseconds, starting from when the initial request is sent,
9884
// after which an error will be returned, regardless of the retrying attempts made meanwhile.
9985
totalTimeoutMillis: 600000,
10086
},
10187
};
10288

103-
const [response] = await publisherClient.publish(request, {
104-
retry: retrySettings,
89+
// Cache topic objects (publishers) and reuse them.
90+
const topic = pubsubClient.topic(topicNameOrId, {
91+
gaxOpts: {
92+
retry: retrySettings,
93+
},
10594
});
106-
console.log(`Message ${response.messageIds} published.`);
95+
96+
// Publishes the message as a string, e.g. "Hello, world!" or JSON.stringify(someObject)
97+
const dataBuffer = Buffer.from(data);
98+
const messageId = await topic.publishMessage({data: dataBuffer});
99+
console.log(`Message ${messageId} published.`);
107100
}
108101
// [END pubsub_publisher_retry_settings]
109102

110103
function main(
111-
projectId = 'YOUR_PROJECT_ID',
112104
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
113105
data = JSON.stringify({foo: 'bar'})
114106
) {
115-
publishWithRetrySettings(projectId, topicNameOrId, data).catch(err => {
107+
publishWithRetrySettings(topicNameOrId, data).catch(err => {
116108
console.error(err.message);
117109
process.exitCode = 1;
118110
});

0 commit comments

Comments
 (0)