Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions packages/core/lib/queue/core/payloadBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ type Complete<T> = {
: T[P] | undefined;
};

const calculateDelay = (delay: number | string | Date): number => {
const calculateDelay = (delay: number | string | Date): [number, number] => {
const now = Date.now();
if (delay instanceof Date) {
const time = delay.getTime();
return now > time ? now : time;
const netDelayInSeconds = (time - now) / 1000;
return now > time ? [now, 0] : [time, netDelayInSeconds];
}

const delayInMs = typeof delay === 'string' ? ms(delay) : delay * 1000;
const calculatedDelay = now + delayInMs;
if (calculatedDelay < now) return now;
return calculatedDelay;
if (calculatedDelay < now) return [now, 0];
return [calculatedDelay, (calculatedDelay - now) / 1000];
};

export class PayloadBuilder {
Expand All @@ -37,7 +38,9 @@ export class PayloadBuilder {
...message,
} as Complete<InternalMessage>;

payload.delay = calculateDelay(payload.delay || 0);
const [delay, netDelayInSeconds] = calculateDelay(payload.delay || 0);
payload.delay = delay;
payload.netDelayInSeconds = netDelayInSeconds;
payload.connection = payload.connection || defaultOptions.connection;

if (!payload.queue) {
Expand Down
3 changes: 3 additions & 0 deletions packages/core/lib/queue/drivers/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,13 @@ export class RedisQueueDriver implements PollQueueDriver {
}

async purge(options: Record<string, any>): Promise<void> {
await this.initializeModules();
await this.client.del(this.getQueue(options.queue));
await this.client.del(this.getDelayedQueue(options.queue));
}

async count(options: Record<string, any>): Promise<number> {
await this.initializeModules();
return await this.client.llen(this.getQueue(options.queue));
}

Expand All @@ -97,6 +99,7 @@ export class RedisQueueDriver implements PollQueueDriver {
}

async scheduledTask(options: Record<string, any>): Promise<void> {
await this.initializeModules();
await (this.client as any).findDelayedJob(
this.getDelayedQueue(options.queue),
this.getQueue(options.queue),
Expand Down
31 changes: 17 additions & 14 deletions packages/core/lib/queue/drivers/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,15 @@ export class SqsQueueDriver implements PollQueueDriver {

constructor(private options: Record<string, any>) {
validateOptions(options, SqsQueueOptionsDto, { cls: SqsQueueDriver.name });
this.initializeModules().then(() => {
this.client = new this.AWS.SQS({
region: options.region,
apiVersion: options.apiVersion,
credentials: options.credentials || {
accessKeyId: options.accessKey,
secretAccessKey: options.secretKey,
},
});
});
this.initializeModules();
}

init(): Promise<void> {
throw new Error('Method not implemented.');
}
async init(): Promise<void> {}

async push(message: string, rawPayload: InternalMessage): Promise<void> {
await this.initializeModules();
const params = {
DelaySeconds: rawPayload.delay,
DelaySeconds: rawPayload.netDelayInSeconds,
MessageBody: message,
QueueUrl: joinUrl(this.options.prefix, rawPayload.queue),
};
Expand All @@ -39,6 +29,7 @@ export class SqsQueueDriver implements PollQueueDriver {
}

async pull(options: Record<string, any>): Promise<SqsJob[] | null> {
await this.initializeModules();
const params = {
MaxNumberOfMessages: 10,
MessageAttributeNames: ['All'],
Expand All @@ -53,6 +44,7 @@ export class SqsQueueDriver implements PollQueueDriver {
}

async remove(job: SqsJob, options: Record<string, any>): Promise<void> {
await this.initializeModules();
const params = {
QueueUrl: joinUrl(this.options.prefix, options.queue),
ReceiptHandle: job.data.ReceiptHandle,
Expand All @@ -64,6 +56,7 @@ export class SqsQueueDriver implements PollQueueDriver {
}

async purge(options: Record<string, any>): Promise<void> {
await this.initializeModules();
const params = {
QueueUrl: joinUrl(this.options.prefix, options.queue),
};
Expand All @@ -74,6 +67,7 @@ export class SqsQueueDriver implements PollQueueDriver {
}

async count(options: Record<string, any>): Promise<number> {
await this.initializeModules();
const params = {
QueueUrl: joinUrl(this.options.prefix, options.queue),
AttributeNames: ['ApproximateNumberOfMessages'],
Expand All @@ -84,6 +78,15 @@ export class SqsQueueDriver implements PollQueueDriver {
}

async initializeModules(): Promise<void> {
if (this.AWS && this.client) return Promise.resolve();
this.AWS = await Package.load('@aws-sdk/client-sqs');
this.client = new this.AWS.SQS({
region: this.options.region,
apiVersion: this.options.apiVersion,
credentials: this.options.credentials || {
accessKeyId: this.options.accessKey,
secretAccessKey: this.options.secretKey,
},
});
}
}
1 change: 1 addition & 0 deletions packages/core/lib/queue/strategy/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ export interface InternalMessage extends Message {
attemptCount: number;
id: string;
delay?: number;
netDelayInSeconds?: number;
}
2 changes: 1 addition & 1 deletion packages/core/lib/queue/workers/pollQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class PollQueueWorker extends BaseQueueWorker {
this.emitEvent(new JobMaxRetriesExceeed(message, job));
return;
}
await Dispatch(message);
await Dispatch({ ...message, delay: message.netDelayInSeconds });
this.emitEvent(new JobFailed(message, job));
}

Expand Down
4 changes: 2 additions & 2 deletions packages/core/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@intentjs/core",
"version": "0.1.48",
"version": "0.1.53",
"description": "Core module for Intent",
"repository": {
"type": "git",
Expand Down Expand Up @@ -131,7 +131,7 @@
"typescript": "^5.5.2"
},
"dependencies": {
"@intentjs/hyper-express": "^0.0.7",
"@intentjs/hyper-express": "^0.0.11",
"@nestjs/common": "^11.0.12",
"@nestjs/core": "^11.0.12",
"@react-email/components": "^0.0.32",
Expand Down
6 changes: 5 additions & 1 deletion packages/hyper-express/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const LiveFile = require('./src/components/plugins/LiveFile.js');
const MultipartField = require('./src/components/plugins/MultipartField.js');
const SSEventStream = require('./src/components/plugins/SSEventStream.js');
const Websocket = require('./src/components/ws/Websocket.js');
const UploadedFile = require('./src/shared/uploaded-file.js');

// Disable the uWebsockets.js version header if not specified to be kept
if (!process.env['KEEP_UWS_HEADER']) {
Expand All @@ -28,6 +29,9 @@ module.exports = {
MultipartField,
SSEventStream,
Websocket,
UploadedFile,
compressors: uWebsockets,
express(...args) { return new Server(...args); },
express(...args) {
return new Server(...args);
},
};
2 changes: 1 addition & 1 deletion packages/hyper-express/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@intentjs/hyper-express",
"version": "0.0.7",
"version": "0.0.11",
"description": "A fork of hyper-express to suit IntentJS requirements. High performance Node.js webserver with a simple-to-use API powered by uWebsockets.js under the hood.",
"main": "index.js",
"types": "./types/index.d.ts",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class ExpressRequest {
return this.url;
}

set originalUrl(url) {
this.url = url;
}

get fresh() {
this._throw_unsupported('fresh');
}
Expand Down
4 changes: 4 additions & 0 deletions packages/hyper-express/src/components/http/Request.js
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,10 @@ class Request {
return this._url;
}

set url(url) {
this._url = url;
}

/**
* Returns path for incoming request.
* @returns {String}
Expand Down
4 changes: 2 additions & 2 deletions packages/hyper-express/src/shared/uploaded-file.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ class UploadedFile {
}

get extension() {
return this.filename;
return this._filename.split('.').pop();
}

async toBuffer() {
return fs.readFileSync(this.tempPath);
return fs.readFileSync(this._tempPath);
}
}

Expand Down
2 changes: 0 additions & 2 deletions packages/hyper-express/types/shared/uploaded-file.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
import { readFileSync } from 'fs-extra';

export class UploadedFile {
_filename: string;
_size: number;
Expand Down
Loading