Skip to content

Commit 87f018c

Browse files
committed
feat!: remove legacy ack deadline parameters, and move maxExtension into subscriber options
1 parent 12e94df commit 87f018c

File tree

6 files changed

+59
-83
lines changed

6 files changed

+59
-83
lines changed

src/default-options.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,16 @@ export const defaultOptions = {
3434
// The maximum length of time a message's lease will be extended by.
3535
maxAckDeadline: Duration.from({minutes: 10}),
3636

37-
// The maximum number of minutes that a message's lease will ever
37+
// The maximum amount of time that a message's lease will ever
3838
// be extended.
39-
maxExtensionMinutes: 60,
39+
maxExtensionTime: Duration.from({minutes: 60}),
4040

4141
// The maximum number of subscription streams/threads that will ever
4242
// be opened.
4343
maxStreams: 5,
4444

4545
// The starting number of seconds that ack deadlines will be extended.
46-
ackDeadline: 10,
46+
startingAckDeadline: Duration.from({seconds: 10}),
4747
},
4848

4949
publish: {

src/lease-manager.ts

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,6 @@ export interface FlowControlOptions {
2323
allowExcessMessages?: boolean;
2424
maxBytes?: number;
2525
maxMessages?: number;
26-
maxExtensionMinutes?: number;
27-
28-
/** @deprecated Use maxExtensionMinutes. */
29-
maxExtension?: number;
3026
}
3127

3228
/**
@@ -194,28 +190,9 @@ export class LeaseManager extends EventEmitter {
194190
* @private
195191
*/
196192
setOptions(options: FlowControlOptions): void {
197-
// Convert the old deprecated maxExtension to avoid breaking clients,
198-
// but allow only one.
199-
if (
200-
options.maxExtension !== undefined &&
201-
options.maxExtensionMinutes !== undefined
202-
) {
203-
throw new RangeError(
204-
'Only one of "maxExtension" or "maxExtensionMinutes" may be set for subscriber lease management options',
205-
);
206-
}
207-
if (
208-
options.maxExtension !== undefined &&
209-
options.maxExtensionMinutes === undefined
210-
) {
211-
options.maxExtensionMinutes = options.maxExtension / 60;
212-
delete options.maxExtension;
213-
}
214-
215193
const defaults: FlowControlOptions = {
216194
allowExcessMessages: true,
217195
maxBytes: defaultOptions.subscription.maxOutstandingBytes,
218-
maxExtensionMinutes: defaultOptions.subscription.maxExtensionMinutes,
219196
maxMessages: defaultOptions.subscription.maxOutstandingMessages,
220197
};
221198

@@ -260,19 +237,20 @@ export class LeaseManager extends EventEmitter {
260237
* @private
261238
*/
262239
private _extendDeadlines(): void {
263-
const deadline = this._subscriber.ackDeadline;
240+
const deadline = Duration.from({seconds: this._subscriber.ackDeadline});
241+
const maxExtensionMinutes =
242+
this._subscriber.maxExtensionTime.totalOf('minute');
264243

265244
for (const message of this._messages) {
266245
// Lifespan here is in minutes.
267246
const lifespan = (Date.now() - message.received) / (60 * 1000);
268247

269-
if (lifespan < this._options.maxExtensionMinutes!) {
270-
const deadlineDuration = Duration.from({seconds: deadline});
271-
message.subSpans.modAckStart(deadlineDuration, false);
248+
if (lifespan < maxExtensionMinutes) {
249+
message.subSpans.modAckStart(deadline, false);
272250

273251
if (this._subscriber.isExactlyOnceDelivery) {
274252
message
275-
.modAckWithResponse(deadline)
253+
.modAckWithResponse(deadline.totalOf('second'))
276254
.catch(e => {
277255
// In the case of a permanent failure (temporary failures are retried),
278256
// we need to stop trying to lease-manage the message.
@@ -283,8 +261,8 @@ export class LeaseManager extends EventEmitter {
283261
message.subSpans.modAckEnd();
284262
});
285263
} else {
286-
message.modAck(deadline);
287-
message.subSpans.modAckStart(deadlineDuration, false);
264+
message.modAck(deadline.totalOf('second'));
265+
message.subSpans.modAckStart(deadline, false);
288266
}
289267
} else {
290268
this.remove(message);

src/subscriber.ts

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,8 @@ export class Message implements tracing.MessageWithAttributes {
560560
* ever have, while it's under library control.
561561
* @property {Duration} [maxAckDeadline] The maximum time that ackDeadline should
562562
* ever have, while it's under library control.
563+
* @property {Duration} [maxExtensionTime] The maximum time that ackDeadline should
564+
* ever have, while it's under library control.
563565
* @property {BatchOptions} [batching] Request batching options; this is for
564566
* batching acks and modacks being sent back to the server.
565567
* @property {FlowControlOptions} [flowControl] Flow control options.
@@ -569,11 +571,9 @@ export class Message implements tracing.MessageWithAttributes {
569571
* @property {MessageStreamOptions} [streamingOptions] Streaming options.
570572
*/
571573
export interface SubscriberOptions {
572-
/** @deprecated Use minAckDeadline and maxAckDeadline. */
573-
ackDeadline?: number;
574-
575574
minAckDeadline?: Duration;
576575
maxAckDeadline?: Duration;
576+
maxExtensionTime?: Duration;
577577
batching?: BatchOptions;
578578
flowControl?: FlowControlOptions;
579579
useLegacyFlowControl?: boolean;
@@ -597,6 +597,7 @@ export class Subscriber extends EventEmitter {
597597
maxBytes: number;
598598
useLegacyFlowControl: boolean;
599599
isOpen: boolean;
600+
maxExtensionTime: Duration;
600601
private _acks!: AckQueue;
601602
private _histogram: Histogram;
602603
private _inventory!: LeaseManager;
@@ -612,9 +613,11 @@ export class Subscriber extends EventEmitter {
612613
constructor(subscription: Subscription, options = {}) {
613614
super();
614615

615-
this.ackDeadline = defaultOptions.subscription.ackDeadline;
616+
this.ackDeadline =
617+
defaultOptions.subscription.startingAckDeadline.totalOf('second');
616618
this.maxMessages = defaultOptions.subscription.maxOutstandingMessages;
617619
this.maxBytes = defaultOptions.subscription.maxOutstandingBytes;
620+
this.maxExtensionTime = defaultOptions.subscription.maxExtensionTime;
618621
this.useLegacyFlowControl = false;
619622
this.isOpen = false;
620623
this._histogram = new Histogram({min: 10, max: 600});
@@ -960,15 +963,6 @@ export class Subscriber extends EventEmitter {
960963
setOptions(options: SubscriberOptions): void {
961964
this._options = options;
962965

963-
// The user-set ackDeadline value basically pegs the extension time.
964-
// We'll emulate it by overwriting min/max.
965-
const passedAckDeadline = options.ackDeadline;
966-
if (passedAckDeadline !== undefined) {
967-
this.ackDeadline = passedAckDeadline;
968-
options.minAckDeadline = Duration.from({seconds: passedAckDeadline});
969-
options.maxAckDeadline = Duration.from({seconds: passedAckDeadline});
970-
}
971-
972966
this.useLegacyFlowControl = options.useLegacyFlowControl || false;
973967
if (options.flowControl) {
974968
this.maxMessages =
@@ -998,6 +992,18 @@ export class Subscriber extends EventEmitter {
998992
if (this._inventory) {
999993
this._inventory.setOptions(this._options.flowControl!);
1000994
}
995+
996+
this.updateAckDeadline();
997+
}
998+
999+
/**
1000+
* Retrieves our effective options. This is mostly for unit test use.
1001+
*
1002+
* @private
1003+
* @returns {SubscriberOptions} The options.
1004+
*/
1005+
getOptions(): SubscriberOptions {
1006+
return this._options;
10011007
}
10021008

10031009
/**

system-test/pubsub.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
SchemaTypes,
3232
SchemaViews,
3333
ISchema,
34+
Duration,
3435
} from '../src';
3536
import {Policy, IamPermissionsMap} from '../src/iam';
3637
import {MessageOptions} from '../src/topic';
@@ -428,10 +429,12 @@ describe('pubsub', () => {
428429
const SUB_NAMES = [generateSubName(), generateSubName()];
429430
const SUB_DETACH_NAME = generateSubForDetach();
430431

432+
const thirty = Duration.from({minutes: 30});
433+
const sixty = Duration.from({minutes: 60});
431434
const SUBSCRIPTIONS = [
432-
topic.subscription(SUB_NAMES[0], {ackDeadline: 30}),
433-
topic.subscription(SUB_NAMES[1], {ackDeadline: 60}),
434-
topic.subscription(SUB_DETACH_NAME, {ackDeadline: 30}),
435+
topic.subscription(SUB_NAMES[0], {minAckDeadline: thirty, maxAckDeadline: thirty}),
436+
topic.subscription(SUB_NAMES[1], {minAckDeadline: sixty, maxAckDeadline: sixty}),
437+
topic.subscription(SUB_DETACH_NAME, {minAckDeadline: thirty, maxAckDeadline: thirty}),
435438
];
436439

437440
before(async () => {

test/lease-manager.ts

Lines changed: 5 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
} from '../src/subscriber';
3232
import {defaultOptions} from '../src/default-options';
3333
import {TestUtils} from './test-utils';
34+
import {Duration} from '../src';
3435

3536
const FREE_MEM = 9376387072;
3637
const fakeos = {
@@ -41,6 +42,7 @@ class FakeSubscriber extends EventEmitter {
4142
ackDeadline = 10;
4243
isOpen = true;
4344
modAckLatency = 2000;
45+
maxExtensionTime = Duration.from({minutes: 60});
4446
async modAck(): Promise<void> {}
4547
async modAckWithResponse(): Promise<AckResponse> {
4648
return AckResponses.Success;
@@ -286,30 +288,12 @@ describe('LeaseManager', () => {
286288
});
287289
});
288290

289-
it('should properly convert any legacy maxExtension values', () => {
290-
const maxExtension = 60 * 1000;
291-
leaseManager.setOptions({maxExtension});
292-
// eslint-disable-next-line @typescript-eslint/no-explicit-any
293-
const options = (leaseManager as any)._options;
294-
assert.strictEqual(options.maxExtensionMinutes, maxExtension / 60);
295-
assert.strictEqual(options.maxExtension, undefined);
296-
});
297-
298-
it('should not allow both maxExtension and maxExtensionMinutes', () => {
299-
assert.throws(() => {
300-
leaseManager.setOptions({
301-
maxExtension: 10,
302-
maxExtensionMinutes: 10,
303-
});
304-
});
305-
});
306-
307291
it('should remove any messages that pass the maxExtensionMinutes value', () => {
308292
const maxExtensionSeconds = (expectedTimeout - 100) / 1000;
309293
const badMessages = [new FakeMessage(), new FakeMessage()];
310294

311-
leaseManager.setOptions({
312-
maxExtensionMinutes: maxExtensionSeconds / 60,
295+
subscriber.maxExtensionTime = Duration.from({
296+
seconds: maxExtensionSeconds,
313297
});
314298
badMessages.forEach(message =>
315299
leaseManager.add(message as {} as Message),
@@ -339,9 +323,7 @@ describe('LeaseManager', () => {
339323
it('should remove and ackFailed any messages that fail to ack', done => {
340324
(subscriber as unknown as FakeSubscriber).isExactlyOnceDelivery = true;
341325

342-
leaseManager.setOptions({
343-
maxExtensionMinutes: 600,
344-
});
326+
subscriber.maxExtensionTime = Duration.from({minutes: 600});
345327

346328
const goodMessage = new FakeMessage();
347329

test/subscriber.ts

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -259,12 +259,14 @@ describe('Subscriber', () => {
259259
});
260260

261261
it('should set any options passed in', () => {
262-
const stub = sandbox.stub(Subscriber.prototype, 'setOptions');
263-
const fakeOptions = {};
264-
new Subscriber(subscription, fakeOptions);
265-
266-
const [options] = stub.lastCall.args;
267-
assert.strictEqual(options, fakeOptions);
262+
const options = {
263+
streamingOptions: {},
264+
};
265+
const subscriber = new Subscriber(subscription, options);
266+
assert.strictEqual(
267+
subscriber.getOptions().streamingOptions,
268+
options.streamingOptions,
269+
);
268270
});
269271
});
270272

@@ -464,7 +466,7 @@ describe('Subscriber', () => {
464466
});
465467

466468
it('should default to 60s min for exactly-once delivery subscriptions', async () => {
467-
subscriber.subscriptionProperties = {exactlyOnceDeliveryEnabled: true};
469+
subscriber.setSubscriptionProperties({exactlyOnceDeliveryEnabled: true});
468470

469471
const histogram: FakeHistogram = stubs.get('histogram');
470472
const now = Date.now();
@@ -501,8 +503,10 @@ describe('Subscriber', () => {
501503
sandbox.stub(histogram, 'add').throws();
502504
sandbox.stub(histogram, 'percentile').throws();
503505

506+
const deadlineTime = Duration.from({seconds: ackDeadline});
504507
subscriber.setOptions({
505-
ackDeadline,
508+
minAckDeadline: deadlineTime,
509+
maxAckDeadline: deadlineTime,
506510
flowControl: {maxMessages: maxMessages, maxBytes: maxBytes},
507511
});
508512
void subscriber.ack(message);
@@ -847,10 +851,13 @@ describe('Subscriber', () => {
847851
beforeEach(() => subscriber.close());
848852

849853
it('should capture the ackDeadline', () => {
850-
const ackDeadline = 1232;
854+
const ackDeadline = Duration.from({seconds: 1232});
851855

852-
subscriber.setOptions({ackDeadline});
853-
assert.strictEqual(subscriber.ackDeadline, ackDeadline);
856+
subscriber.setOptions({
857+
minAckDeadline: ackDeadline,
858+
maxAckDeadline: ackDeadline,
859+
});
860+
assert.strictEqual(subscriber.ackDeadline, ackDeadline.totalOf('second'));
854861
});
855862

856863
it('should not set maxStreams higher than maxMessages', () => {

0 commit comments

Comments
 (0)