Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"build": "yarn lint && yarn compile",
"compile": "tsc",
"test": "yarn lint && yarn mocha",
"ci": "yarn build && yarn cover",
"cover": "yarn clean && yarn nyc yarn mocha",
"lint": "eslint \"src/**/*.ts\"",
"benchmark": "tsc --extendedDiagnostics --incremental false",
Expand Down
14 changes: 7 additions & 7 deletions src/Message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,18 @@ export class Message extends (EventEmitter as new() => MessageEmitter) {
}

public raw: SQSMessage;
public body?: string | any;
public body?: any;
public subject?: string;
public topicArn?: string;
public topicName?: string;
public attributes: IMessageAttributes;
public sqsAttributes: { [k: string]: string };
private _squiss: Squiss;
private readonly _squiss: Squiss;
private _handled: boolean;
private _opts: IMessageOpts;
private readonly _opts: IMessageOpts;
private _deleteCallback?: () => Promise<void>;
private _s3Retriever: () => S3;
private _s3Retain: boolean;
private readonly _s3Retriever: () => S3;
private readonly _s3Retain: boolean;

constructor(opts: IMessageOpts) {
super();
Expand All @@ -77,7 +77,7 @@ export class Message extends (EventEmitter as new() => MessageEmitter) {
this.subject = unwrapped.Subject;
this.topicArn = unwrapped.TopicArn;
if (this.topicArn) {
this.topicName = unwrapped.TopicArn.substr(unwrapped.TopicArn.lastIndexOf(':') + 1);
this.topicName = unwrapped.TopicArn.substring(unwrapped.TopicArn.lastIndexOf(':') + 1);
}
}
this._squiss = opts.squiss;
Expand All @@ -88,7 +88,7 @@ export class Message extends (EventEmitter as new() => MessageEmitter) {
this._s3Retain = opts.s3Retain;
}

public parse(): Promise<string | any> {
public parse(): Promise<any> {
if (this.body === undefined || this.body === null) {
return Promise.resolve();
}
Expand Down
23 changes: 13 additions & 10 deletions src/Squiss.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,12 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {

constructor(opts?: ISquissOptions ) {
super();
this._opts = Object.assign({}, optDefaults, opts || {});
this._opts = {
...optDefaults,
...(opts ?? {}),
};
this._initOpts();
this._queueUrl = this._opts.queueUrl || '';
this._queueUrl = this._opts.queueUrl ?? '';
this.sqs = this._initSqs();
}

Expand Down Expand Up @@ -197,7 +200,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
QueueUrl: queueUrl,
});
}).then((res) => {
if (!res.Attributes || !res.Attributes.VisibilityTimeout) {
if (!res.Attributes?.VisibilityTimeout) {
throw new Error('SQS.GetQueueAttributes call did not return expected shape. Response: ' +
JSON.stringify(res));
}
Expand All @@ -216,7 +219,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
QueueUrl: queueUrl,
});
}).then((res) => {
if (!res.Attributes || !res.Attributes.MaximumMessageSize) {
if (!res.Attributes?.MaximumMessageSize) {
throw new Error('SQS.GetQueueAttributes call did not return expected shape. Response: ' +
JSON.stringify(res));
}
Expand Down Expand Up @@ -546,7 +549,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
private _prepareMessageParams(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes) {
const messageStr = isString(message) ? message : JSON.stringify(message);
const params: ISendMessageRequest = {MessageBody: messageStr, DelaySeconds: delay};
attributes = Object.assign({}, attributes);
attributes = {...attributes};
params.MessageGroupId = attributes.FIFO_MessageGroupId;
delete attributes.FIFO_MessageGroupId;
params.MessageDeduplicationId = attributes.FIFO_MessageDeduplicationId;
Expand Down Expand Up @@ -580,7 +583,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
.then((uploadData) => {
this.emit('s3Upload', uploadData);
params.MessageBody = JSON.stringify(uploadData);
params.MessageAttributes = params.MessageAttributes || {};
params.MessageAttributes = params.MessageAttributes ?? {};
params.MessageAttributes[S3_MARKER] = {
StringValue: `${uploadData.uploadSize}`,
DataType: 'Number',
Expand All @@ -592,10 +595,10 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {

private _prepareMessageRequest(message: IMessageToSend, delay?: number, attributes?: IMessageAttributes)
: Promise<ISendMessageRequest> {
if (attributes && attributes[GZIP_MARKER]) {
if (attributes?.[GZIP_MARKER]) {
return Promise.reject(new Error(`Using of internal attribute ${GZIP_MARKER} is not allowed`));
}
if (attributes && attributes[S3_MARKER]) {
if (attributes?.[S3_MARKER]) {
return Promise.reject(new Error(`Using of internal attribute ${S3_MARKER} is not allowed`));
}
return this._prepareMessageParams(message, delay, attributes)
Expand Down Expand Up @@ -678,7 +681,7 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {
return (data: ReceiveMessageCommandOutput) => {
let gotMessages = true;
this._activeReq = undefined;
if (data && data.Messages) {
if (data?.Messages) {
this.emit('gotMessages', data.Messages.length);
this._emitMessages(data.Messages);
} else {
Expand All @@ -703,6 +706,6 @@ export class Squiss extends (EventEmitter as new() => SquissEmitter) {

private _getMaxMessagesToGet() {
return !this._opts.maxInFlight ? this._opts.receiveBatchSize! :
Math.min(this._opts.maxInFlight! - this._inFlight, this._opts.receiveBatchSize!);
Math.min(this._opts.maxInFlight - this._inFlight, this._opts.receiveBatchSize!);
}
}
7 changes: 5 additions & 2 deletions src/TimeoutExtender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ export class TimeoutExtender {
public readonly _index: MessageIndex;
public _linkedList: LinkedList<Node>;
public _opts: ITimeoutExtenderOptions;
private _squiss: Squiss;
private readonly _squiss: Squiss;
private _timer: any;
private readonly _visTimeout: number;
private readonly _stopAfter: number;
private readonly _apiLeadMs: number;

constructor(squiss: Squiss, opts?: ITimeoutExtenderOptions) {
this._opts = Object.assign({}, optDefaults, opts || {});
this._opts = {
...optDefaults,
...(opts || {}),
};
this._index = {};
this._timer = undefined;
this._squiss = squiss;
Expand Down
6 changes: 3 additions & 3 deletions src/test/src/Message.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {SquissStub} from '../stubs/SquissStub';
import {Blobs, S3Stub} from '../stubs/S3Stub';
import delay from 'delay';

const wait = (ms?: number) => delay(ms === undefined ? 20 : ms);
const wait = (ms?: number) => delay(ms ?? 20 );

const getSquissStub = () => {
return new SquissStub() as any as Squiss;
Expand Down Expand Up @@ -201,7 +201,7 @@ describe('Message', () => {
});
});
it('not parse empty gzipped body', () => {
const rawMsg = getSQSMsg(undefined);
const rawMsg = getSQSMsg();
rawMsg.MessageAttributes!.__SQS_GZIP__ = {
DataType: 'Number',
StringValue: '1',
Expand Down Expand Up @@ -234,7 +234,7 @@ describe('Message', () => {
it('not parse empty body', () => {
const msg = new Message({
squiss: getSquissStub(),
msg: getSQSMsg(undefined),
msg: getSQSMsg(),
bodyFormat: 'json',
s3Retriever: getS3Stub(),
s3Retain: false,
Expand Down
2 changes: 1 addition & 1 deletion src/test/src/TimeoutExtender.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const bazMsg = new Message({

describe('TimeoutExtender', () => {
afterEach(() => {
if (clock && clock.restore) {
if (clock?.restore) {
clock.restore();
}
inst = null;
Expand Down
23 changes: 11 additions & 12 deletions src/test/src/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ const stubs = {

const {Squiss: SquissPatched, Message: MessagePatched} = proxyquire('../../', stubs);

import {ISquissOptions, Squiss} from '../../';
import {ISquissOptions, Squiss, Message} from '../../';
import {SQSStub} from '../stubs/SQSStub';
import delay from 'delay';
import {Message } from '../../';
import {IMessageOpts } from '../../Message';
// @ts-ignore
import * as sinon from 'sinon';
Expand All @@ -28,7 +27,7 @@ import {HttpHandlerOptions} from '@aws-sdk/types';

const should = chai.should();
let inst: Squiss | null = null;
const wait = (ms?: number) => delay(ms === undefined ? 20 : ms);
const wait = (ms?: number) => delay(ms ?? 20);

const getS3Stub = (blobs?: Blobs) => {
return new S3Stub(blobs) as any as S3;
Expand All @@ -45,7 +44,7 @@ const generateLargeMessage = (length: number) => {
describe('index', () => {
afterEach(() => {
if (inst) {
inst!.stop();
inst.stop();
}
inst = null;
});
Expand Down Expand Up @@ -510,7 +509,7 @@ describe('index', () => {
.then(() => wait())
.then(async () => {
inst!.inFlight.should.equal(5);
await inst!.handledMessage(new EventEmitter() as any);
inst!.handledMessage(new EventEmitter() as any);
await wait(1);
}).then(() => {
inst!.inFlight.should.equal(4);
Expand Down Expand Up @@ -699,7 +698,7 @@ describe('index', () => {
let message: Message;
inst!.on('message', (msg: Message) => {
message = msg;
msg!.on('deleted', msgSpyMessage);
msg.on('deleted', msgSpyMessage);
msg.del();
});
inst!.on('deleted', msgSpySquiss);
Expand Down Expand Up @@ -848,7 +847,7 @@ describe('index', () => {
inst!.sqs = new SQSStub(1) as any as SQS;
const spy = sinon.spy(inst!.sqs, 'createQueue');
return inst!.createQueue().then((queueUrl: string) => {
queueUrl!.should.be.a('string');
queueUrl.should.be.a('string');
spy.should.be.calledOnce();
spy.should.be.calledWith({
QueueName: 'foo',
Expand All @@ -866,7 +865,7 @@ describe('index', () => {
inst!.sqs = new SQSStub(1) as any as SQS;
const spy = sinon.spy(inst!.sqs, 'createQueue');
return inst!.createQueue().then((queueUrl: string) => {
queueUrl!.should.be.a('string');
queueUrl.should.be.a('string');
spy.should.be.calledOnce();
spy.should.be.calledWith({
QueueName: 'foo',
Expand All @@ -893,7 +892,7 @@ describe('index', () => {
inst!.sqs = new SQSStub(1) as any as SQS;
const spy = sinon.spy(inst!.sqs, 'createQueue');
return inst!.createQueue().then((queueUrl: string) => {
queueUrl!.should.be.a('string');
queueUrl.should.be.a('string');
spy.should.be.calledOnce();
spy.should.be.calledWith({
QueueName: 'foo',
Expand Down Expand Up @@ -1205,7 +1204,7 @@ describe('index', () => {
empty: undefined,
}).then(() => {
squissS3UploadEventEmitted.should.eql(true);
blobs.my_bucket!.my_uuid.should.be.eq(largeMessage);
blobs.my_bucket.my_uuid.should.be.eq(largeMessage);
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: '{"uploadSize":300,"bucket":"my_bucket","key":"my_uuid"}',
Expand Down Expand Up @@ -1300,7 +1299,7 @@ describe('index', () => {
const spy = sinon.spy(inst!.sqs, 'sendMessage');
const largeMessage = generateLargeMessage(300);
return inst!.sendMessage(largeMessage, 10).then(() => {
blobs.my_bucket!.my_uuid.should.be.eq(largeMessage);
blobs.my_bucket.my_uuid.should.be.eq(largeMessage);
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: '{"uploadSize":300,"bucket":"my_bucket","key":"my_uuid"}',
Expand All @@ -1319,7 +1318,7 @@ describe('index', () => {
const spy = sinon.spy(inst!.sqs, 'sendMessage');
const largeMessage = generateLargeMessage(300);
return inst!.sendMessage(largeMessage, 10).then(() => {
blobs.my_bucket!.my_uuid.should.be.eq(largeMessage);
blobs.my_bucket.my_uuid.should.be.eq(largeMessage);
spy.should.be.calledWith({
QueueUrl: 'foo',
MessageBody: '{"uploadSize":300,"bucket":"my_bucket","key":"my_uuid"}',
Expand Down
10 changes: 6 additions & 4 deletions src/test/stubs/S3Stub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class S3Stub extends EventEmitter {
}

public getObject({Key, Bucket}: GetObjectCommandInput): Promise<GetObjectOutput> {
if (Bucket && Key && this.blobs[Bucket] && this.blobs[Bucket][Key]) {
if (Bucket && Key && this.blobs[Bucket]?.[Key]) {
const body = new Readable();
body.push(this.blobs[Bucket][Key]);
body.push(null);
Expand All @@ -37,13 +37,15 @@ export class S3Stub extends EventEmitter {
if (Bucket && !this.blobs[Bucket]) {
this.blobs[Bucket] = {};
}
// @ts-ignore
this.blobs[Bucket][Key] = Body;
if (Bucket && Key) {
// @ts-ignore
this.blobs[Bucket][Key] = Body;
}
return Promise.resolve({});
}

public deleteObject({Key, Bucket}: DeleteObjectRequest): Promise<DeleteObjectOutput> {
if (Bucket && Key && this.blobs[Bucket] && this.blobs[Bucket][Key]) {
if (Bucket && Key && this.blobs[Bucket]?.[Key]) {
delete this.blobs[Bucket][Key];
return Promise.resolve({});
} else {
Expand Down
15 changes: 7 additions & 8 deletions src/test/stubs/SQSStub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ export class SQSStub extends EventEmitter {
constructor(msgCount?: number, timeout?: number) {
super();
this.msgs = [];
this.timeout = timeout === undefined ? 20 : timeout;
this.msgCount = msgCount || 0;
this.timeout = timeout ?? 20;
this.msgCount = msgCount ?? 0;
this.config = {
region: 'us-east-1',
endpoint: 'http://foo.bar',
Expand Down Expand Up @@ -99,11 +99,10 @@ export class SQSStub extends EventEmitter {
}, this.timeout);
const onAbort = () => {
removeListeners();
const err: any = new Error('Request aborted by user') as any;
err.name = 'AbortError';
err.retryable = false;
err.time = new Date();
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
const err = new Error('Request aborted by user');
(err as any).name = 'AbortError';
(err as any).retryable = false;
(err as any).time = new Date();
reject(err);
};
const onNewMessage = () => {
Expand Down Expand Up @@ -168,7 +167,7 @@ export class SQSStub extends EventEmitter {
this.msgs.push({
MessageId: `id_${id}`,
ReceiptHandle: `${id}`,
Body: body || `{"num": ${id}}`,
Body: body ?? `{"num": ${id}}`,
});
this.emit('newMessage');
}
Expand Down
Loading