diff --git a/packages/core/lib/queue/core/payloadBuilder.ts b/packages/core/lib/queue/core/payloadBuilder.ts index 792a509..71aed37 100644 --- a/packages/core/lib/queue/core/payloadBuilder.ts +++ b/packages/core/lib/queue/core/payloadBuilder.ts @@ -9,17 +9,18 @@ type Complete = { : T[P] | undefined; }; -const calculateDelay = (delay: number | string | Date): number => { +const calculateDelay = (delay: number | string | Date): [number, number] => { const now = Date.now(); if (delay instanceof Date) { const time = delay.getTime(); - return now > time ? now : time; + const netDelayInSeconds = (time - now) / 1000; + return now > time ? [now, 0] : [time, netDelayInSeconds]; } const delayInMs = typeof delay === 'string' ? ms(delay) : delay * 1000; const calculatedDelay = now + delayInMs; - if (calculatedDelay < now) return now; - return calculatedDelay; + if (calculatedDelay < now) return [now, 0]; + return [calculatedDelay, (calculatedDelay - now) / 1000]; }; export class PayloadBuilder { @@ -37,7 +38,9 @@ export class PayloadBuilder { ...message, } as Complete; - payload.delay = calculateDelay(payload.delay || 0); + const [delay, netDelayInSeconds] = calculateDelay(payload.delay || 0); + payload.delay = delay; + payload.netDelayInSeconds = netDelayInSeconds; payload.connection = payload.connection || defaultOptions.connection; if (!payload.queue) { diff --git a/packages/core/lib/queue/drivers/redis.ts b/packages/core/lib/queue/drivers/redis.ts index b1a61a3..08b4c9d 100644 --- a/packages/core/lib/queue/drivers/redis.ts +++ b/packages/core/lib/queue/drivers/redis.ts @@ -70,11 +70,13 @@ export class RedisQueueDriver implements PollQueueDriver { } async purge(options: Record): Promise { + await this.initializeModules(); await this.client.del(this.getQueue(options.queue)); await this.client.del(this.getDelayedQueue(options.queue)); } async count(options: Record): Promise { + await this.initializeModules(); return await this.client.llen(this.getQueue(options.queue)); } @@ -97,6 +99,7 @@ export class RedisQueueDriver implements PollQueueDriver { } async scheduledTask(options: Record): Promise { + await this.initializeModules(); await (this.client as any).findDelayedJob( this.getDelayedQueue(options.queue), this.getQueue(options.queue), diff --git a/packages/core/lib/queue/drivers/sqs.ts b/packages/core/lib/queue/drivers/sqs.ts index c380db0..3ed014b 100644 --- a/packages/core/lib/queue/drivers/sqs.ts +++ b/packages/core/lib/queue/drivers/sqs.ts @@ -11,25 +11,15 @@ export class SqsQueueDriver implements PollQueueDriver { constructor(private options: Record) { validateOptions(options, SqsQueueOptionsDto, { cls: SqsQueueDriver.name }); - this.initializeModules().then(() => { - this.client = new this.AWS.SQS({ - region: options.region, - apiVersion: options.apiVersion, - credentials: options.credentials || { - accessKeyId: options.accessKey, - secretAccessKey: options.secretKey, - }, - }); - }); + this.initializeModules(); } - init(): Promise { - throw new Error('Method not implemented.'); - } + async init(): Promise {} async push(message: string, rawPayload: InternalMessage): Promise { + await this.initializeModules(); const params = { - DelaySeconds: rawPayload.delay, + DelaySeconds: rawPayload.netDelayInSeconds, MessageBody: message, QueueUrl: joinUrl(this.options.prefix, rawPayload.queue), }; @@ -39,6 +29,7 @@ export class SqsQueueDriver implements PollQueueDriver { } async pull(options: Record): Promise { + await this.initializeModules(); const params = { MaxNumberOfMessages: 10, MessageAttributeNames: ['All'], @@ -53,6 +44,7 @@ export class SqsQueueDriver implements PollQueueDriver { } async remove(job: SqsJob, options: Record): Promise { + await this.initializeModules(); const params = { QueueUrl: joinUrl(this.options.prefix, options.queue), ReceiptHandle: job.data.ReceiptHandle, @@ -64,6 +56,7 @@ export class SqsQueueDriver implements PollQueueDriver { } async purge(options: Record): Promise { + await this.initializeModules(); const params = { QueueUrl: joinUrl(this.options.prefix, options.queue), }; @@ -74,6 +67,7 @@ export class SqsQueueDriver implements PollQueueDriver { } async count(options: Record): Promise { + await this.initializeModules(); const params = { QueueUrl: joinUrl(this.options.prefix, options.queue), AttributeNames: ['ApproximateNumberOfMessages'], @@ -84,6 +78,15 @@ export class SqsQueueDriver implements PollQueueDriver { } async initializeModules(): Promise { + if (this.AWS && this.client) return Promise.resolve(); this.AWS = await Package.load('@aws-sdk/client-sqs'); + this.client = new this.AWS.SQS({ + region: this.options.region, + apiVersion: this.options.apiVersion, + credentials: this.options.credentials || { + accessKeyId: this.options.accessKey, + secretAccessKey: this.options.secretKey, + }, + }); } } diff --git a/packages/core/lib/queue/strategy/message.ts b/packages/core/lib/queue/strategy/message.ts index e4957c0..feaaaad 100644 --- a/packages/core/lib/queue/strategy/message.ts +++ b/packages/core/lib/queue/strategy/message.ts @@ -17,4 +17,5 @@ export interface InternalMessage extends Message { attemptCount: number; id: string; delay?: number; + netDelayInSeconds?: number; } diff --git a/packages/core/lib/queue/workers/pollQueue.ts b/packages/core/lib/queue/workers/pollQueue.ts index 293088d..47f6b01 100644 --- a/packages/core/lib/queue/workers/pollQueue.ts +++ b/packages/core/lib/queue/workers/pollQueue.ts @@ -173,7 +173,7 @@ export class PollQueueWorker extends BaseQueueWorker { this.emitEvent(new JobMaxRetriesExceeed(message, job)); return; } - await Dispatch(message); + await Dispatch({ ...message, delay: message.netDelayInSeconds }); this.emitEvent(new JobFailed(message, job)); } diff --git a/packages/core/package.json b/packages/core/package.json index 0c48972..4456b25 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "@intentjs/core", - "version": "0.1.48", + "version": "0.1.53", "description": "Core module for Intent", "repository": { "type": "git", @@ -131,7 +131,7 @@ "typescript": "^5.5.2" }, "dependencies": { - "@intentjs/hyper-express": "^0.0.7", + "@intentjs/hyper-express": "^0.0.11", "@nestjs/common": "^11.0.12", "@nestjs/core": "^11.0.12", "@react-email/components": "^0.0.32", diff --git a/packages/hyper-express/index.js b/packages/hyper-express/index.js index b34020a..9bc9487 100644 --- a/packages/hyper-express/index.js +++ b/packages/hyper-express/index.js @@ -10,6 +10,7 @@ const LiveFile = require('./src/components/plugins/LiveFile.js'); const MultipartField = require('./src/components/plugins/MultipartField.js'); const SSEventStream = require('./src/components/plugins/SSEventStream.js'); const Websocket = require('./src/components/ws/Websocket.js'); +const UploadedFile = require('./src/shared/uploaded-file.js'); // Disable the uWebsockets.js version header if not specified to be kept if (!process.env['KEEP_UWS_HEADER']) { @@ -28,6 +29,9 @@ module.exports = { MultipartField, SSEventStream, Websocket, + UploadedFile, compressors: uWebsockets, - express(...args) { return new Server(...args); }, + express(...args) { + return new Server(...args); + }, }; diff --git a/packages/hyper-express/package.json b/packages/hyper-express/package.json index 64b41c4..dee1b78 100644 --- a/packages/hyper-express/package.json +++ b/packages/hyper-express/package.json @@ -1,6 +1,6 @@ { "name": "@intentjs/hyper-express", - "version": "0.0.7", + "version": "0.0.11", "description": "A fork of hyper-express to suit IntentJS requirements. High performance Node.js webserver with a simple-to-use API powered by uWebsockets.js under the hood.", "main": "index.js", "types": "./types/index.d.ts", diff --git a/packages/hyper-express/src/components/compatibility/ExpressRequest.js b/packages/hyper-express/src/components/compatibility/ExpressRequest.js index 6f5ea92..14c7495 100644 --- a/packages/hyper-express/src/components/compatibility/ExpressRequest.js +++ b/packages/hyper-express/src/components/compatibility/ExpressRequest.js @@ -100,6 +100,10 @@ class ExpressRequest { return this.url; } + set originalUrl(url) { + this.url = url; + } + get fresh() { this._throw_unsupported('fresh'); } diff --git a/packages/hyper-express/src/components/http/Request.js b/packages/hyper-express/src/components/http/Request.js index 00876d7..78c42d5 100644 --- a/packages/hyper-express/src/components/http/Request.js +++ b/packages/hyper-express/src/components/http/Request.js @@ -858,6 +858,10 @@ class Request { return this._url; } + set url(url) { + this._url = url; + } + /** * Returns path for incoming request. * @returns {String} diff --git a/packages/hyper-express/src/shared/uploaded-file.js b/packages/hyper-express/src/shared/uploaded-file.js index 2cbb4f0..1f0077e 100644 --- a/packages/hyper-express/src/shared/uploaded-file.js +++ b/packages/hyper-express/src/shared/uploaded-file.js @@ -28,11 +28,11 @@ class UploadedFile { } get extension() { - return this.filename; + return this._filename.split('.').pop(); } async toBuffer() { - return fs.readFileSync(this.tempPath); + return fs.readFileSync(this._tempPath); } } diff --git a/packages/hyper-express/types/shared/uploaded-file.d.ts b/packages/hyper-express/types/shared/uploaded-file.d.ts index 0be241b..abc59f1 100644 --- a/packages/hyper-express/types/shared/uploaded-file.d.ts +++ b/packages/hyper-express/types/shared/uploaded-file.d.ts @@ -1,5 +1,3 @@ -import { readFileSync } from 'fs-extra'; - export class UploadedFile { _filename: string; _size: number;