Skip to content

Commit d229eb5

Browse files
committed
feat: fix a bunch of linter errors, and import new gapic classes
1 parent a285b67 commit d229eb5

16 files changed

+1649
-638
lines changed

src/message-queues.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ export abstract class MessageQueue {
195195
* @param {number} [deadline] The deadline in seconds.
196196
* @private
197197
*/
198-
add(message: Message, deadline?: number): Promise<void> {
198+
async add(message: Message, deadline?: number): Promise<void> {
199199
if (this._closed) {
200200
if (this._subscriber.isExactlyOnceDelivery) {
201201
throw new AckError(AckResponses.Invalid, 'Subscriber closed');
@@ -213,7 +213,7 @@ export abstract class MessageQueue {
213213
this._requests.length + 1 >= maxMessages! ||
214214
this.bytes + size >= MAX_BATCH_BYTES
215215
) {
216-
this.flush();
216+
await this.flush();
217217
}
218218

219219
// Add the message to the current batch.

src/message-stream.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -346,7 +346,7 @@ export class MessageStream extends PassThrough {
346346
*/
347347
private async _getClient(): Promise<ClientStub> {
348348
const client = await this._subscriber.getClient();
349-
client.initialize();
349+
await client.initialize();
350350
return client.subscriberStub as Promise<ClientStub>;
351351
}
352352

@@ -397,9 +397,9 @@ export class MessageStream extends PassThrough {
397397
if (PullRetry.resetFailures(status)) {
398398
this._retrier.reset(this._streams[index]);
399399
}
400-
this._retrier.retryLater(this._streams[index], () => {
401-
this._fillOne(index);
402-
});
400+
this._retrier.retryLater(this._streams[index], () =>
401+
this._fillOne(index),
402+
);
403403
} else if (this._activeStreams() === 0) {
404404
this.emit(
405405
'debug',

src/publisher/flow-publisher.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ export class FlowControlledPublisher {
9090
const waitPromise = this.flowControl.willSend(size, 1);
9191
return waitPromise.then(doPublish);
9292
} else {
93-
this.flowControl.willSend(size, 1).then(() => {});
93+
void this.flowControl.willSend(size, 1).then(() => {});
9494
doPublish();
9595
return null;
9696
}

src/subscriber.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -406,7 +406,7 @@ export class Message implements tracing.MessageWithAttributes {
406406
this._handled = true;
407407
this.subSpans.ackCall();
408408
this.subSpans.processingEnd();
409-
this._subscriber.ack(this);
409+
void this._subscriber.ack(this);
410410
}
411411
}
412412

@@ -456,7 +456,7 @@ export class Message implements tracing.MessageWithAttributes {
456456
modAck(deadline: number): void {
457457
if (!this._handled) {
458458
this.subSpans.modAckCall(Duration.from({seconds: deadline}));
459-
this._subscriber.modAck(this, deadline);
459+
void this._subscriber.modAck(this, deadline);
460460
}
461461
}
462462

@@ -506,7 +506,7 @@ export class Message implements tracing.MessageWithAttributes {
506506
this._handled = true;
507507
this.subSpans.nackCall();
508508
this.subSpans.processingEnd();
509-
this._subscriber.nack(this);
509+
void this._subscriber.nack(this);
510510
}
511511
}
512512

@@ -951,7 +951,7 @@ export class Subscriber extends EventEmitter {
951951

952952
this._stream.start().catch(err => {
953953
this.emit('error', err);
954-
this.close();
954+
void this.close();
955955
});
956956

957957
this.isOpen = true;
@@ -1106,12 +1106,12 @@ export class Subscriber extends EventEmitter {
11061106

11071107
if (this._acks.numPendingRequests) {
11081108
promises.push(this._acks.onFlush());
1109-
this._acks.flush();
1109+
await this._acks.flush();
11101110
}
11111111

11121112
if (this._modAcks.numPendingRequests) {
11131113
promises.push(this._modAcks.onFlush());
1114-
this._modAcks.flush();
1114+
await this._modAcks.flush();
11151115
}
11161116

11171117
if (this._acks.numInFlightRequests) {

src/subscription.ts

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ import {promisifySome} from './util';
4747
import {StatusError} from './message-stream';
4848
import {DebugMessage} from './debug';
4949
import {EventEmitter} from 'stream';
50+
import {promisify} from 'util';
5051

5152
export {AckError, AckResponse, AckResponses} from './subscriber';
5253

@@ -588,23 +589,34 @@ export class Subscription extends EventEmitter {
588589
const gaxOpts = typeof optsOrCallback === 'object' ? optsOrCallback : {};
589590
callback = typeof optsOrCallback === 'function' ? optsOrCallback : callback;
590591

592+
const prom = this._deleteAsync(gaxOpts);
593+
if (callback) {
594+
prom.then(r => callback(null, r)).catch(e => callback(e));
595+
} else {
596+
return prom;
597+
}
598+
}
599+
600+
/*!
601+
* Async innards of delete() so we can wait for subscription.close() if needed.
602+
*/
603+
async _deleteAsync(gaxOpts: CallOptions): Promise<EmptyResponse> {
591604
const reqOpts = {
592605
subscription: this.name,
593606
};
594607

595608
if (this.isOpen) {
596-
this._subscriber.close();
609+
await this._subscriber.close();
597610
}
598611

599-
this.request<google.protobuf.Empty>(
600-
{
601-
client: 'SubscriberClient',
602-
method: 'deleteSubscription',
603-
reqOpts,
604-
gaxOpts,
605-
},
606-
callback!,
607-
);
612+
await promisify(this.request<google.protobuf.Empty>)({
613+
client: 'SubscriberClient',
614+
method: 'deleteSubscription',
615+
reqOpts,
616+
gaxOpts,
617+
});
618+
619+
return [{}];
608620
}
609621

610622
/**
@@ -1164,7 +1176,7 @@ export class Subscription extends EventEmitter {
11641176

11651177
this.on('removeListener', () => {
11661178
if (this.isOpen && this.listenerCount('message') === 0) {
1167-
this._subscriber.close();
1179+
void this._subscriber.close();
11681180
}
11691181
});
11701182
}

0 commit comments

Comments
 (0)