Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
24c37bb
feat: Add timeout option to Subscription.close()
google-labs-jules[bot] May 6, 2025
687212d
docs: revert README change so release-please can do it
feywind May 9, 2025
da726d0
feat: jules' vibin' is too lo-fi, fix some bad assumptions
feywind May 13, 2025
629aa43
samples: typeless a JS sample for close with timeout
feywind May 13, 2025
a0ba234
feat: add awaitWithTimeout and test
feywind Jun 16, 2025
13e58f2
tests: also test error results without timeout
feywind Jun 16, 2025
5db179e
feat: update for the current spec, test updates coming
feywind Jun 16, 2025
62afe56
tests: misc fixes before further additions
feywind Jun 19, 2025
b904884
feat: update Temporal shims to match latest standards
feywind Jun 20, 2025
2cf219f
chore: linter fix
feywind Jun 20, 2025
6e41e4d
feat: update to latest spec doc info, finish unit tests
feywind Jun 20, 2025
49003e9
Merge branch 'main' into feat-close-timeout
feywind Jun 20, 2025
ca50c3c
feat: also move the options from close() parameters to subscriber opt…
feywind Jun 20, 2025
ce5e917
chore: fix linter errors
feywind Jul 4, 2025
f2898a3
samples: update to latest API changes
feywind Jul 8, 2025
073c6d7
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 3, 2025
bcad5e7
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
f28be03
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jul 16, 2025
9aabe08
Merge branch 'feat-close-timeout' of https://github.com/googleapis/no…
gcf-owl-bot[bot] Jul 16, 2025
8866c98
fix: no need to check isEmpty on remove
feywind Aug 6, 2025
82e0c81
chore: merge remote-tracking branch 'refs/remotes/origin/feat-close-t…
feywind Aug 6, 2025
f08ae6d
chore: merge remote-tracking branch 'remotes/origin/main' into feat-c…
feywind Aug 6, 2025
791a7f0
chore: remove unneeded promise skip code
feywind Aug 6, 2025
36e5361
fix: substantially clarify the awaitWithTimeout interface
feywind Aug 6, 2025
d3a3489
chore: hoist timeout logic into its own method
feywind Aug 6, 2025
a285935
fix: tests were leaking EventEmitter handlers
feywind Aug 6, 2025
abeeb6c
chore: change constant to CONSTANT_CASE
feywind Aug 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree

| Sample | Source Code | Try it |
| --------------------------- | --------------------------------- | ------ |
| Close Subscription with Timeout | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md) |
| Commit an Avro-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitAvroSchema.js,samples/README.md) |
| Commit an Proto-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitProtoSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitProtoSchema.js,samples/README.md) |
| Create an Avro based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createAvroSchema.js,samples/README.md) |
Expand Down
20 changes: 20 additions & 0 deletions samples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ guides.

* [Before you begin](#before-you-begin)
* [Samples](#samples)
* [Close Subscription with Timeout](#close-subscription-with-timeout)
* [Commit an Avro-Based Schema](#commit-an-avro-based-schema)
* [Commit an Proto-Based Schema](#commit-an-proto-based-schema)
* [Create an Avro based Schema](#create-an-avro-based-schema)
Expand Down Expand Up @@ -108,6 +109,25 @@ Before running the samples, make sure you've followed the steps outlined in



### Close Subscription with Timeout

Demonstrates closing a subscription with a specified timeout for graceful shutdown.

View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js).

[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md)

__Usage:__


`node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>`


-----




### Commit an Avro-Based Schema

Commits a new schema definition revision on a project, using Avro
Expand Down
97 changes: 97 additions & 0 deletions samples/closeSubscriptionWithTimeout.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// This is a generated sample, using the typeless sample bot. Please
// look for the source TypeScript sample (.ts) for modifications.
'use strict';

/**
* This sample demonstrates how to use the `timeout` option when closing a Pub/Sub
* subscription using the Node.js client library. The timeout allows for graceful
* shutdown, attempting to nack any buffered messages before closing.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Close Subscription with Timeout
// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown.
// usage: node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>

// This sample is currently speculative.
// -START pubsub_close_subscription_with_timeout]

// Imports the Google Cloud client library
const {
PubSub,
Duration,
SubscriptionCloseBehaviors,
} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubsub = new PubSub();

async function closeSubscriptionWithTimeout(
topicNameOrId,
subscriptionNameOrId,
) {
const topic = pubsub.topic(topicNameOrId);

// Closes the subscription immediately, not waiting for anything.
let subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
timeout: Duration.from({seconds: 0}),
},
});
await subscription.close();

// Shuts down the gRPC connection, and waits for just before the timeout
// to send nacks for buffered messages. If `timeout` were missing, this
// would wait for the maximum leasing timeout.
subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
behavior: SubscriptionCloseBehaviors.WaitForProcessing,
timeout: Duration.from({seconds: 10}),
},
});
await subscription.close();

// Shuts down the gRPC connection, sends nacks for buffered messages, and waits
// through the timeout for nacks to send.
subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
behavior: SubscriptionCloseBehaviors.NackImmediately,
timeout: Duration.from({seconds: 10}),
},
});
await subscription.close();
}
// -END pubsub_close_subscription_with_timeout]

// Presumes topic and subscription have been created prior to running the sample.
// If you uncomment the cleanup code above, the sample will delete them afterwards.
function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
) {
closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch(
err => {
console.error(err.message);
process.exitCode = 1;
},
);
}

main(...process.argv.slice(2));
93 changes: 93 additions & 0 deletions samples/typescript/closeSubscriptionWithTimeout.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright 2025 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* This sample demonstrates how to use the `timeout` option when closing a Pub/Sub
* subscription using the Node.js client library. The timeout allows for graceful
* shutdown, attempting to nack any buffered messages before closing.
*
* For more information, see the README.md under /pubsub and the documentation
* at https://cloud.google.com/pubsub/docs.
*/

// sample-metadata:
// title: Close Subscription with Timeout
// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown.
// usage: node closeSubscriptionWithTimeout.js <topic-name> <subscription-name>

// This sample is currently speculative.
// -START pubsub_close_subscription_with_timeout]

// Imports the Google Cloud client library
import {
PubSub,
Duration,
SubscriptionCloseBehaviors,
} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubsub = new PubSub();

async function closeSubscriptionWithTimeout(
topicNameOrId: string,
subscriptionNameOrId: string,
) {
const topic = pubsub.topic(topicNameOrId);

// Closes the subscription immediately, not waiting for anything.
let subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
timeout: Duration.from({seconds: 0}),
},
});
await subscription.close();

// Shuts down the gRPC connection, and waits for just before the timeout
// to send nacks for buffered messages. If `timeout` were missing, this
// would wait for the maximum leasing timeout.
subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
behavior: SubscriptionCloseBehaviors.WaitForProcessing,
timeout: Duration.from({seconds: 10}),
},
});
await subscription.close();

// Shuts down the gRPC connection, sends nacks for buffered messages, and waits
// through the timeout for nacks to send.
subscription = topic.subscription(subscriptionNameOrId, {
closeOptions: {
behavior: SubscriptionCloseBehaviors.NackImmediately,
timeout: Duration.from({seconds: 10}),
},
});
await subscription.close();
}
// -END pubsub_close_subscription_with_timeout]

// Presumes topic and subscription have been created prior to running the sample.
// If you uncomment the cleanup code above, the sample will delete them afterwards.
function main(
topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID',
subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID',
) {
closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch(
err => {
console.error(err.message);
process.exitCode = 1;
},
);
}

main(...process.argv.slice(2));
6 changes: 3 additions & 3 deletions src/exponential-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ export class ExponentialRetry<T> {
private _timer?: NodeJS.Timeout;

constructor(backoff: Duration, maxBackoff: Duration) {
this._backoffMs = backoff.totalOf('millisecond');
this._maxBackoffMs = maxBackoff.totalOf('millisecond');
this._backoffMs = backoff.milliseconds;
this._maxBackoffMs = maxBackoff.milliseconds;
}

/**
Expand Down Expand Up @@ -170,7 +170,7 @@ export class ExponentialRetry<T> {

next.retryInfo!.callback(
next as unknown as T,
Duration.from({millis: now - next.retryInfo!.firstRetry}),
Duration.from({milliseconds: now - next.retryInfo!.firstRetry}),
);
} else {
break;
Expand Down
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ export {
SubscriptionMetadata,
SubscriptionOptions,
SubscriptionCloseCallback,
SubscriptionCloseOptions,
SubscriptionCloseBehaviors,
CreateSubscriptionOptions,
CreateSubscriptionCallback,
CreateSubscriptionResponse,
Expand Down
21 changes: 21 additions & 0 deletions src/lease-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ export class LeaseManager extends EventEmitter {
*/
clear(): Message[] {
const wasFull = this.isFull();
const wasEmpty = this.isEmpty();

if (this.pending > 0) {
logs.subscriberFlowControl.info(
Expand All @@ -161,11 +162,15 @@ export class LeaseManager extends EventEmitter {
if (wasFull) {
process.nextTick(() => this.emit('free'));
}
if (!wasEmpty && this.isEmpty()) {
process.nextTick(() => this.emit('empty'));
}

this._cancelExtension();

return remaining;
}

/**
* Indicates if we're at or over capacity.
*
Expand All @@ -176,6 +181,17 @@ export class LeaseManager extends EventEmitter {
const {maxBytes, maxMessages} = this._options;
return this.size >= maxMessages! || this.bytes >= maxBytes!;
}

/**
* True if we have no messages in leasing.
*
* @returns {boolean}
* @private
*/
isEmpty(): boolean {
return this._messages.size === 0;
}

/**
* Removes a message from the inventory. Stopping the deadline extender if no
* messages are left over.
Expand Down Expand Up @@ -216,6 +232,10 @@ export class LeaseManager extends EventEmitter {
this._dispense(this._pending.shift()!);
}

if (this.isEmpty()) {
this.emit('empty');
}

if (this.size === 0 && this._isLeasing) {
this._cancelExtension();
}
Expand Down Expand Up @@ -265,6 +285,7 @@ export class LeaseManager extends EventEmitter {
if (this._subscriber.isOpen) {
message.subSpans.flowEnd();
process.nextTick(() => {
message.dispatched();
logs.callbackDelivery.info(
'message (ID %s, ackID %s) delivery to user callbacks',
message.id,
Expand Down
2 changes: 1 addition & 1 deletion src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ const DEFAULT_OPTIONS: MessageStreamOptions = {
highWaterMark: 0,
maxStreams: defaultOptions.subscription.maxStreams,
timeout: 300000,
retryMinBackoff: Duration.from({millis: 100}),
retryMinBackoff: Duration.from({milliseconds: 100}),
retryMaxBackoff: Duration.from({seconds: 60}),
};

Expand Down
31 changes: 25 additions & 6 deletions src/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,34 @@ export class PubSub {
}

/**
* Closes out this object, releasing any server connections. Note that once
* you close a PubSub object, it may not be used again. Any pending operations
* (e.g. queued publish messages) will fail. If you have topic or subscription
* objects that may have pending operations, you should call close() on those
* first if you want any pending messages to be delivered correctly. The
* Closes the PubSub client, releasing any underlying gRPC connections.
*
* Note that once you close a PubSub object, it may not be used again. Any pending
* operations (e.g. queued publish messages) will fail. If you have topic or
* subscription objects that may have pending operations, you should call close()
* on those first if you want any pending messages to be delivered correctly. The
* PubSub class doesn't track those.

* Note that this method primarily closes the gRPC clients (Publisher and Subscriber)
* used for API requests. It does **not** automatically handle the graceful shutdown
* of active subscriptions.
*
* For graceful shutdown of subscriptions with specific timeout behavior (e.g.,
* ensuring buffered messages are nacked before closing), please refer to the
* {@link Subscription#close} method. It is recommended to call
* `Subscription.close({timeout: ...})` directly on your active `Subscription`
* objects *before* calling `PubSub.close()` if you require that specific
* shutdown behavior.
*
* Calling `PubSub.close()` without first closing active subscriptions might
* result in abrupt termination of message processing for those subscriptions.
* Any pending operations on associated Topic or Subscription objects (e.g.,
* queued publish messages or unacked subscriber messages) may fail after
* `PubSub.close()` is called.
*
* @callback EmptyCallback
* @returns {Promise<void>}
* @param {Error} [err] Request error, if any.
* @returns {Promise<void>} Resolves when the clients are closed.
*/
close(): Promise<void>;
close(callback: EmptyCallback): void;
Expand Down
Loading
Loading