diff --git a/README.md b/README.md index 4972a7433..6bbb20f75 100644 --- a/README.md +++ b/README.md @@ -123,6 +123,7 @@ Samples are in the [`samples/`](https://github.com/googleapis/nodejs-pubsub/tree | Sample | Source Code | Try it | | --------------------------- | --------------------------------- | ------ | +| Close Subscription with Timeout | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md) | | Commit an Avro-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitAvroSchema.js,samples/README.md) | | Commit an Proto-Based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/commitProtoSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/commitProtoSchema.js,samples/README.md) | | Create an Avro based Schema | [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/createAvroSchema.js) | [![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/createAvroSchema.js,samples/README.md) | diff --git a/samples/README.md b/samples/README.md index 7635d0758..5d312dda1 100644 --- a/samples/README.md +++ b/samples/README.md @@ -20,6 +20,7 @@ guides. * [Before you begin](#before-you-begin) * [Samples](#samples) + * [Close Subscription with Timeout](#close-subscription-with-timeout) * [Commit an Avro-Based Schema](#commit-an-avro-based-schema) * [Commit an Proto-Based Schema](#commit-an-proto-based-schema) * [Create an Avro based Schema](#create-an-avro-based-schema) @@ -108,6 +109,25 @@ Before running the samples, make sure you've followed the steps outlined in +### Close Subscription with Timeout + +Demonstrates closing a subscription with a specified timeout for graceful shutdown. + +View the [source code](https://github.com/googleapis/nodejs-pubsub/blob/main/samples/closeSubscriptionWithTimeout.js). + +[![Open in Cloud Shell][shell_img]](https://console.cloud.google.com/cloudshell/open?git_repo=https://github.com/googleapis/nodejs-pubsub&page=editor&open_in_editor=samples/closeSubscriptionWithTimeout.js,samples/README.md) + +__Usage:__ + + +`node closeSubscriptionWithTimeout.js ` + + +----- + + + + ### Commit an Avro-Based Schema Commits a new schema definition revision on a project, using Avro diff --git a/samples/closeSubscriptionWithTimeout.js b/samples/closeSubscriptionWithTimeout.js new file mode 100644 index 000000000..76c8940ee --- /dev/null +++ b/samples/closeSubscriptionWithTimeout.js @@ -0,0 +1,97 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This is a generated sample, using the typeless sample bot. Please +// look for the source TypeScript sample (.ts) for modifications. +'use strict'; + +/** + * This sample demonstrates how to use the `timeout` option when closing a Pub/Sub + * subscription using the Node.js client library. The timeout allows for graceful + * shutdown, attempting to nack any buffered messages before closing. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Close Subscription with Timeout +// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown. +// usage: node closeSubscriptionWithTimeout.js + +// This sample is currently speculative. +// -START pubsub_close_subscription_with_timeout] + +// Imports the Google Cloud client library +const { + PubSub, + Duration, + SubscriptionCloseBehaviors, +} = require('@google-cloud/pubsub'); + +// Creates a client; cache this for further use +const pubsub = new PubSub(); + +async function closeSubscriptionWithTimeout( + topicNameOrId, + subscriptionNameOrId, +) { + const topic = pubsub.topic(topicNameOrId); + + // Closes the subscription immediately, not waiting for anything. + let subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + timeout: Duration.from({seconds: 0}), + }, + }); + await subscription.close(); + + // Shuts down the gRPC connection, and waits for just before the timeout + // to send nacks for buffered messages. If `timeout` were missing, this + // would wait for the maximum leasing timeout. + subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + behavior: SubscriptionCloseBehaviors.WaitForProcessing, + timeout: Duration.from({seconds: 10}), + }, + }); + await subscription.close(); + + // Shuts down the gRPC connection, sends nacks for buffered messages, and waits + // through the timeout for nacks to send. + subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + behavior: SubscriptionCloseBehaviors.NackImmediately, + timeout: Duration.from({seconds: 10}), + }, + }); + await subscription.close(); +} +// -END pubsub_close_subscription_with_timeout] + +// Presumes topic and subscription have been created prior to running the sample. +// If you uncomment the cleanup code above, the sample will delete them afterwards. +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', +) { + closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch( + err => { + console.error(err.message); + process.exitCode = 1; + }, + ); +} + +main(...process.argv.slice(2)); diff --git a/samples/typescript/closeSubscriptionWithTimeout.ts b/samples/typescript/closeSubscriptionWithTimeout.ts new file mode 100644 index 000000000..a13965b54 --- /dev/null +++ b/samples/typescript/closeSubscriptionWithTimeout.ts @@ -0,0 +1,93 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * This sample demonstrates how to use the `timeout` option when closing a Pub/Sub + * subscription using the Node.js client library. The timeout allows for graceful + * shutdown, attempting to nack any buffered messages before closing. + * + * For more information, see the README.md under /pubsub and the documentation + * at https://cloud.google.com/pubsub/docs. + */ + +// sample-metadata: +// title: Close Subscription with Timeout +// description: Demonstrates closing a subscription with a specified timeout for graceful shutdown. +// usage: node closeSubscriptionWithTimeout.js + +// This sample is currently speculative. +// -START pubsub_close_subscription_with_timeout] + +// Imports the Google Cloud client library +import { + PubSub, + Duration, + SubscriptionCloseBehaviors, +} from '@google-cloud/pubsub'; + +// Creates a client; cache this for further use +const pubsub = new PubSub(); + +async function closeSubscriptionWithTimeout( + topicNameOrId: string, + subscriptionNameOrId: string, +) { + const topic = pubsub.topic(topicNameOrId); + + // Closes the subscription immediately, not waiting for anything. + let subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + timeout: Duration.from({seconds: 0}), + }, + }); + await subscription.close(); + + // Shuts down the gRPC connection, and waits for just before the timeout + // to send nacks for buffered messages. If `timeout` were missing, this + // would wait for the maximum leasing timeout. + subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + behavior: SubscriptionCloseBehaviors.WaitForProcessing, + timeout: Duration.from({seconds: 10}), + }, + }); + await subscription.close(); + + // Shuts down the gRPC connection, sends nacks for buffered messages, and waits + // through the timeout for nacks to send. + subscription = topic.subscription(subscriptionNameOrId, { + closeOptions: { + behavior: SubscriptionCloseBehaviors.NackImmediately, + timeout: Duration.from({seconds: 10}), + }, + }); + await subscription.close(); +} +// -END pubsub_close_subscription_with_timeout] + +// Presumes topic and subscription have been created prior to running the sample. +// If you uncomment the cleanup code above, the sample will delete them afterwards. +function main( + topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID', + subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID', +) { + closeSubscriptionWithTimeout(topicNameOrId, subscriptionNameOrId).catch( + err => { + console.error(err.message); + process.exitCode = 1; + }, + ); +} + +main(...process.argv.slice(2)); diff --git a/src/exponential-retry.ts b/src/exponential-retry.ts index dd12a5b0b..95cc2b5e3 100644 --- a/src/exponential-retry.ts +++ b/src/exponential-retry.ts @@ -77,8 +77,8 @@ export class ExponentialRetry { private _timer?: NodeJS.Timeout; constructor(backoff: Duration, maxBackoff: Duration) { - this._backoffMs = backoff.totalOf('millisecond'); - this._maxBackoffMs = maxBackoff.totalOf('millisecond'); + this._backoffMs = backoff.milliseconds; + this._maxBackoffMs = maxBackoff.milliseconds; } /** @@ -170,7 +170,7 @@ export class ExponentialRetry { next.retryInfo!.callback( next as unknown as T, - Duration.from({millis: now - next.retryInfo!.firstRetry}), + Duration.from({milliseconds: now - next.retryInfo!.firstRetry}), ); } else { break; diff --git a/src/index.ts b/src/index.ts index 2c9371069..2b1432cdb 100644 --- a/src/index.ts +++ b/src/index.ts @@ -143,6 +143,8 @@ export { SubscriptionMetadata, SubscriptionOptions, SubscriptionCloseCallback, + SubscriptionCloseOptions, + SubscriptionCloseBehaviors, CreateSubscriptionOptions, CreateSubscriptionCallback, CreateSubscriptionResponse, diff --git a/src/lease-manager.ts b/src/lease-manager.ts index d61998c00..fdc0de141 100644 --- a/src/lease-manager.ts +++ b/src/lease-manager.ts @@ -145,6 +145,7 @@ export class LeaseManager extends EventEmitter { */ clear(): Message[] { const wasFull = this.isFull(); + const wasEmpty = this.isEmpty(); if (this.pending > 0) { logs.subscriberFlowControl.info( @@ -161,11 +162,15 @@ export class LeaseManager extends EventEmitter { if (wasFull) { process.nextTick(() => this.emit('free')); } + if (!wasEmpty && this.isEmpty()) { + process.nextTick(() => this.emit('empty')); + } this._cancelExtension(); return remaining; } + /** * Indicates if we're at or over capacity. * @@ -176,6 +181,17 @@ export class LeaseManager extends EventEmitter { const {maxBytes, maxMessages} = this._options; return this.size >= maxMessages! || this.bytes >= maxBytes!; } + + /** + * True if we have no messages in leasing. + * + * @returns {boolean} + * @private + */ + isEmpty(): boolean { + return this._messages.size === 0; + } + /** * Removes a message from the inventory. Stopping the deadline extender if no * messages are left over. @@ -216,6 +232,10 @@ export class LeaseManager extends EventEmitter { this._dispense(this._pending.shift()!); } + if (this.isEmpty()) { + this.emit('empty'); + } + if (this.size === 0 && this._isLeasing) { this._cancelExtension(); } @@ -265,6 +285,7 @@ export class LeaseManager extends EventEmitter { if (this._subscriber.isOpen) { message.subSpans.flowEnd(); process.nextTick(() => { + message.dispatched(); logs.callbackDelivery.info( 'message (ID %s, ackID %s) delivery to user callbacks', message.id, diff --git a/src/message-stream.ts b/src/message-stream.ts index 0c547d026..60c404bcb 100644 --- a/src/message-stream.ts +++ b/src/message-stream.ts @@ -73,7 +73,7 @@ const DEFAULT_OPTIONS: MessageStreamOptions = { highWaterMark: 0, maxStreams: defaultOptions.subscription.maxStreams, timeout: 300000, - retryMinBackoff: Duration.from({millis: 100}), + retryMinBackoff: Duration.from({milliseconds: 100}), retryMaxBackoff: Duration.from({seconds: 60}), }; diff --git a/src/pubsub.ts b/src/pubsub.ts index 075d673f5..68a905255 100644 --- a/src/pubsub.ts +++ b/src/pubsub.ts @@ -352,15 +352,34 @@ export class PubSub { } /** - * Closes out this object, releasing any server connections. Note that once - * you close a PubSub object, it may not be used again. Any pending operations - * (e.g. queued publish messages) will fail. If you have topic or subscription - * objects that may have pending operations, you should call close() on those - * first if you want any pending messages to be delivered correctly. The + * Closes the PubSub client, releasing any underlying gRPC connections. + * + * Note that once you close a PubSub object, it may not be used again. Any pending + * operations (e.g. queued publish messages) will fail. If you have topic or + * subscription objects that may have pending operations, you should call close() + * on those first if you want any pending messages to be delivered correctly. The * PubSub class doesn't track those. + + * Note that this method primarily closes the gRPC clients (Publisher and Subscriber) + * used for API requests. It does **not** automatically handle the graceful shutdown + * of active subscriptions. + * + * For graceful shutdown of subscriptions with specific timeout behavior (e.g., + * ensuring buffered messages are nacked before closing), please refer to the + * {@link Subscription#close} method. It is recommended to call + * `Subscription.close({timeout: ...})` directly on your active `Subscription` + * objects *before* calling `PubSub.close()` if you require that specific + * shutdown behavior. + * + * Calling `PubSub.close()` without first closing active subscriptions might + * result in abrupt termination of message processing for those subscriptions. + * Any pending operations on associated Topic or Subscription objects (e.g., + * queued publish messages or unacked subscriber messages) may fail after + * `PubSub.close()` is called. * * @callback EmptyCallback - * @returns {Promise} + * @param {Error} [err] Request error, if any. + * @returns {Promise} Resolves when the clients are closed. */ close(): Promise; close(callback: EmptyCallback): void; diff --git a/src/subscriber.ts b/src/subscriber.ts index 74cfbdc2c..c12996862 100644 --- a/src/subscriber.ts +++ b/src/subscriber.ts @@ -17,6 +17,7 @@ import {DateStruct, PreciseDate} from '@google-cloud/precise-date'; import {replaceProjectIdToken} from '@google-cloud/projectify'; import {promisify} from '@google-cloud/promisify'; +import defer = require('p-defer'); import {google} from '../protos/protos'; import {Histogram} from './histogram'; @@ -27,8 +28,10 @@ import {Subscription} from './subscription'; import {defaultOptions} from './default-options'; import {SubscriberClient} from './v1'; import * as tracing from './telemetry-tracing'; -import {Duration} from './temporal'; +import {Duration, atMost as durationAtMost} from './temporal'; import {EventEmitter} from 'events'; + +import {awaitWithTimeout} from './util'; import {logs as baseLogs} from './logs'; export {StatusError} from './message-stream'; @@ -41,6 +44,7 @@ export {StatusError} from './message-stream'; export const logs = { slowAck: baseLogs.pubsub.sublog('slow-ack'), ackNack: baseLogs.pubsub.sublog('ack-nack'), + debug: baseLogs.pubsub.sublog('debug'), }; export type PullResponse = google.pubsub.v1.IStreamingPullResponse; @@ -57,6 +61,30 @@ export const AckResponses = { }; export type AckResponse = ValueOf; +/** + * Enum values for behaviors of the Subscriber.close() method. + */ +export const SubscriberCloseBehaviors = { + NackImmediately: 'NACK' as const, + WaitForProcessing: 'WAIT' as const, +}; +export type SubscriberCloseBehavior = ValueOf; + +/** + * Options to modify the behavior of the Subscriber.close() method. If + * none is passed, the default is SubscriberCloseBehaviors.Wait. + */ +export interface SubscriberCloseOptions { + behavior?: SubscriberCloseBehavior; + timeout?: Duration; +} + +/** + * Specifies how long before the final close timeout, in WaitForProcessing mode, + * that we should give up and start shutting down cleanly. + */ +const FINAL_NACK_TIMEOUT = Duration.from({seconds: 1}); + /** * Thrown when an error is detected in an ack/nack/modack call, when * exactly-once delivery is enabled on the subscription. This will @@ -241,10 +269,12 @@ export class Message implements tracing.MessageWithAttributes { orderingKey?: string; publishTime: PreciseDate; received: number; + private _handledPromise: defer.DeferredPromise; private _handled: boolean; private _length: number; private _subscriber: Subscriber; private _ackFailed?: AckError; + private _dispatched: boolean; /** * @private @@ -378,7 +408,9 @@ export class Message implements tracing.MessageWithAttributes { */ this.isExactlyOnceDelivery = sub.isExactlyOnceDelivery; + this._dispatched = false; this._handled = false; + this._handledPromise = defer(); this._length = this.data.length; this._subscriber = sub; } @@ -392,6 +424,38 @@ export class Message implements tracing.MessageWithAttributes { return this._length; } + /** + * Resolves when the message has been handled fully; a handled message may + * not have any further operations performed on it. + * + * @private + */ + get handledPromise(): Promise { + return this._handledPromise.promise; + } + + /** + * When this message is dispensed to user callback code, this should be called. + * The time between the dispatch and the handledPromise resolving is when the + * message is with the user. + * + * @private + */ + dispatched(): void { + if (!this._dispatched) { + this.subSpans.processingStart(this._subscriber.name); + this._dispatched = true; + } + } + + /** + * @private + * @returns True if this message has been dispatched to user callback code. + */ + isDispatched(): boolean { + return this._dispatched; + } + /** * Sets this message's exactly once delivery acks to permanent failure. This is * meant for internal library use only. @@ -418,6 +482,7 @@ export class Message implements tracing.MessageWithAttributes { this.subSpans.ackCall(); this.subSpans.processingEnd(); void this._subscriber.ack(this); + this._handledPromise.resolve(); } } @@ -451,6 +516,8 @@ export class Message implements tracing.MessageWithAttributes { } catch (e) { this.ackFailed(e as AckError); throw e; + } finally { + this._handledPromise.resolve(); } } else { return AckResponses.Invalid; @@ -518,6 +585,7 @@ export class Message implements tracing.MessageWithAttributes { this.subSpans.nackCall(); this.subSpans.processingEnd(); void this._subscriber.nack(this); + this._handledPromise.resolve(); } } @@ -552,6 +620,8 @@ export class Message implements tracing.MessageWithAttributes { } catch (e) { this.ackFailed(e as AckError); throw e; + } finally { + this._handledPromise.resolve(); } } else { return AckResponses.Invalid; @@ -580,6 +650,18 @@ export class Message implements tracing.MessageWithAttributes { * settings at the Cloud PubSub server and uses the less accurate method * of only enforcing flow control at the client side. * @property {MessageStreamOptions} [streamingOptions] Streaming options. + * If no options are passed, it behaves like `SubscriberCloseBehaviors.Wait`. + * @property {SubscriberCloseOptions} [options] Determines the basic behavior of the + * close() function. + * @property {SubscriberCloseBehavior} [options.behavior] The behavior of the close operation. + * - NackImmediately: Sends nacks for all messages held by the client library, and + * wait for them to send. + * - WaitForProcessing: Continues normal ack/nack and leasing processes until close + * to the timeout, then switches to NackImmediately behavior to close down. + * Use {@link SubscriberCloseBehaviors} for enum values. + * @property {Duration} [options.timeout] In the case of Timeout, the maximum duration + * to wait for pending ack/nack requests to complete before resolving (or rejecting) + * the promise. */ export interface SubscriberOptions { minAckDeadline?: Duration; @@ -589,6 +671,7 @@ export interface SubscriberOptions { flowControl?: FlowControlOptions; useLegacyFlowControl?: boolean; streamingOptions?: MessageStreamOptions; + closeOptions?: SubscriberCloseOptions; } const minAckDeadlineForExactlyOnceDelivery = Duration.from({seconds: 60}); @@ -849,11 +932,31 @@ export class Subscriber extends EventEmitter { return AckResponses.Success; } + async #awaitTimeoutAndCheck( + promise: Promise, + timeout: Duration, + ): Promise { + const result = await awaitWithTimeout(promise, timeout); + if (result.exception || result.timedOut) { + // Don't try to deal with errors at this point, just warn-log. + if (result.timedOut === false) { + // This wasn't a timeout. + logs.debug.warn( + 'Error during Subscriber.close(): %j', + result.exception, + ); + } + } + } + /** - * Closes the subscriber. The returned promise will resolve once any pending - * acks/modAcks are finished. + * Closes the subscriber, stopping the reception of new messages and shutting + * down the underlying stream. The behavior of the returned Promise will depend + * on the closeOptions in the subscriber options. + * + * @returns {Promise} A promise that resolves when the subscriber is closed + * and pending operations are flushed or the timeout is reached. * - * @returns {Promise} * @private */ async close(): Promise { @@ -861,12 +964,72 @@ export class Subscriber extends EventEmitter { return; } + // Always close the stream right away so we don't receive more messages. this.isOpen = false; this._stream.destroy(); + + const options = this._options.closeOptions; + + // If no behavior is specified, default to Wait. + const behavior = + options?.behavior ?? SubscriberCloseBehaviors.WaitForProcessing; + + // The timeout can't realistically be longer than the longest time we're willing + // to lease messages. + let timeout = durationAtMost( + options?.timeout ?? this.maxExtensionTime, + this.maxExtensionTime, + ); + + // If the user specified a zero timeout, just bail immediately. + if (!timeout.milliseconds) { + this._inventory.clear(); + return; + } + + // Warn the user if the timeout is too short for NackImmediately. + if (Duration.compare(timeout, FINAL_NACK_TIMEOUT) < 0) { + logs.debug.warn( + 'Subscriber.close() timeout is less than the final shutdown time (%i ms). This may result in lost nacks.', + timeout.milliseconds, + ); + } + + // If we're in WaitForProcessing mode, then we first need to derive a NackImmediately + // timeout point. If everything finishes before then, we also want to go ahead and bail cleanly. + const shutdownStart = Date.now(); + if ( + behavior === SubscriberCloseBehaviors.WaitForProcessing && + !this._inventory.isEmpty + ) { + const waitTimeout = timeout.subtract(FINAL_NACK_TIMEOUT); + + const emptyPromise = new Promise(r => { + this._inventory.on('empty', r); + }); + + await this.#awaitTimeoutAndCheck(emptyPromise, waitTimeout); + } + + // Now we head into immediate shutdown mode with what time is left. + timeout = timeout.subtract({ + milliseconds: Date.now() - shutdownStart, + }); + if (timeout.milliseconds <= 0) { + // This probably won't work out, but go through the motions. + timeout = Duration.from({milliseconds: 0}); + } + + // Grab everything left in inventory. This includes messages that have already + // been dispatched to user callbacks. const remaining = this._inventory.clear(); + remaining.forEach(m => m.nack()); - await this._waitForFlush(); + // Wait for user callbacks to complete. + const flushCompleted = this._waitForFlush(); + await this.#awaitTimeoutAndCheck(flushCompleted, timeout); + // Clean up OTel spans for any remaining messages. remaining.forEach(m => { m.subSpans.shutdown(); m.endParentSpan(); @@ -1173,21 +1336,23 @@ export class Subscriber extends EventEmitter { * * @private * - * @returns {Promise} + * @returns {Promise} */ private async _waitForFlush(): Promise { const promises: Array> = []; + // Flush any batched requests immediately. if (this._acks.numPendingRequests) { promises.push(this._acks.onFlush()); - await this._acks.flush('message count'); + this._acks.flush('message count').catch(() => {}); } if (this._modAcks.numPendingRequests) { promises.push(this._modAcks.onFlush()); - await this._modAcks.flush('message count'); + this._modAcks.flush('message count').catch(() => {}); } + // Now, prepare the drain promises. if (this._acks.numInFlightRequests) { promises.push(this._acks.onDrain()); } @@ -1196,6 +1361,7 @@ export class Subscriber extends EventEmitter { promises.push(this._modAcks.onDrain()); } + // Wait for the flush promises. await Promise.all(promises); } } diff --git a/src/subscription.ts b/src/subscription.ts index 6a5f6023e..06e4ad602 100644 --- a/src/subscription.ts +++ b/src/subscription.ts @@ -41,7 +41,13 @@ import { SeekResponse, Snapshot, } from './snapshot'; -import {Message, Subscriber, SubscriberOptions} from './subscriber'; +import { + Message, + Subscriber, + SubscriberOptions, + SubscriberCloseOptions, + SubscriberCloseBehaviors, +} from './subscriber'; import {Topic} from './topic'; import {promisifySome} from './util'; import {StatusError} from './message-stream'; @@ -61,6 +67,8 @@ export type SubscriptionMetadata = { export type SubscriptionOptions = SubscriberOptions & {topic?: Topic}; export type SubscriptionCloseCallback = (err?: Error) => void; +export type SubscriptionCloseOptions = SubscriberCloseOptions; +export const SubscriptionCloseBehaviors = SubscriberCloseBehaviors; type SubscriptionCallback = ResourceCallback< Subscription, @@ -357,24 +365,17 @@ export class Subscription extends EventEmitter { } /** - * Closes the Subscription, once this is called you will no longer receive + * Closes the Subscription. Once this is called you will no longer receive * message events unless you call {Subscription#open} or add new message * listeners. * - * @param {function} [callback] The callback function. - * @param {?error} callback.err An error returned while closing the - * Subscription. + * @param {function} [callback] The callback function, if not using the Promise-based + * call signature. + * @param {?error} [callback.err] An error returned while closing the Subscription. * * @example * ``` - * subscription.close(err => { - * if (err) { - * // Error handling omitted. - * } - * }); - * - * // If the callback is omitted a Promise will be returned. - * subscription.close().then(() => {}); + * await subscription.close(); * ``` */ close(): Promise; diff --git a/src/temporal.ts b/src/temporal.ts index d20009960..d92aa0c1a 100644 --- a/src/temporal.ts +++ b/src/temporal.ts @@ -29,15 +29,44 @@ export interface DurationLike { hours?: number; minutes?: number; seconds?: number; + milliseconds?: number; + + /** + * tc39 has renamed this to milliseconds. + * + * @deprecated + */ millis?: number; } /** * Simplified list of values to pass to Duration.totalOf(). This * list is taken from the tc39 Temporal.Duration proposal, but - * larger and smaller units have been left off. + * larger and smaller units have been left off. The latest tc39 spec + * allows for both singular and plural forms. + */ +export type TotalOfUnit = + | 'hour' + | 'minute' + | 'second' + | 'millisecond' + | 'hours' + | 'minutes' + | 'seconds' + | 'milliseconds'; + +interface TypeCheck { + total(): number; +} + +/** + * Is it a Duration or a DurationLike? + * + * @private */ -export type TotalOfUnit = 'hour' | 'minute' | 'second' | 'millisecond'; +export function isDurationObject(value: unknown): value is Duration { + return typeof value === 'object' && !!(value as TypeCheck).total; +} /** * Duration class with an interface similar to the tc39 Temporal @@ -46,9 +75,10 @@ export type TotalOfUnit = 'hour' | 'minute' | 'second' | 'millisecond'; * used to set durations in Pub/Sub. * * This class will remain here for at least the next major version, - * eventually to be replaced by the tc39 Temporal built-in. + * eventually to be replaced by the tc39 Temporal.Duration built-in. * * https://tc39.es/proposal-temporal/docs/duration.html + * https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Temporal/Duration */ export class Duration { private millis: number; @@ -64,33 +94,133 @@ export class Duration { /** * Calculates the total number of units of type 'totalOf' that would * fit inside this duration. + * + * No longer part of the tc39 spec, superseded by total(). + * + * @deprecated */ totalOf(totalOf: TotalOfUnit): number { + return this.total(totalOf); + } + + /** + * Calculates the total number of units of type 'totalOf' that would + * fit inside this duration. The tc39 `options` parameter is not supported. + */ + total(totalOf: TotalOfUnit): number { switch (totalOf) { case 'hour': + case 'hours': return this.millis / Duration.hourInMillis; case 'minute': + case 'minutes': return this.millis / Duration.minuteInMillis; case 'second': + case 'seconds': return this.millis / Duration.secondInMillis; case 'millisecond': + case 'milliseconds': return this.millis; default: - throw new Error(`Invalid unit in call to totalOf(): ${totalOf}`); + throw new Error(`Invalid unit in call to total(): ${totalOf}`); } } + /** + * Equivalent to `total('hour')`. + */ + get hours(): number { + return this.total('hour'); + } + + /** + * Equivalent to `total('minute')`. + */ + get minutes(): number { + return this.total('minute'); + } + + /** + * Equivalent to `total('second')`. + */ + get seconds(): number { + return this.total('second'); + } + + /** + * Equivalent to `total('millisecond')`. + */ + get milliseconds(): number { + return this.total('millisecond'); + } + + /** + * Adds another Duration to this one and returns a new Duration. + * + * @param other A Duration or Duration-like object, like from() takes. + * @returns A new Duration. + */ + add(other: DurationLike | Duration): Duration { + const otherDuration = Duration.from(other); + return Duration.from({ + millis: this.milliseconds + otherDuration.milliseconds, + }); + } + + /** + * Subtracts another Duration from this one and returns a new Duration. + * + * @param other A Duration or Duration-like object, like from() takes. + * @returns A new Duration. + */ + subtract(other: DurationLike | Duration): Duration { + const otherDuration = Duration.from(other); + return Duration.from({ + millis: this.milliseconds - otherDuration.milliseconds, + }); + } + /** * Creates a Duration from a DurationLike, which is an object * containing zero or more of the following: hours, seconds, * minutes, millis. */ - static from(durationLike: DurationLike): Duration { - let millis = durationLike.millis ?? 0; + static from(durationLike: DurationLike | Duration): Duration { + if (isDurationObject(durationLike)) { + const d = durationLike as Duration; + return new Duration(d.milliseconds); + } + + let millis = durationLike.milliseconds ?? durationLike.millis ?? 0; millis += (durationLike.seconds ?? 0) * Duration.secondInMillis; millis += (durationLike.minutes ?? 0) * Duration.minuteInMillis; millis += (durationLike.hours ?? 0) * Duration.hourInMillis; return new Duration(millis); } + + /** + * Compare two Duration objects. Returns -1 if the first is less than the + * second, zero if they are equal, 1 if the first is greater than the second. + * + * Unlike tc39, this version does not accept options for relativeTo. + */ + static compare(first: Duration, second: Duration) { + const diffMs = first.total('millisecond') - second.total('millisecond'); + if (diffMs < 0) { + return -1; + } + if (diffMs > 0) { + return 1; + } + return 0; + } } + +// Simple accessors that can be used independent of the class. These are +// functions and not methods because we don't want to add to what's in +// the tc39 spec. +export const atLeast = (d: Duration, min: Duration) => + Duration.compare(d, min) < 0 ? min : d; +export const atMost = (d: Duration, max: Duration) => + Duration.compare(d, max) > 0 ? max : d; diff --git a/src/util.ts b/src/util.ts index 04bfb270d..9ee6a5e28 100644 --- a/src/util.ts +++ b/src/util.ts @@ -15,6 +15,7 @@ */ import {promisify, PromisifyOptions} from '@google-cloud/promisify'; +import {Duration} from './temporal'; /** * This replaces usage of promisifyAll(), going forward. Instead of opting @@ -83,3 +84,52 @@ export function addToBucket(map: Map, bucket: T, item: U) { items.push(item); map.set(bucket, items); } + +const timeoutToken: unique symbol = Symbol('pubsub promise timeout'); + +/** + * Return value from `awaitWithTimeout`. There are several variations on what + * might happen here, so this bundles it all up into a "report". + */ +export interface TimeoutReturn { + returnedValue?: T; + exception?: Error; + timedOut: boolean; +} + +/** + * Awaits on Promise with a timeout. Resolves on the passed promise resolving or + * rejecting, or on timeout. + * + * @param promise An existing Promise returning type T. + * @param timeout A timeout value as a Duration; if the timeout is exceeded while + * waiting for the promise, the Promise this function returns will resolve, but + * with `timedOut` set. + * @returns A TimeoutReturn with the returned value, if applicable, an exception if + * the promise rejected, or the timedOut set to true if it timed out. + */ +export async function awaitWithTimeout( + promise: Promise, + timeout: Duration, +): Promise> { + let timeoutId: NodeJS.Timeout | undefined; + const timeoutPromise = new Promise((_, rej) => { + timeoutId = setTimeout(() => rej(timeoutToken), timeout.milliseconds); + }); + try { + const value = await Promise.race([timeoutPromise, promise]); + clearTimeout(timeoutId); + return { + returnedValue: value, + timedOut: false, + }; + } catch (e) { + const err: Error | symbol = e as unknown as Error | symbol; + // The timeout passed or the promise rejected. + clearTimeout(timeoutId); + return { + exception: (err !== timeoutToken ? err : undefined) as Error | undefined, + timedOut: err === timeoutToken, + }; + } +} diff --git a/test/exponential-retry.ts b/test/exponential-retry.ts index 50834032a..5a36bd04d 100644 --- a/test/exponential-retry.ts +++ b/test/exponential-retry.ts @@ -55,8 +55,8 @@ describe('exponential retry class', () => { it('makes the first callback', () => { const clock = TestUtils.useFakeTimers(sandbox); const er = new ExponentialRetry( - Duration.from({millis: 100}), - Duration.from({millis: 1000}), + Duration.from({milliseconds: 100}), + Duration.from({milliseconds: 1000}), ); sandbox.stub(global.Math, 'random').returns(0.75); @@ -64,7 +64,7 @@ describe('exponential retry class', () => { let retried = false; er.retryLater(item, (s: typeof item, t: Duration) => { assert.strictEqual(s, item); - assert.strictEqual(t.totalOf('millisecond'), 125); + assert.strictEqual(t.milliseconds, 125); retried = true; }); @@ -78,8 +78,8 @@ describe('exponential retry class', () => { it('closes gracefully', () => { const clock = TestUtils.useFakeTimers(sandbox); const er = new ExponentialRetry( - Duration.from({millis: 100}), - Duration.from({millis: 1000}), + Duration.from({milliseconds: 100}), + Duration.from({milliseconds: 1000}), ); sandbox.stub(global.Math, 'random').returns(0.75); @@ -87,7 +87,7 @@ describe('exponential retry class', () => { const item = makeItem(); er.retryLater(item, (s: typeof item, t: Duration) => { assert.strictEqual(s, item); - assert.strictEqual(t.totalOf('millisecond'), 125); + assert.strictEqual(t.milliseconds, 125); called = true; }); @@ -108,13 +108,13 @@ describe('exponential retry class', () => { it('backs off exponentially', () => { const clock = TestUtils.useFakeTimers(sandbox); const er = new ExponentialRetry( - Duration.from({millis: 100}), - Duration.from({millis: 1000}), + Duration.from({milliseconds: 100}), + Duration.from({milliseconds: 1000}), ); sandbox.stub(global.Math, 'random').returns(0.75); let callbackCount = 0; - let callbackTime: Duration = Duration.from({millis: 0}); + let callbackTime: Duration = Duration.from({milliseconds: 0}); const item = makeItem(); const callback = (s: TestItem, t: Duration) => { @@ -129,11 +129,11 @@ describe('exponential retry class', () => { clock.tick(125); assert.strictEqual(callbackCount, 1); - assert.strictEqual(callbackTime.totalOf('millisecond'), 125); + assert.strictEqual(callbackTime.milliseconds, 125); clock.tick(400); assert.strictEqual(callbackCount, 2); - assert.strictEqual(callbackTime.totalOf('millisecond'), 375); + assert.strictEqual(callbackTime.milliseconds, 375); const leftovers = er.close(); assert.strictEqual(leftovers.length, 0); @@ -143,13 +143,13 @@ describe('exponential retry class', () => { const clock = TestUtils.useFakeTimers(sandbox); const item = makeItem(); const er = new ExponentialRetry( - Duration.from({millis: 100}), - Duration.from({millis: 150}), + Duration.from({milliseconds: 100}), + Duration.from({milliseconds: 150}), ); sandbox.stub(global.Math, 'random').returns(0.75); let callbackCount = 0; - let callbackTime: Duration = Duration.from({millis: 0}); + let callbackTime: Duration = Duration.from({milliseconds: 0}); const callback = (s: TestItem, t: Duration) => { assert.strictEqual(s, item); @@ -163,11 +163,11 @@ describe('exponential retry class', () => { clock.tick(125); assert.strictEqual(callbackCount, 1); - assert.strictEqual(callbackTime.totalOf('millisecond'), 125); + assert.strictEqual(callbackTime.milliseconds, 125); clock.tick(400); assert.strictEqual(callbackCount, 2); - assert.strictEqual(callbackTime.totalOf('millisecond'), 312); + assert.strictEqual(callbackTime.milliseconds, 312); const leftovers = er.close(); assert.strictEqual(leftovers.length, 0); @@ -178,8 +178,8 @@ describe('exponential retry class', () => { const items = [makeItem(), makeItem()]; const er = new ExponentialRetry( - Duration.from({millis: 100}), - Duration.from({millis: 1000}), + Duration.from({milliseconds: 100}), + Duration.from({milliseconds: 1000}), ); // Just disable the fuzz for this test. @@ -187,8 +187,8 @@ describe('exponential retry class', () => { const callbackCounts = [0, 0]; const callbackTimes: Duration[] = [ - Duration.from({millis: 0}), - Duration.from({millis: 0}), + Duration.from({milliseconds: 0}), + Duration.from({milliseconds: 0}), ]; const callback = (s: TestItem, t: Duration) => { @@ -207,7 +207,7 @@ describe('exponential retry class', () => { clock.tick(300); assert.deepStrictEqual(callbackCounts, [2, 0]); assert.deepStrictEqual( - callbackTimes.map(d => d.totalOf('millisecond')), + callbackTimes.map(d => d.milliseconds), [300, 0], ); @@ -220,7 +220,7 @@ describe('exponential retry class', () => { // while the second item should've retried once and quit. assert.deepStrictEqual(callbackCounts, [2, 1]); assert.deepStrictEqual( - callbackTimes.map(d => d.totalOf('millisecond')), + callbackTimes.map(d => d.milliseconds), [300, 100], ); diff --git a/test/lease-manager.ts b/test/lease-manager.ts index 4bfddb70a..0d74a3963 100644 --- a/test/lease-manager.ts +++ b/test/lease-manager.ts @@ -66,6 +66,7 @@ class FakeMessage { length = 20; received: number; subSpans: FakeSubscriberTelemetry = new FakeSubscriberTelemetry(); + _dispatched = false; constructor() { this.received = Date.now(); @@ -76,6 +77,15 @@ class FakeMessage { } ackFailed() {} endParentSpan() {} + dispatched() { + this._dispatched = true; + } + get isDispatched() { + return this._dispatched; + } + get handledPromise() { + return Promise.resolve(); + } } interface LeaseManagerInternals { @@ -98,6 +108,8 @@ describe('LeaseManager', () => { let LeaseManager: typeof leaseTypes.LeaseManager; let leaseManager: leaseTypes.LeaseManager; + let fakeLog: FakeLog | undefined; + before(() => { LeaseManager = proxyquire('../src/lease-manager.js', { os: fakeos, @@ -111,6 +123,7 @@ describe('LeaseManager', () => { }); afterEach(() => { + fakeLog?.remove(); leaseManager.clear(); sandbox.restore(); }); @@ -187,20 +200,20 @@ describe('LeaseManager', () => { fakeMessage.id = 'a'; fakeMessage.ackId = 'b'; - const fakeLog = new FakeLog(leaseTypes.logs.callbackDelivery); + fakeLog = new FakeLog(leaseTypes.logs.callbackDelivery); leaseManager.setOptions({ allowExcessMessages: true, }); subscriber.on('message', () => { - assert.strictEqual(fakeLog.called, true); + assert.strictEqual(fakeLog!.called, true); assert.strictEqual( - fakeLog.fields!.severity, + fakeLog!.fields!.severity, loggingUtils.LogSeverity.INFO, ); - assert.strictEqual(fakeLog.args![1] as string, 'a'); - assert.strictEqual(fakeLog.args![2] as string, 'b'); + assert.strictEqual(fakeLog!.args![1] as string, 'a'); + assert.strictEqual(fakeLog!.args![2] as string, 'b'); done(); }); @@ -212,7 +225,7 @@ describe('LeaseManager', () => { fakeMessage.id = 'a'; fakeMessage.ackId = 'b'; - const fakeLog = new FakeLog(leaseTypes.logs.callbackExceptions); + fakeLog = new FakeLog(leaseTypes.logs.callbackExceptions); leaseManager.setOptions({ allowExcessMessages: true, @@ -275,7 +288,7 @@ describe('LeaseManager', () => { const pendingStub = sandbox.stub(leaseManager, 'pending'); pendingStub.get(() => 0); leaseManager.setOptions({allowExcessMessages: false}); - const fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); + fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); leaseManager.add(fakeMessage); assert.strictEqual(fakeLog.called, true); @@ -382,7 +395,7 @@ describe('LeaseManager', () => { const removeStub = sandbox.stub(leaseManager, 'remove'); const modAckStub = sandbox.stub(goodMessage, 'modAck'); - const fakeLog = new FakeLog(leaseTypes.logs.expiry); + fakeLog = new FakeLog(leaseTypes.logs.expiry); leaseManager.add(goodMessage as {} as Message); clock.tick(halfway); @@ -464,7 +477,7 @@ describe('LeaseManager', () => { }); it('should log if it was full and is now empty', () => { - const fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); + fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); const pendingStub = sandbox.stub(leaseManager, 'pending'); pendingStub.get(() => 0); leaseManager.add(new FakeMessage() as {} as Message); @@ -603,7 +616,7 @@ describe('LeaseManager', () => { const pending = new FakeMessage() as {} as Message; leaseManager.setOptions({allowExcessMessages: false, maxMessages: 1}); - const fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); + fakeLog = new FakeLog(leaseTypes.logs.subscriberFlowControl); leaseManager.add(temp); leaseManager.add(pending); diff --git a/test/message-queues.ts b/test/message-queues.ts index bc88de2d5..f91129e7d 100644 --- a/test/message-queues.ts +++ b/test/message-queues.ts @@ -415,6 +415,8 @@ describe('MessageQueues', () => { messages.forEach(message => ackQueue.add(message as Message)); await ackQueue.flush('logtest'); + fakeLog.remove(); + assert.strictEqual(fakeLog.called, true); assert.strictEqual( fakeLog.fields!.severity, @@ -674,6 +676,8 @@ describe('MessageQueues', () => { messages.forEach(message => modAckQueue.add(message as Message)); await modAckQueue.flush('logtest'); + fakeLog.remove(); + assert.strictEqual(fakeLog.called, true); assert.strictEqual( fakeLog.fields!.severity, diff --git a/test/publisher/message-queues.ts b/test/publisher/message-queues.ts index 89136b0f0..7c810a9be 100644 --- a/test/publisher/message-queues.ts +++ b/test/publisher/message-queues.ts @@ -184,6 +184,8 @@ describe('Message Queues', () => { sandbox.stub(topic, 'request'); const fakeLog = new FakeLog(q.logs.publishBatch); void queue._publish(messages, callbacks, 0, 'test'); + fakeLog.remove(); + assert.strictEqual(fakeLog.called, true); assert.strictEqual( fakeLog.fields!.severity, diff --git a/test/subscriber.ts b/test/subscriber.ts index e022e8bfa..4e6917b15 100644 --- a/test/subscriber.ts +++ b/test/subscriber.ts @@ -104,6 +104,11 @@ class FakeLeaseManager extends EventEmitter { } // eslint-disable-next-line @typescript-eslint/no-unused-vars remove(message: s.Message): void {} + + _isEmpty = true; + get isEmpty() { + return this._isEmpty; + } } class FakeQueue { @@ -180,6 +185,7 @@ interface SubInternals { _inventory: FakeLeaseManager; _onData(response: PullResponse): void; _discardMessage(message: s.Message): void; + _waitForFlush(timeout?: Duration): Promise; } function getSubInternals(sub: s.Subscriber) { @@ -200,6 +206,8 @@ describe('Subscriber', () => { let Subscriber: typeof s.Subscriber; let subscriber: s.Subscriber; + let fakeLog: FakeLog | undefined; + beforeEach(() => { sandbox = sinon.createSandbox(); fakeProjectify = { @@ -237,6 +245,8 @@ describe('Subscriber', () => { }); afterEach(async () => { + fakeLog?.remove(); + fakeLog = undefined; sandbox.restore(); await subscriber.close(); tracing.setGloballyEnabled(false); @@ -435,7 +445,7 @@ describe('Subscriber', () => { }); it('should log on ack completion', async () => { - const fakeLog = new FakeLog(s.logs.ackNack); + fakeLog = new FakeLog(s.logs.ackNack); await subscriber.ack(message); @@ -453,7 +463,7 @@ describe('Subscriber', () => { message.received = 0; sandbox.stub(histogram, 'percentile').withArgs(99).returns(10); - const fakeLog = new FakeLog(s.logs.slowAck); + fakeLog = new FakeLog(s.logs.slowAck); await subscriber.ack(message); @@ -605,6 +615,10 @@ describe('Subscriber', () => { const inventory: FakeLeaseManager = stubs.get('inventory'); const stub = sandbox.stub(inventory, 'clear').returns([message]); + // The leaser would've immediately called dispatched(). Pretend that + // we're user code. + message.ack(); + await subscriber.close(); assert.strictEqual(stub.callCount, 1); assert.strictEqual(shutdownStub.callCount, 1); @@ -654,7 +668,7 @@ describe('Subscriber', () => { assert.strictEqual(modAckFlush.callCount, 1); }); - it('should resolve if no messages are pending', () => { + it('should resolve if no messages are pending', async () => { const ackQueue: FakeAckQueue = stubs.get('ackQueue'); sandbox.stub(ackQueue, 'flush').rejects(); @@ -666,7 +680,7 @@ describe('Subscriber', () => { sandbox.stub(modAckQueue, 'flush').rejects(); sandbox.stub(modAckQueue, 'onFlush').rejects(); - return subscriber.close(); + await subscriber.close(); }); it('should wait for in-flight messages to drain', async () => { @@ -683,6 +697,179 @@ describe('Subscriber', () => { assert.strictEqual(modAckOnDrain.callCount, 1); }); }); + + describe('close with timeout', () => { + let clock: sinon.SinonFakeTimers; + let inventory: FakeLeaseManager; + let ackQueue: FakeAckQueue; + let modAckQueue: FakeModAckQueue; + let subInternals: SubInternals; + + beforeEach(() => { + clock = sandbox.useFakeTimers(); + inventory = stubs.get('inventory'); + ackQueue = stubs.get('ackQueue'); + modAckQueue = stubs.get('modAckQueue'); + + // Ensure subscriber is open before each test + if (!subscriber.isOpen) { + subscriber.open(); + } + + subInternals = subscriber as unknown as SubInternals; + }); + + afterEach(() => { + clock.restore(); + }); + + it('should do nothing if isOpen = false', async () => { + const destroySpy = sandbox.spy(subInternals._stream, 'destroy'); + subscriber.isOpen = false; + await subscriber.close(); + assert.strictEqual(destroySpy.callCount, 0); + }); + + it('should clear inventory and bail for timeout = 0', async () => { + const clearSpy = sandbox.spy(inventory, 'clear'); + const onSpy = sandbox.spy(inventory, 'on'); + subscriber.setOptions({ + closeOptions: { + timeout: Duration.from({milliseconds: 0}), + }, + }); + await subscriber.close(); + assert.strictEqual(clearSpy.callCount, 1); + assert.strictEqual(onSpy.callCount, 0); + }); + + it('should not wait for an empty inventory in NackImmediately', async () => { + const onSpy = sandbox.spy(inventory, 'on'); + subscriber.setOptions({ + closeOptions: { + behavior: s.SubscriberCloseBehaviors.NackImmediately, + timeout: Duration.from({milliseconds: 100}), + }, + }); + await subscriber.close(); + assert.strictEqual(onSpy.callCount, 0); + }); + + it('should not wait for an empty inventory in WaitForProcessing if empty', async () => { + const onSpy = sandbox.spy(inventory, 'on'); + subscriber.setOptions({ + closeOptions: { + behavior: s.SubscriberCloseBehaviors.WaitForProcessing, + timeout: Duration.from({milliseconds: 100}), + }, + }); + await subscriber.close(); + assert.strictEqual(onSpy.callCount, 0); + }); + + it('should wait for an empty inventory in WaitForProcessing if not empty', async () => { + inventory._isEmpty = false; + const onSpy = sandbox.spy(inventory, 'on'); + subscriber.setOptions({ + closeOptions: { + behavior: s.SubscriberCloseBehaviors.WaitForProcessing, + timeout: Duration.from({seconds: 2}), + }, + }); + const prom = subscriber.close(); + assert.strictEqual(onSpy.callCount, 1); + clock.tick(3000); + await prom; + }); + + it('should nack remaining messages if timeout is non-zero', async () => { + const mockMessages = [ + new Message(subscriber, RECEIVED_MESSAGE), + new Message(subscriber, RECEIVED_MESSAGE), + ]; + sandbox.stub(inventory, 'clear').returns(mockMessages); + const nackSpy = sandbox.spy(subscriber, 'nack'); + + subscriber.setOptions({ + closeOptions: { + timeout: Duration.from({seconds: 5}), + }, + }); + const prom = subscriber.close(); + clock.tick(6000); + await prom; + + assert.strictEqual(nackSpy.callCount, mockMessages.length); + mockMessages.forEach((msg, i) => { + assert.strictEqual(nackSpy.getCall(i).args[0], msg); + }); + }); + + it('should wait for drain promises and respect timeout', async () => { + const ackDrainDeferred = defer(); + const modAckDrainDeferred = defer(); + sandbox.stub(ackQueue, 'onDrain').returns(ackDrainDeferred.promise); + sandbox + .stub(modAckQueue, 'onDrain') + .returns(modAckDrainDeferred.promise); + ackQueue.numInFlightRequests = 1; // Ensure drain is waited for + modAckQueue.numInFlightRequests = 1; + + let closed = false; + subscriber.setOptions({ + closeOptions: { + behavior: s.SubscriberCloseBehaviors.NackImmediately, + timeout: Duration.from({milliseconds: 100}), + }, + }); + const prom = subscriber.close().then(() => { + closed = true; + }); + + // Advance time past the timeout + clock.tick(200); + await prom; + + // Promise should resolve even though drains haven't + assert.strictEqual(closed, true); + + // Resolve drains afterwards to prevent hanging tests if logic fails + ackDrainDeferred.resolve(); + modAckDrainDeferred.resolve(); + }); + + it('should resolve early if drain completes before timeout', async () => { + const ackDrainDeferred = defer(); + const modAckDrainDeferred = defer(); + sandbox.stub(ackQueue, 'onDrain').returns(ackDrainDeferred.promise); + sandbox + .stub(modAckQueue, 'onDrain') + .returns(modAckDrainDeferred.promise); + ackQueue.numInFlightRequests = 1; // Ensure drain is waited for + modAckQueue.numInFlightRequests = 1; + + let closed = false; + subscriber.setOptions({ + closeOptions: { + timeout: Duration.from({milliseconds: 100}), + }, + }); + const prom = subscriber.close().then(() => { + closed = true; + }); + + // Resolve drains quickly + ackDrainDeferred.resolve(); + modAckDrainDeferred.resolve(); + + // Advance time slightly, but less than the timeout + clock.tick(50); + await prom; + + // Promise should resolve. + assert.strictEqual(closed, true); + }); + }); }); describe('getClient', () => { @@ -743,7 +930,7 @@ describe('Subscriber', () => { }); it('should log on ack completion', async () => { - const fakeLog = new FakeLog(s.logs.ackNack); + fakeLog = new FakeLog(s.logs.ackNack); await subscriber.nack(message); @@ -851,6 +1038,10 @@ describe('Subscriber', () => { delete msgAny.parentSpan; delete msgAny.subSpans; + // Delete baggage for discovering unprocessed messages. + delete addMsgAny._handledPromise; + delete msgAny._handledPromise; + assert.deepStrictEqual(addMsg, message); // test for receipt diff --git a/test/temporal.ts b/test/temporal.ts index 030af2122..2e906ce3b 100644 --- a/test/temporal.ts +++ b/test/temporal.ts @@ -13,26 +13,82 @@ // limitations under the License. import {describe, it} from 'mocha'; -import {Duration} from '../src/temporal'; +import { + Duration, + atLeast as durationAtLeast, + atMost as durationAtMost, +} from '../src/temporal'; import * as assert from 'assert'; describe('temporal', () => { describe('Duration', () => { it('can be created from millis', () => { - const duration = Duration.from({millis: 1234}); - assert.strictEqual(duration.totalOf('second'), 1.234); + const duration = Duration.from({milliseconds: 1234}); + assert.strictEqual(duration.seconds, 1.234); }); + it('can be created from seconds', () => { const duration = Duration.from({seconds: 1.234}); - assert.strictEqual(duration.totalOf('millisecond'), 1234); + assert.strictEqual(duration.milliseconds, 1234); }); + it('can be created from minutes', () => { const duration = Duration.from({minutes: 30}); - assert.strictEqual(duration.totalOf('hour'), 0.5); + assert.strictEqual(duration.total('hour'), 0.5); }); + it('can be created from hours', () => { const duration = Duration.from({hours: 1.5}); - assert.strictEqual(duration.totalOf('minute'), 90); + assert.strictEqual(duration.total('minute'), 90); + }); + + it('can be created from a Duration', () => { + const duration = Duration.from({seconds: 5}); + const second = Duration.from(duration); + assert.strictEqual(duration.milliseconds, second.milliseconds); + }); + + it('adds durations', () => { + const duration = Duration.from({seconds: 10}); + const second = duration.add({milliseconds: 1000}); + assert.strictEqual(second.seconds, 11); + }); + + it('subtracts durations', () => { + const duration = Duration.from({seconds: 10}); + const second = duration.subtract({seconds: 5}); + assert.strictEqual(second.milliseconds, 5000); + }); + + it('compares durations', () => { + const duration = Duration.from({seconds: 10}); + const less = Duration.from({seconds: 5}); + const more = Duration.from({seconds: 15}); + + const minus = Duration.compare(duration, more); + assert.strictEqual(minus, -1); + + const plus = Duration.compare(duration, less); + assert.strictEqual(plus, 1); + + const equal = Duration.compare(duration, duration); + assert.strictEqual(equal, 0); + }); + + it('has working helper functions', () => { + const duration = Duration.from({seconds: 10}); + + const atLeast1 = durationAtLeast(duration, Duration.from({seconds: 5})); + assert.strictEqual(atLeast1.seconds, 10); + + const atLeast2 = durationAtLeast(duration, Duration.from({seconds: 15})); + assert.strictEqual(atLeast2.seconds, 15); + + const atMost1 = durationAtMost(duration, Duration.from({seconds: 5})); + assert.strictEqual(atMost1.seconds, 5); + + const atMost2 = durationAtMost(duration, Duration.from({seconds: 15})); + assert.strictEqual(atMost2.seconds, 10); }); }); }); diff --git a/test/test-utils.ts b/test/test-utils.ts index a2f20946a..f171d4685 100644 --- a/test/test-utils.ts +++ b/test/test-utils.ts @@ -63,12 +63,23 @@ export class FakeLog { fields?: loggingUtils.LogFields; args?: unknown[]; called = false; + log: loggingUtils.AdhocDebugLogFunction; + listener: (lf: loggingUtils.LogFields, a: unknown[]) => void; constructor(log: loggingUtils.AdhocDebugLogFunction) { - log.on('log', (lf, a) => { + this.log = log; + this.listener = (lf: loggingUtils.LogFields, a: unknown[]) => { this.fields = lf; this.args = a; this.called = true; - }); + }; + this.log.on('log', this.listener); + } + + remove() { + // This really ought to be properly exposed, but since it's not, we'll + // do this for now to keep the tests from being leaky. + const instance = (this.log as loggingUtils.AdhocDebugLogFunction).instance; + instance.off('log', this.listener); } } diff --git a/test/util.ts b/test/util.ts index 6dbdd746c..ba8d0e63d 100644 --- a/test/util.ts +++ b/test/util.ts @@ -13,8 +13,10 @@ // limitations under the License. import {describe, it} from 'mocha'; -import {addToBucket, Throttler} from '../src/util'; +import {addToBucket, Throttler, awaitWithTimeout} from '../src/util'; import * as assert from 'assert'; +import * as sinon from 'sinon'; +import {Duration} from '../src'; describe('utils', () => { describe('Throttler', () => { @@ -58,4 +60,85 @@ describe('utils', () => { assert.deepStrictEqual(map.get('a'), ['c', 'b']); }); }); + + describe('awaitWithTimeout', () => { + let sandbox: sinon.SinonSandbox; + beforeEach(() => { + sandbox = sinon.createSandbox(); + }); + afterEach(() => { + sandbox.restore(); + }); + + it('handles non-timeout properly', async () => { + const fakeTimers = sandbox.useFakeTimers(0); + let resolve = () => {}; + const testString = 'fooby'; + const testPromise = new Promise(r => { + resolve = () => r(testString); + }); + fakeTimers.setTimeout(resolve, 500); + const awaitPromise = awaitWithTimeout( + testPromise, + Duration.from({seconds: 1}), + ); + fakeTimers.tick(500); + + const result = await awaitPromise; + assert.strictEqual(result.returnedValue, testString); + assert.strictEqual(result.exception, undefined); + assert.strictEqual( + result.timedOut, + false, + 'timeout was triggered, improperly', + ); + }); + + it('handles non-timeout errors properly', async () => { + const fakeTimers = sandbox.useFakeTimers(0); + let reject = () => {}; + const testString = 'fooby'; + const testPromise = new Promise((res, rej) => { + reject = () => rej(testString); + }); + fakeTimers.setTimeout(reject, 500); + const awaitPromise = awaitWithTimeout( + testPromise, + Duration.from({seconds: 1}), + ); + fakeTimers.tick(500); + + const result = await awaitPromise; + assert.strictEqual( + result.exception, + testString, + 'non-error was triggered, improperly', + ); + assert.strictEqual(result.timedOut, false); + }); + + it('handles timeout properly', async () => { + const fakeTimers = sandbox.useFakeTimers(0); + let resolve = () => {}; + const testString = 'fooby'; + const testPromise = new Promise(r => { + resolve = () => r(testString); + }); + fakeTimers.setTimeout(resolve, 1500); + const awaitPromise = awaitWithTimeout( + testPromise, + Duration.from({seconds: 1}), + ); + fakeTimers.tick(1500); + + const result = await awaitPromise; + assert.strictEqual( + result.timedOut, + true, + 'timeout was not triggered, improperly', + ); + + assert.strictEqual(result.timedOut, true); + }); + }); });