From 30e4be4f5b6727a6717971e507466ebdd5e3c0b3 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Sun, 6 Jul 2025 13:57:05 +0600 Subject: [PATCH 01/11] fix(controlplane): introducing abstract classes to improve worker & queue - Replaced individual worker and queue interfaces with abstract base classes for better code reuse and maintainability. - Updated worker and queue classes to extend from the new base abstract classes, simplifying their constructors and error handling and log managements. - Removed deprecated worker creation functions in favor of the new base worker & queue pattern. - Adjusted imports and cleaned up unused code across multiple worker files. --- controlplane/src/core/build-server.ts | 127 ++++++++---------- .../src/core/workers/AIGraphReadmeWorker.ts | 50 ++----- .../src/core/workers/CacheWarmerWorker.ts | 65 ++------- .../workers/DeactivateOrganizationWorker.ts | 65 ++------- .../DeleteOrganizationAuditLogsWorker.ts | 63 ++------- .../core/workers/DeleteOrganizationWorker.ts | 65 ++------- .../src/core/workers/DeleteUserQueue.ts | 64 ++------- .../workers/ReactivateOrganizationWorker.ts | 64 ++------- controlplane/src/core/workers/Worker.ts | 11 -- controlplane/src/core/workers/base/Queue.ts | 47 +++++++ controlplane/src/core/workers/base/Worker.ts | 29 ++++ controlplane/src/core/workers/base/index.ts | 2 + 12 files changed, 212 insertions(+), 440 deletions(-) delete mode 100644 controlplane/src/core/workers/Worker.ts create mode 100644 controlplane/src/core/workers/base/Queue.ts create mode 100644 controlplane/src/core/workers/base/Worker.ts create mode 100644 controlplane/src/core/workers/base/index.ts diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index 115dc5eb7c..fcb3d2f145 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -10,7 +10,7 @@ import { App } from 'octokit'; import { Worker } from 'bullmq'; import routes from './routes.js'; import fastifyHealth from './plugins/health.js'; -import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js'; +import fastifyMetrics from './plugins/metrics.js'; import fastifyDatabase from './plugins/database.js'; import fastifyClickHouse from './plugins/clickhouse.js'; import fastifyRedis from './plugins/redis.js'; @@ -36,23 +36,17 @@ import { Authorization } from './services/Authorization.js'; import { BillingRepository } from './repositories/BillingRepository.js'; import { BillingService } from './services/BillingService.js'; import { UserRepository } from './repositories/UserRepository.js'; -import { AIGraphReadmeQueue, createAIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; +import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js'; import { ApiKeyRepository } from './repositories/ApiKeyRepository.js'; -import { createDeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; +import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; import { - createDeleteOrganizationAuditLogsWorker, + DeleteOrganizationAuditLogsWorker, DeleteOrganizationAuditLogsQueue, } from './workers/DeleteOrganizationAuditLogsWorker.js'; -import { - createDeactivateOrganizationWorker, - DeactivateOrganizationQueue, -} from './workers/DeactivateOrganizationWorker.js'; -import { createDeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js'; -import { - createReactivateOrganizationWorker, - ReactivateOrganizationQueue, -} from './workers/ReactivateOrganizationWorker.js'; +import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWorker.js'; +import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js'; +import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; export interface BuildConfig { logger: LoggerOptions; @@ -317,73 +311,68 @@ export default async function build(opts: BuildConfig) { const readmeQueue = new AIGraphReadmeQueue(logger, fastify.redisForQueue); if (opts.openaiAPIKey) { - bullWorkers.push( - createAIGraphReadmeWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - openAiApiKey: opts.openaiAPIKey, - }), - ); - } - - const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteOrganizationAuditLogsWorker({ + const readmeWorker = new AIGraphReadmeWorker({ redisConnection: fastify.redisForWorker, db: fastify.db, logger, - }), - ); + openAiApiKey: opts.openaiAPIKey, + }); + + bullWorkers.push(readmeWorker.create()); + } + + const deleteOrganizationAuditLogsQueue = new DeleteOrganizationAuditLogsQueue(logger, fastify.redisForQueue); + const deleteOrganizationAuditLogsWorker = new DeleteOrganizationAuditLogsWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + }); + bullWorkers.push(deleteOrganizationAuditLogsWorker.create()); const deleteOrganizationQueue = new DeleteOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - blobStorage, - deleteOrganizationAuditLogsQueue, - }), - ); + const deleteOrganizationWorker = new DeleteOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + blobStorage, + deleteOrganizationAuditLogsQueue, + }); + bullWorkers.push(deleteOrganizationWorker.create()); const deactivateOrganizationQueue = new DeactivateOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeactivateOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - deleteOrganizationQueue, - }), - ); + const deactivateOrganizationWorker = new DeactivateOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + deleteOrganizationQueue, + }); + bullWorkers.push(deactivateOrganizationWorker.create()); const reactivateOrganizationQueue = new ReactivateOrganizationQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createReactivateOrganizationWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - deleteOrganizationQueue, - }), - ); + const reactivateOrganizationWorker = new ReactivateOrganizationWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + deleteOrganizationQueue, + }); + bullWorkers.push(reactivateOrganizationWorker.create()); const deleteUserQueue = new DeleteUserQueue(logger, fastify.redisForQueue); - bullWorkers.push( - createDeleteUserWorker({ - redisConnection: fastify.redisForWorker, - db: fastify.db, - logger, - keycloakClient, - keycloakRealm: opts.keycloak.realm, - blobStorage, - platformWebhooks, - deleteOrganizationAuditLogsQueue, - }), - ); + const deleteUserWorker = new DeleteUserWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + keycloakClient, + keycloakRealm: opts.keycloak.realm, + blobStorage, + platformWebhooks, + deleteOrganizationAuditLogsQueue, + }); + bullWorkers.push(deleteUserWorker.create()); // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { diff --git a/controlplane/src/core/workers/AIGraphReadmeWorker.ts b/controlplane/src/core/workers/AIGraphReadmeWorker.ts index 05cc28c93c..89653b372f 100644 --- a/controlplane/src/core/workers/AIGraphReadmeWorker.ts +++ b/controlplane/src/core/workers/AIGraphReadmeWorker.ts @@ -1,11 +1,11 @@ import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import { ConnectionOptions, Job, Worker, Queue } from 'bullmq'; +import { ConnectionOptions, Job } from 'bullmq'; import pino from 'pino'; import * as schema from '../../db/schema.js'; import { OpenAIGraphql } from '../openai-graphql/index.js'; import { SubgraphRepository } from '../repositories/SubgraphRepository.js'; import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'ai.graph-readme-generator'; const WorkerName = 'AIGraphReadmeWorker'; @@ -16,15 +16,13 @@ export interface CreateReadmeInputEvent { type: 'subgraph' | 'federated_graph'; } -export class AIGraphReadmeQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class AIGraphReadmeQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { + super({ + name: QueueName, + conn, + log, + jobsOptions: { removeOnComplete: true, removeOnFail: true, attempts: 3, @@ -34,10 +32,6 @@ export class AIGraphReadmeQueue implements IQueue { }, }, }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); } public addJob(job: CreateReadmeInputEvent) { @@ -61,7 +55,7 @@ export class AIGraphReadmeQueue implements IQueue { } } -class AIGraphReadmeWorker implements IWorker { +export class AIGraphReadmeWorker extends BaseWorker { private readonly openaiGraphql: OpenAIGraphql; constructor( @@ -72,10 +66,14 @@ class AIGraphReadmeWorker implements IWorker { openAiApiKey: string; }, ) { + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); + this.openaiGraphql = new OpenAIGraphql({ openAiApiKey: input.openAiApiKey, }); - this.input.logger = input.logger.child({ worker: WorkerName }); } private async generateSubgraphReadme(job: Job) { @@ -140,23 +138,3 @@ class AIGraphReadmeWorker implements IWorker { } } } - -export const createAIGraphReadmeWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - openAiApiKey: string; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new AIGraphReadmeWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn(`Job ${job} stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmerWorker.ts index cb170f77e0..c6f7de66bc 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmerWorker.ts @@ -1,4 +1,4 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; @@ -6,7 +6,7 @@ import { BlobStorage } from '../blobstorage/index.js'; import { ClickHouseClient } from '../clickhouse/index.js'; import { S3RouterConfigMetadata } from '../composition/composer.js'; import { CacheWarmerRepository } from '../repositories/CacheWarmerRepository.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'cache.warmer'; const WorkerName = 'CacheWarmerWorker'; @@ -17,31 +17,12 @@ export interface CacheWarmerInput { rangeInHours: number; } -export class CacheWarmerQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class CacheWarmerQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); + super({ + name: QueueName, + conn, + log, }); } @@ -65,17 +46,21 @@ export class CacheWarmerQueue implements IQueue { } } -class CacheWarmerWorker implements IWorker { +export class CacheWarmerWorker extends BaseWorker { constructor( private input: { + redisConnection: ConnectionOptions; db: PostgresJsDatabase; + logger: pino.Logger; chClient: ClickHouseClient | undefined; blobStorage: BlobStorage; - logger: pino.Logger; cacheWarmerQueue: CacheWarmerQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -116,25 +101,3 @@ class CacheWarmerWorker implements IWorker { } } } - -export const createCacheWarmerWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - chClient: ClickHouseClient | undefined; - blobStorage: BlobStorage; - cacheWarmerQueue: CacheWarmerQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new CacheWarmerWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts index d1ee93cb3e..63a0dcd679 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts @@ -1,11 +1,11 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.deactivate'; const WorkerName = 'DeactivateOrganizationWorker'; @@ -16,32 +16,9 @@ export interface DeactivateOrganizationInput { deactivationReason?: string; } -export class DeactivateOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeactivateOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, conn, log }); } public addJob(job: DeactivateOrganizationInput, opts?: Omit) { @@ -61,9 +38,10 @@ export class DeactivateOrganizationQueue implements IQueue { constructor( private input: { + redisConnection: ConnectionOptions; db: PostgresJsDatabase; logger: pino.Logger; keycloakClient: Keycloak; @@ -71,7 +49,10 @@ class DeactivateOrganizationWorker implements IWorker { deleteOrganizationQueue: DeleteOrganizationQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -106,29 +87,3 @@ class DeactivateOrganizationWorker implements IWorker { } } } - -export const createDeactivateOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - deleteOrganizationQueue: DeleteOrganizationQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeactivateOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts index 77465438c4..7e0ae4854c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts @@ -1,9 +1,9 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; import { AuditLogRepository } from '../repositories/AuditLogRepository.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.delete_audit_logs'; const WorkerName = 'DeleteOrganizationAuditLogsWorker'; @@ -12,32 +12,9 @@ export interface DeleteOrganizationAuditLogsInput { organizationId: string; } -export class DeleteOrganizationAuditLogsQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeleteOrganizationAuditLogsQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }); } public addJob(job: DeleteOrganizationAuditLogsInput, opts?: Omit) { @@ -56,7 +33,7 @@ export class DeleteOrganizationAuditLogsQueue implements IQueue { constructor( private input: { redisConnection: ConnectionOptions; @@ -64,7 +41,10 @@ class DeleteOrganizationAuditLogsWorker implements IWorker { logger: pino.Logger; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -82,28 +62,3 @@ class DeleteOrganizationAuditLogsWorker implements IWorker { } } } - -export const createDeleteOrganizationAuditLogsWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeleteOrganizationAuditLogsWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - - worker.on('stalled', (job) => { - log.warn({ joinId: job }, 'Job stalled'); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index 2846b1a0d0..e2b9218e65 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -1,4 +1,4 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; @@ -7,8 +7,8 @@ import Keycloak from '../services/Keycloak.js'; import { OidcRepository } from '../repositories/OidcRepository.js'; import OidcProvider from '../services/OidcProvider.js'; import { BlobStorage } from '../blobstorage/index.js'; -import { IQueue, IWorker } from './Worker.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.delete'; const WorkerName = 'DeleteOrganizationWorker'; @@ -17,32 +17,9 @@ export interface DeleteOrganizationInput { organizationId: string; } -export class DeleteOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeleteOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }) } public addJob(job: DeleteOrganizationInput, opts?: Omit) { @@ -61,7 +38,7 @@ export class DeleteOrganizationQueue implements IQueue } } -class DeleteOrganizationWorker implements IWorker { +export class DeleteOrganizationWorker extends BaseWorker { constructor( private input: { redisConnection: ConnectionOptions; @@ -73,7 +50,10 @@ class DeleteOrganizationWorker implements IWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -131,30 +111,3 @@ class DeleteOrganizationWorker implements IWorker { } } } - -export const createDeleteOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - blobStorage: BlobStorage; - deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new DeleteOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUserQueue.ts index e03e29a35b..f4fe340ec8 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUserQueue.ts @@ -1,5 +1,5 @@ import { PlatformEventName } from '@wundergraph/cosmo-connect/dist/notifications/events_pb'; -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; @@ -8,7 +8,7 @@ import { OrganizationRepository } from '../repositories/OrganizationRepository.j import { UserRepository } from '../repositories/UserRepository.js'; import Keycloak from '../services/Keycloak.js'; import { PlatformWebhookService } from '../webhooks/PlatformWebhookService.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; const QueueName = 'user.delete'; @@ -19,32 +19,9 @@ export interface DeleteUserInput { userEmail: string; } -export class DeleteUserQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class DeleteUserQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }); } public addJob(job: DeleteUserInput, opts?: Omit) { @@ -63,11 +40,11 @@ export class DeleteUserQueue implements IQueue { } } -class DeleteUserWorker implements IWorker { +export class DeleteUserWorker extends BaseWorker { constructor( private input: { - redisConnection: ConnectionOptions; db: PostgresJsDatabase; + redisConnection: ConnectionOptions; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; @@ -76,7 +53,10 @@ class DeleteUserWorker implements IWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -123,27 +103,3 @@ class DeleteUserWorker implements IWorker { } } } - -export const createDeleteUserWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - keycloakClient: Keycloak; - keycloakRealm: string; - blobStorage: BlobStorage; - platformWebhooks: PlatformWebhookService; - deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker(QueueName, (job) => new DeleteUserWorker(input).handler(job), { - connection: input.redisConnection, - concurrency: 10, - }); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts index 24c20fdf1a..d8884f7bd9 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts @@ -1,11 +1,10 @@ -import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; -import Keycloak from '../services/Keycloak.js'; import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; -import { IQueue, IWorker } from './Worker.js'; +import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.reactivate'; const WorkerName = 'ReactivateOrganizationWorker'; @@ -15,32 +14,9 @@ export interface ReactivateOrganizationInput { organizationSlug: string; } -export class ReactivateOrganizationQueue implements IQueue { - private readonly queue: Queue; - private readonly logger: pino.Logger; - +export class ReactivateOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - this.logger = log.child({ queue: QueueName }); - this.queue = new Queue(QueueName, { - connection: conn, - defaultJobOptions: { - removeOnComplete: { - age: 90 * 86_400, - }, - removeOnFail: { - age: 90 * 86_400, - }, - attempts: 6, - backoff: { - type: 'exponential', - delay: 112_000, - }, - }, - }); - - this.queue.on('error', (err) => { - this.logger.error(err, 'Queue error'); - }); + super({ name: QueueName, log, conn }); } public addJob(job: ReactivateOrganizationInput, opts?: Omit) { @@ -59,15 +35,19 @@ export class ReactivateOrganizationQueue implements IQueue { constructor( private input: { + redisConnection: ConnectionOptions; db: PostgresJsDatabase; logger: pino.Logger; deleteOrganizationQueue: DeleteOrganizationQueue; }, ) { - this.input.logger = input.logger.child({ worker: WorkerName }); + super(WorkerName, QueueName, input.logger, { + connection: input.redisConnection, + concurrency: 10, + }); } public async handler(job: Job) { @@ -96,27 +76,3 @@ class ReactivateOrganizationWorker implements IWorker { } } } - -export const createReactivateOrganizationWorker = (input: { - redisConnection: ConnectionOptions; - db: PostgresJsDatabase; - logger: pino.Logger; - deleteOrganizationQueue: DeleteOrganizationQueue; -}) => { - const log = input.logger.child({ worker: WorkerName }); - const worker = new Worker( - QueueName, - (job) => new ReactivateOrganizationWorker(input).handler(job), - { - connection: input.redisConnection, - concurrency: 10, - }, - ); - worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); - }); - worker.on('error', (err) => { - log.error(err, 'Worker error'); - }); - return worker; -}; diff --git a/controlplane/src/core/workers/Worker.ts b/controlplane/src/core/workers/Worker.ts deleted file mode 100644 index b268460a33..0000000000 --- a/controlplane/src/core/workers/Worker.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { Job, JobsOptions } from 'bullmq'; - -export interface IQueue { - addJob(job: T, opts?: Omit): Promise | undefined>; - removeJob(job: T): Promise; - getJob(job: T): Promise | undefined>; -} - -export interface IWorker { - handler(job: Job): Promise; -} diff --git a/controlplane/src/core/workers/base/Queue.ts b/controlplane/src/core/workers/base/Queue.ts new file mode 100644 index 0000000000..5349b5ed8a --- /dev/null +++ b/controlplane/src/core/workers/base/Queue.ts @@ -0,0 +1,47 @@ + +import { ConnectionOptions, Job, JobsOptions, Queue } from 'bullmq'; +import pino from 'pino'; + +const defaultJobOptions: JobsOptions = { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, +}; + +export type BaseQueueParams = { + name: string; + conn: ConnectionOptions; + log: pino.Logger; + jobsOptions?: JobsOptions; +} + +export abstract class BaseQueue { + protected readonly queue: Queue; + protected readonly logger: pino.Logger; + + constructor({ name, conn, log, jobsOptions = defaultJobOptions }: BaseQueueParams) { + this.logger = log.child({ queue: name }); + this.queue = new Queue(name, { + connection: conn, + defaultJobOptions: jobsOptions, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, `error [Queue: ${name}]`); + }); + } + + public abstract addJob(job: T, opts?: Omit): Promise | undefined>; + + public abstract removeJob(job: T): Promise; + + public abstract getJob(job: T): Promise | undefined>; +} diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts new file mode 100644 index 0000000000..877fca9308 --- /dev/null +++ b/controlplane/src/core/workers/base/Worker.ts @@ -0,0 +1,29 @@ +import { WorkerOptions, Job, Worker } from 'bullmq'; +import pino from 'pino'; + +export abstract class BaseWorker { + constructor( + protected readonly name: string, + protected readonly queueName: string, + protected logger: pino.Logger, + protected options: WorkerOptions + ) { + this.logger = logger.child({ worker: name }); + } + + public create(): Worker { + const worker = new Worker(this.queueName, (job) => this.handler(job), this.options); + + worker.on('stalled', (jobId) => { + this.logger.warn({ jobId }, `Job stalled [Worker: ${this.name}]`); + }); + + worker.on('error', (err) => { + this.logger.error(err, `error [Worker: ${this.name}]`); + }); + + return worker; + } + + protected abstract handler(job: Job): Promise; +} diff --git a/controlplane/src/core/workers/base/index.ts b/controlplane/src/core/workers/base/index.ts new file mode 100644 index 0000000000..1112c69e6a --- /dev/null +++ b/controlplane/src/core/workers/base/index.ts @@ -0,0 +1,2 @@ +export * from './Queue.js'; +export * from './Worker.js'; From 7752503919e6242a4a1dee06eb865a2df2cf5d86 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Sun, 6 Jul 2025 16:04:33 +0600 Subject: [PATCH 02/11] chore: fix lint issues --- controlplane/src/core/workers/DeleteOrganizationWorker.ts | 2 +- controlplane/src/core/workers/base/Queue.ts | 7 +++---- controlplane/src/core/workers/base/Worker.ts | 8 ++++---- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index e2b9218e65..0fb9e7eb19 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -19,7 +19,7 @@ export interface DeleteOrganizationInput { export class DeleteOrganizationQueue extends BaseQueue { constructor(log: pino.Logger, conn: ConnectionOptions) { - super({ name: QueueName, log, conn }) + super({ name: QueueName, log, conn }); } public addJob(job: DeleteOrganizationInput, opts?: Omit) { diff --git a/controlplane/src/core/workers/base/Queue.ts b/controlplane/src/core/workers/base/Queue.ts index 5349b5ed8a..7776249831 100644 --- a/controlplane/src/core/workers/base/Queue.ts +++ b/controlplane/src/core/workers/base/Queue.ts @@ -1,4 +1,3 @@ - import { ConnectionOptions, Job, JobsOptions, Queue } from 'bullmq'; import pino from 'pino'; @@ -15,15 +14,15 @@ const defaultJobOptions: JobsOptions = { delay: 112_000, }, }; - + export type BaseQueueParams = { name: string; conn: ConnectionOptions; log: pino.Logger; jobsOptions?: JobsOptions; -} +}; -export abstract class BaseQueue { +export abstract class BaseQueue { protected readonly queue: Queue; protected readonly logger: pino.Logger; diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts index 877fca9308..be581b8b68 100644 --- a/controlplane/src/core/workers/base/Worker.ts +++ b/controlplane/src/core/workers/base/Worker.ts @@ -3,10 +3,10 @@ import pino from 'pino'; export abstract class BaseWorker { constructor( - protected readonly name: string, - protected readonly queueName: string, - protected logger: pino.Logger, - protected options: WorkerOptions + protected readonly name: string, + protected readonly queueName: string, + protected logger: pino.Logger, + protected options: WorkerOptions, ) { this.logger = logger.child({ worker: name }); } From 1f7ffbe28e94844721f974557d5ba3351c18dd5c Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Sun, 6 Jul 2025 16:12:57 +0600 Subject: [PATCH 03/11] fix: parent logger overright --- controlplane/src/core/workers/AIGraphReadmeWorker.ts | 5 +---- controlplane/src/core/workers/CacheWarmerWorker.ts | 5 +---- .../src/core/workers/DeleteOrganizationAuditLogsWorker.ts | 5 +---- controlplane/src/core/workers/DeleteOrganizationWorker.ts | 5 +---- .../src/core/workers/ReactivateOrganizationWorker.ts | 5 +---- controlplane/src/core/workers/base/Worker.ts | 4 +++- 6 files changed, 8 insertions(+), 21 deletions(-) diff --git a/controlplane/src/core/workers/AIGraphReadmeWorker.ts b/controlplane/src/core/workers/AIGraphReadmeWorker.ts index 89653b372f..3c514df74b 100644 --- a/controlplane/src/core/workers/AIGraphReadmeWorker.ts +++ b/controlplane/src/core/workers/AIGraphReadmeWorker.ts @@ -66,10 +66,7 @@ export class AIGraphReadmeWorker extends BaseWorker { openAiApiKey: string; }, ) { - super(WorkerName, QueueName, input.logger, { - connection: input.redisConnection, - concurrency: 10, - }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); this.openaiGraphql = new OpenAIGraphql({ openAiApiKey: input.openAiApiKey, diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmerWorker.ts index c6f7de66bc..4b87cab506 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmerWorker.ts @@ -57,10 +57,7 @@ export class CacheWarmerWorker extends BaseWorker { cacheWarmerQueue: CacheWarmerQueue; }, ) { - super(WorkerName, QueueName, input.logger, { - connection: input.redisConnection, - concurrency: 10, - }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } public async handler(job: Job) { diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts index 7e0ae4854c..4a1f69fb6d 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts @@ -41,10 +41,7 @@ export class DeleteOrganizationAuditLogsWorker extends BaseWorker) { diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index 0fb9e7eb19..f31bcca868 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -50,10 +50,7 @@ export class DeleteOrganizationWorker extends BaseWorker) { diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts index d8884f7bd9..3b813a079b 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts @@ -44,10 +44,7 @@ export class ReactivateOrganizationWorker extends BaseWorker) { diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts index be581b8b68..5dc0f15deb 100644 --- a/controlplane/src/core/workers/base/Worker.ts +++ b/controlplane/src/core/workers/base/Worker.ts @@ -2,11 +2,13 @@ import { WorkerOptions, Job, Worker } from 'bullmq'; import pino from 'pino'; export abstract class BaseWorker { + protected readonly logger: pino.Logger; + constructor( protected readonly name: string, protected readonly queueName: string, - protected logger: pino.Logger, protected options: WorkerOptions, + logger: pino.Logger, ) { this.logger = logger.child({ worker: name }); } From cd269dc65ac710ffac550ebe973b9c44d8fbfa51 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Mon, 7 Jul 2025 09:41:40 +0600 Subject: [PATCH 04/11] fix: used async await with handler for proper sync --- controlplane/src/core/workers/base/Worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts index 5dc0f15deb..af0a41b2ff 100644 --- a/controlplane/src/core/workers/base/Worker.ts +++ b/controlplane/src/core/workers/base/Worker.ts @@ -14,7 +14,7 @@ export abstract class BaseWorker { } public create(): Worker { - const worker = new Worker(this.queueName, (job) => this.handler(job), this.options); + const worker = new Worker(this.queueName, async (job) => await this.handler(job), this.options); worker.on('stalled', (jobId) => { this.logger.warn({ jobId }, `Job stalled [Worker: ${this.name}]`); From 085738eec9a24d17c60fdbaa738f0748f9f4d93a Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Mon, 7 Jul 2025 09:43:56 +0600 Subject: [PATCH 05/11] fix: change access modifier of handle func to protected --- controlplane/src/core/workers/AIGraphReadmeWorker.ts | 2 +- controlplane/src/core/workers/CacheWarmerWorker.ts | 2 +- .../src/core/workers/DeactivateOrganizationWorker.ts | 7 ++----- .../src/core/workers/DeleteOrganizationAuditLogsWorker.ts | 2 +- controlplane/src/core/workers/DeleteOrganizationWorker.ts | 2 +- controlplane/src/core/workers/DeleteUserQueue.ts | 7 ++----- .../src/core/workers/ReactivateOrganizationWorker.ts | 2 +- 7 files changed, 9 insertions(+), 15 deletions(-) diff --git a/controlplane/src/core/workers/AIGraphReadmeWorker.ts b/controlplane/src/core/workers/AIGraphReadmeWorker.ts index 3c514df74b..a71fb30af2 100644 --- a/controlplane/src/core/workers/AIGraphReadmeWorker.ts +++ b/controlplane/src/core/workers/AIGraphReadmeWorker.ts @@ -117,7 +117,7 @@ export class AIGraphReadmeWorker extends BaseWorker { }); } - public async handler(job: Job) { + protected async handler(job: Job) { try { if (job.data.type === 'subgraph') { await this.generateSubgraphReadme(job); diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmerWorker.ts index 4b87cab506..9b080f3572 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmerWorker.ts @@ -60,7 +60,7 @@ export class CacheWarmerWorker extends BaseWorker { super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { const organizationId = job.data.organizationId; const federatedGraphId = job.data.federatedGraphId; const rangeInHours = job.data.rangeInHours; diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts index 63a0dcd679..e4731821a8 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts @@ -49,13 +49,10 @@ export class DeactivateOrganizationWorker extends BaseWorker) { + protected async handler(job: Job) { try { this.input.logger.info('Processing deactivate job'); const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts index 4a1f69fb6d..d211366b1a 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts @@ -44,7 +44,7 @@ export class DeleteOrganizationAuditLogsWorker extends BaseWorker) { + protected async handler(job: Job) { try { const auditLogRepo = new AuditLogRepository(this.input.db); diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index f31bcca868..54105a86bd 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -53,7 +53,7 @@ export class DeleteOrganizationWorker extends BaseWorker) { + protected async handler(job: Job) { try { const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); const oidcRepo = new OidcRepository(this.input.db); diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUserQueue.ts index f4fe340ec8..a5546b92b6 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUserQueue.ts @@ -53,13 +53,10 @@ export class DeleteUserWorker extends BaseWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - super(WorkerName, QueueName, input.logger, { - connection: input.redisConnection, - concurrency: 10, - }); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } - public async handler(job: Job) { + protected async handler(job: Job) { try { const userRepo = new UserRepository(this.input.logger, this.input.db); const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts index 3b813a079b..9228d3deb7 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts @@ -47,7 +47,7 @@ export class ReactivateOrganizationWorker extends BaseWorker) { + protected async handler(job: Job) { try { const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); From a859b2fbcf44c165d4fcd1a101640b802cdf1100 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Mon, 7 Jul 2025 09:48:56 +0600 Subject: [PATCH 06/11] fix: linted al changes --- controlplane/src/core/workers/DeactivateOrganizationWorker.ts | 2 +- controlplane/src/core/workers/DeleteUserQueue.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts index e4731821a8..4cd40b0916 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts @@ -49,7 +49,7 @@ export class DeactivateOrganizationWorker extends BaseWorker) { diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUserQueue.ts index a5546b92b6..d73c643978 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUserQueue.ts @@ -53,7 +53,7 @@ export class DeleteUserWorker extends BaseWorker { deleteOrganizationAuditLogsQueue: DeleteOrganizationAuditLogsQueue; }, ) { - super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); + super(WorkerName, QueueName, { connection: input.redisConnection, concurrency: 10 }, input.logger); } protected async handler(job: Job) { From f1e5eb8ed6be87dd84ba39549076a83219f60935 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Mon, 7 Jul 2025 10:23:36 +0600 Subject: [PATCH 07/11] fix: removed Worker & Queue suffix from file names for better naming consistency --- controlplane/src/bin/deactivate-org.ts | 2 +- controlplane/src/bin/delete-user.ts | 2 +- controlplane/src/bin/reactivate-org.ts | 2 +- controlplane/src/core/build-server.ts | 12 ++++++------ .../src/core/repositories/OrganizationRepository.ts | 4 ++-- controlplane/src/core/repositories/UserRepository.ts | 2 +- controlplane/src/core/routes.ts | 12 ++++++------ .../{AIGraphReadmeWorker.ts => AIGraphReadme.ts} | 0 .../workers/{CacheWarmerWorker.ts => CacheWarmer.ts} | 0 ...ganizationWorker.ts => DeactivateOrganization.ts} | 2 +- ...teOrganizationWorker.ts => DeleteOrganization.ts} | 2 +- ...tLogsWorker.ts => DeleteOrganizationAuditLogs.ts} | 0 .../workers/{DeleteUserQueue.ts => DeleteUser.ts} | 2 +- ...ganizationWorker.ts => ReactivateOrganization.ts} | 2 +- 14 files changed, 22 insertions(+), 22 deletions(-) rename controlplane/src/core/workers/{AIGraphReadmeWorker.ts => AIGraphReadme.ts} (100%) rename controlplane/src/core/workers/{CacheWarmerWorker.ts => CacheWarmer.ts} (100%) rename controlplane/src/core/workers/{DeactivateOrganizationWorker.ts => DeactivateOrganization.ts} (97%) rename controlplane/src/core/workers/{DeleteOrganizationWorker.ts => DeleteOrganization.ts} (99%) rename controlplane/src/core/workers/{DeleteOrganizationAuditLogsWorker.ts => DeleteOrganizationAuditLogs.ts} (100%) rename controlplane/src/core/workers/{DeleteUserQueue.ts => DeleteUser.ts} (99%) rename controlplane/src/core/workers/{ReactivateOrganizationWorker.ts => ReactivateOrganization.ts} (97%) diff --git a/controlplane/src/bin/deactivate-org.ts b/controlplane/src/bin/deactivate-org.ts index 229ac3d1df..08bceafe65 100644 --- a/controlplane/src/bin/deactivate-org.ts +++ b/controlplane/src/bin/deactivate-org.ts @@ -1,6 +1,6 @@ import process from 'node:process'; import { pino } from 'pino'; -import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganizationWorker.js'; +import { DeactivateOrganizationQueue } from '../core/workers/DeactivateOrganization.js'; import { createRedisConnections } from '../core/plugins/redis.js'; import { getConfig } from './get-config.js'; diff --git a/controlplane/src/bin/delete-user.ts b/controlplane/src/bin/delete-user.ts index 292a51772e..178c848d8e 100644 --- a/controlplane/src/bin/delete-user.ts +++ b/controlplane/src/bin/delete-user.ts @@ -1,7 +1,7 @@ import process from 'node:process'; import { pino } from 'pino'; import { createRedisConnections } from '../core/plugins/redis.js'; -import { DeleteUserQueue } from '../core/workers/DeleteUserQueue.js'; +import { DeleteUserQueue } from '../core/workers/DeleteUser.js'; import { getConfig } from './get-config.js'; const { redis } = getConfig(); diff --git a/controlplane/src/bin/reactivate-org.ts b/controlplane/src/bin/reactivate-org.ts index 787354fc31..cc03ebc567 100644 --- a/controlplane/src/bin/reactivate-org.ts +++ b/controlplane/src/bin/reactivate-org.ts @@ -1,7 +1,7 @@ import process from 'node:process'; import { pino } from 'pino'; import { createRedisConnections } from '../core/plugins/redis.js'; -import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganizationWorker.js'; +import { ReactivateOrganizationQueue } from '../core/workers/ReactivateOrganization.js'; import { getConfig } from './get-config.js'; const { organizationSlug, redis } = getConfig(); diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index fcb3d2f145..462f67c21b 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -36,17 +36,17 @@ import { Authorization } from './services/Authorization.js'; import { BillingRepository } from './repositories/BillingRepository.js'; import { BillingService } from './services/BillingService.js'; import { UserRepository } from './repositories/UserRepository.js'; -import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadmeWorker.js'; +import { AIGraphReadmeQueue, AIGraphReadmeWorker } from './workers/AIGraphReadme.js'; import { fastifyLoggerId, createS3ClientConfig, extractS3BucketName } from './util.js'; import { ApiKeyRepository } from './repositories/ApiKeyRepository.js'; -import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; +import { DeleteOrganizationWorker, DeleteOrganizationQueue } from './workers/DeleteOrganization.js'; import { DeleteOrganizationAuditLogsWorker, DeleteOrganizationAuditLogsQueue, -} from './workers/DeleteOrganizationAuditLogsWorker.js'; -import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWorker.js'; -import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUserQueue.js'; -import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; +} from './workers/DeleteOrganizationAuditLogs.js'; +import { DeactivateOrganizationWorker, DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js'; +import { DeleteUserWorker, DeleteUserQueue } from './workers/DeleteUser.js'; +import { ReactivateOrganizationWorker, ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js'; export interface BuildConfig { logger: LoggerOptions; diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index 09144015a3..b7c9e8d9c5 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -34,10 +34,10 @@ import { WebhooksConfigDTO, } from '../../types/index.js'; import Keycloak from '../services/Keycloak.js'; -import { DeleteOrganizationQueue } from '../workers/DeleteOrganizationWorker.js'; +import { DeleteOrganizationQueue } from '../workers/DeleteOrganization.js'; import { BlobStorage } from '../blobstorage/index.js'; import { delayForManualOrgDeletionInDays, delayForOrgAuditLogsDeletionInDays } from '../constants.js'; -import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js'; import { RBACEvaluator } from '../services/RBACEvaluator.js'; import { BillingRepository } from './BillingRepository.js'; import { FederatedGraphRepository } from './FederatedGraphRepository.js'; diff --git a/controlplane/src/core/repositories/UserRepository.ts b/controlplane/src/core/repositories/UserRepository.ts index 1c8bbae8ea..277d1fb80b 100644 --- a/controlplane/src/core/repositories/UserRepository.ts +++ b/controlplane/src/core/repositories/UserRepository.ts @@ -7,7 +7,7 @@ import { UserDTO } from '../../types/index.js'; import { BlobStorage } from '../blobstorage/index.js'; import Keycloak from '../services/Keycloak.js'; import OidcProvider from '../services/OidcProvider.js'; -import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from '../workers/DeleteOrganizationAuditLogs.js'; import { BillingRepository } from './BillingRepository.js'; import { OidcRepository } from './OidcRepository.js'; import { OrganizationRepository } from './OrganizationRepository.js'; diff --git a/controlplane/src/core/routes.ts b/controlplane/src/core/routes.ts index d08fe04f42..f244a64a57 100644 --- a/controlplane/src/core/routes.ts +++ b/controlplane/src/core/routes.ts @@ -15,12 +15,12 @@ import { IPlatformWebhookService } from './webhooks/PlatformWebhookService.js'; import { BlobStorage } from './blobstorage/index.js'; import Mailer from './services/Mailer.js'; import { Authorization } from './services/Authorization.js'; -import { AIGraphReadmeQueue } from './workers/AIGraphReadmeWorker.js'; -import { DeleteOrganizationQueue } from './workers/DeleteOrganizationWorker.js'; -import { DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWorker.js'; -import { DeleteUserQueue } from './workers/DeleteUserQueue.js'; -import { ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; -import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogsWorker.js'; +import { AIGraphReadmeQueue } from './workers/AIGraphReadme.js'; +import { DeleteOrganizationQueue } from './workers/DeleteOrganization.js'; +import { DeactivateOrganizationQueue } from './workers/DeactivateOrganization.js'; +import { DeleteUserQueue } from './workers/DeleteUser.js'; +import { ReactivateOrganizationQueue } from './workers/ReactivateOrganization.js'; +import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogs.js'; export interface RouterOptions { db: PostgresJsDatabase; diff --git a/controlplane/src/core/workers/AIGraphReadmeWorker.ts b/controlplane/src/core/workers/AIGraphReadme.ts similarity index 100% rename from controlplane/src/core/workers/AIGraphReadmeWorker.ts rename to controlplane/src/core/workers/AIGraphReadme.ts diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmer.ts similarity index 100% rename from controlplane/src/core/workers/CacheWarmerWorker.ts rename to controlplane/src/core/workers/CacheWarmer.ts diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganization.ts similarity index 97% rename from controlplane/src/core/workers/DeactivateOrganizationWorker.ts rename to controlplane/src/core/workers/DeactivateOrganization.ts index 4cd40b0916..1e7800b56a 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganization.ts @@ -4,7 +4,7 @@ import pino from 'pino'; import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; -import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; +import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.deactivate'; diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganization.ts similarity index 99% rename from controlplane/src/core/workers/DeleteOrganizationWorker.ts rename to controlplane/src/core/workers/DeleteOrganization.ts index 54105a86bd..43bea50e95 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganization.ts @@ -7,7 +7,7 @@ import Keycloak from '../services/Keycloak.js'; import { OidcRepository } from '../repositories/OidcRepository.js'; import OidcProvider from '../services/OidcProvider.js'; import { BlobStorage } from '../blobstorage/index.js'; -import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.delete'; diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts similarity index 100% rename from controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts rename to controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUser.ts similarity index 99% rename from controlplane/src/core/workers/DeleteUserQueue.ts rename to controlplane/src/core/workers/DeleteUser.ts index d73c643978..34d0238a1e 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUser.ts @@ -9,7 +9,7 @@ import { UserRepository } from '../repositories/UserRepository.js'; import Keycloak from '../services/Keycloak.js'; import { PlatformWebhookService } from '../webhooks/PlatformWebhookService.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogsWorker.js'; +import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; const QueueName = 'user.delete'; const WorkerName = 'DeleteUserWorker'; diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganization.ts similarity index 97% rename from controlplane/src/core/workers/ReactivateOrganizationWorker.ts rename to controlplane/src/core/workers/ReactivateOrganization.ts index 9228d3deb7..a83cdbe7ed 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganization.ts @@ -3,7 +3,7 @@ import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; -import { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; +import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; const QueueName = 'organization.reactivate'; From 25f113d2cff62ec4c98ab739d83f0a2a93300f18 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Mon, 7 Jul 2025 10:41:38 +0600 Subject: [PATCH 08/11] fix: throw error after failed audit log deletion for better error handling --- controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts index d211366b1a..3edf6d726c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts @@ -56,6 +56,7 @@ export class DeleteOrganizationAuditLogsWorker extends BaseWorker Date: Mon, 7 Jul 2025 10:52:06 +0600 Subject: [PATCH 09/11] fix: remove redundant async await from worker handler --- controlplane/src/core/workers/base/Worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/workers/base/Worker.ts b/controlplane/src/core/workers/base/Worker.ts index af0a41b2ff..5dc0f15deb 100644 --- a/controlplane/src/core/workers/base/Worker.ts +++ b/controlplane/src/core/workers/base/Worker.ts @@ -14,7 +14,7 @@ export abstract class BaseWorker { } public create(): Worker { - const worker = new Worker(this.queueName, async (job) => await this.handler(job), this.options); + const worker = new Worker(this.queueName, (job) => this.handler(job), this.options); worker.on('stalled', (jobId) => { this.logger.warn({ jobId }, `Job stalled [Worker: ${this.name}]`); From fd8310f52b2155e0821e672199ecac81a2baef44 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Tue, 8 Jul 2025 11:39:09 +0600 Subject: [PATCH 10/11] fix: replace PostgresJsDatabase with DB type in worker constructors for consistency --- controlplane/src/core/workers/AIGraphReadme.ts | 5 ++--- controlplane/src/core/workers/CacheWarmer.ts | 5 ++--- controlplane/src/core/workers/DeactivateOrganization.ts | 5 ++--- controlplane/src/core/workers/DeleteOrganization.ts | 5 ++--- controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts | 5 ++--- controlplane/src/core/workers/DeleteUser.ts | 5 ++--- controlplane/src/core/workers/ReactivateOrganization.ts | 5 ++--- 7 files changed, 14 insertions(+), 21 deletions(-) diff --git a/controlplane/src/core/workers/AIGraphReadme.ts b/controlplane/src/core/workers/AIGraphReadme.ts index a71fb30af2..1cde9a150e 100644 --- a/controlplane/src/core/workers/AIGraphReadme.ts +++ b/controlplane/src/core/workers/AIGraphReadme.ts @@ -1,11 +1,10 @@ -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import { ConnectionOptions, Job } from 'bullmq'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OpenAIGraphql } from '../openai-graphql/index.js'; import { SubgraphRepository } from '../repositories/SubgraphRepository.js'; import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'ai.graph-readme-generator'; const WorkerName = 'AIGraphReadmeWorker'; @@ -61,7 +60,7 @@ export class AIGraphReadmeWorker extends BaseWorker { constructor( private input: { redisConnection: ConnectionOptions; - db: PostgresJsDatabase; + db: DB; logger: pino.Logger; openAiApiKey: string; }, diff --git a/controlplane/src/core/workers/CacheWarmer.ts b/controlplane/src/core/workers/CacheWarmer.ts index 9b080f3572..bee33764c2 100644 --- a/controlplane/src/core/workers/CacheWarmer.ts +++ b/controlplane/src/core/workers/CacheWarmer.ts @@ -1,12 +1,11 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { BlobStorage } from '../blobstorage/index.js'; import { ClickHouseClient } from '../clickhouse/index.js'; import { S3RouterConfigMetadata } from '../composition/composer.js'; import { CacheWarmerRepository } from '../repositories/CacheWarmerRepository.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'cache.warmer'; const WorkerName = 'CacheWarmerWorker'; @@ -50,7 +49,7 @@ export class CacheWarmerWorker extends BaseWorker { constructor( private input: { redisConnection: ConnectionOptions; - db: PostgresJsDatabase; + db: DB; logger: pino.Logger; chClient: ClickHouseClient | undefined; blobStorage: BlobStorage; diff --git a/controlplane/src/core/workers/DeactivateOrganization.ts b/controlplane/src/core/workers/DeactivateOrganization.ts index 1e7800b56a..f57b39e734 100644 --- a/controlplane/src/core/workers/DeactivateOrganization.ts +++ b/controlplane/src/core/workers/DeactivateOrganization.ts @@ -1,11 +1,10 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'organization.deactivate'; const WorkerName = 'DeactivateOrganizationWorker'; @@ -42,7 +41,7 @@ export class DeactivateOrganizationWorker extends BaseWorker; + db: DB; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; diff --git a/controlplane/src/core/workers/DeleteOrganization.ts b/controlplane/src/core/workers/DeleteOrganization.ts index 43bea50e95..3adfa52287 100644 --- a/controlplane/src/core/workers/DeleteOrganization.ts +++ b/controlplane/src/core/workers/DeleteOrganization.ts @@ -1,7 +1,5 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; import { OidcRepository } from '../repositories/OidcRepository.js'; @@ -9,6 +7,7 @@ import OidcProvider from '../services/OidcProvider.js'; import { BlobStorage } from '../blobstorage/index.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'organization.delete'; const WorkerName = 'DeleteOrganizationWorker'; @@ -42,7 +41,7 @@ export class DeleteOrganizationWorker extends BaseWorker; + db: DB; logger: pino.Logger; keycloakClient: Keycloak; keycloakRealm: string; diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts index 3edf6d726c..9ccddda3de 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts @@ -1,9 +1,8 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { AuditLogRepository } from '../repositories/AuditLogRepository.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'organization.delete_audit_logs'; const WorkerName = 'DeleteOrganizationAuditLogsWorker'; @@ -37,7 +36,7 @@ export class DeleteOrganizationAuditLogsWorker extends BaseWorker; + db: DB; logger: pino.Logger; }, ) { diff --git a/controlplane/src/core/workers/DeleteUser.ts b/controlplane/src/core/workers/DeleteUser.ts index 34d0238a1e..2364ee0380 100644 --- a/controlplane/src/core/workers/DeleteUser.ts +++ b/controlplane/src/core/workers/DeleteUser.ts @@ -1,8 +1,6 @@ import { PlatformEventName } from '@wundergraph/cosmo-connect/dist/notifications/events_pb'; import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { BlobStorage } from '../blobstorage/index.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import { UserRepository } from '../repositories/UserRepository.js'; @@ -10,6 +8,7 @@ import Keycloak from '../services/Keycloak.js'; import { PlatformWebhookService } from '../webhooks/PlatformWebhookService.js'; import { BaseQueue, BaseWorker } from './base/index.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'user.delete'; const WorkerName = 'DeleteUserWorker'; @@ -43,7 +42,7 @@ export class DeleteUserQueue extends BaseQueue { export class DeleteUserWorker extends BaseWorker { constructor( private input: { - db: PostgresJsDatabase; + db: DB; redisConnection: ConnectionOptions; logger: pino.Logger; keycloakClient: Keycloak; diff --git a/controlplane/src/core/workers/ReactivateOrganization.ts b/controlplane/src/core/workers/ReactivateOrganization.ts index a83cdbe7ed..f24761aa6e 100644 --- a/controlplane/src/core/workers/ReactivateOrganization.ts +++ b/controlplane/src/core/workers/ReactivateOrganization.ts @@ -1,10 +1,9 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; -import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; import pino from 'pino'; -import * as schema from '../../db/schema.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; +import { DB } from 'src/db/index.js'; const QueueName = 'organization.reactivate'; const WorkerName = 'ReactivateOrganizationWorker'; @@ -39,7 +38,7 @@ export class ReactivateOrganizationWorker extends BaseWorker; + db: DB; logger: pino.Logger; deleteOrganizationQueue: DeleteOrganizationQueue; }, From 7eb4dd5b819d50540190d0d0fb74228e6e0a9790 Mon Sep 17 00:00:00 2001 From: fahimfaisaal Date: Tue, 8 Jul 2025 11:46:14 +0600 Subject: [PATCH 11/11] fix: used outer path --- controlplane/src/core/workers/AIGraphReadme.ts | 2 +- controlplane/src/core/workers/CacheWarmer.ts | 2 +- controlplane/src/core/workers/DeactivateOrganization.ts | 2 +- controlplane/src/core/workers/DeleteOrganization.ts | 2 +- controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts | 2 +- controlplane/src/core/workers/DeleteUser.ts | 2 +- controlplane/src/core/workers/ReactivateOrganization.ts | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/controlplane/src/core/workers/AIGraphReadme.ts b/controlplane/src/core/workers/AIGraphReadme.ts index 1cde9a150e..7b4a85a910 100644 --- a/controlplane/src/core/workers/AIGraphReadme.ts +++ b/controlplane/src/core/workers/AIGraphReadme.ts @@ -3,8 +3,8 @@ import pino from 'pino'; import { OpenAIGraphql } from '../openai-graphql/index.js'; import { SubgraphRepository } from '../repositories/SubgraphRepository.js'; import { FederatedGraphRepository } from '../repositories/FederatedGraphRepository.js'; +import { DB } from '../../db/index.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'ai.graph-readme-generator'; const WorkerName = 'AIGraphReadmeWorker'; diff --git a/controlplane/src/core/workers/CacheWarmer.ts b/controlplane/src/core/workers/CacheWarmer.ts index bee33764c2..9dc1bab1ae 100644 --- a/controlplane/src/core/workers/CacheWarmer.ts +++ b/controlplane/src/core/workers/CacheWarmer.ts @@ -4,8 +4,8 @@ import { BlobStorage } from '../blobstorage/index.js'; import { ClickHouseClient } from '../clickhouse/index.js'; import { S3RouterConfigMetadata } from '../composition/composer.js'; import { CacheWarmerRepository } from '../repositories/CacheWarmerRepository.js'; +import { DB } from '../../db/index.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'cache.warmer'; const WorkerName = 'CacheWarmerWorker'; diff --git a/controlplane/src/core/workers/DeactivateOrganization.ts b/controlplane/src/core/workers/DeactivateOrganization.ts index f57b39e734..442f29cf8a 100644 --- a/controlplane/src/core/workers/DeactivateOrganization.ts +++ b/controlplane/src/core/workers/DeactivateOrganization.ts @@ -2,9 +2,9 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import Keycloak from '../services/Keycloak.js'; +import { DB } from '../../db/index.js'; import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'organization.deactivate'; const WorkerName = 'DeactivateOrganizationWorker'; diff --git a/controlplane/src/core/workers/DeleteOrganization.ts b/controlplane/src/core/workers/DeleteOrganization.ts index 3adfa52287..85aba824d2 100644 --- a/controlplane/src/core/workers/DeleteOrganization.ts +++ b/controlplane/src/core/workers/DeleteOrganization.ts @@ -5,9 +5,9 @@ import Keycloak from '../services/Keycloak.js'; import { OidcRepository } from '../repositories/OidcRepository.js'; import OidcProvider from '../services/OidcProvider.js'; import { BlobStorage } from '../blobstorage/index.js'; +import { DB } from '../../db/index.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'organization.delete'; const WorkerName = 'DeleteOrganizationWorker'; diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts index 9ccddda3de..44869a08a3 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogs.ts @@ -1,8 +1,8 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; import { AuditLogRepository } from '../repositories/AuditLogRepository.js'; +import { DB } from '../../db/index.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'organization.delete_audit_logs'; const WorkerName = 'DeleteOrganizationAuditLogsWorker'; diff --git a/controlplane/src/core/workers/DeleteUser.ts b/controlplane/src/core/workers/DeleteUser.ts index 2364ee0380..8d9ec37165 100644 --- a/controlplane/src/core/workers/DeleteUser.ts +++ b/controlplane/src/core/workers/DeleteUser.ts @@ -6,9 +6,9 @@ import { OrganizationRepository } from '../repositories/OrganizationRepository.j import { UserRepository } from '../repositories/UserRepository.js'; import Keycloak from '../services/Keycloak.js'; import { PlatformWebhookService } from '../webhooks/PlatformWebhookService.js'; +import { DB } from '../../db/index.js'; import { BaseQueue, BaseWorker } from './base/index.js'; import { DeleteOrganizationAuditLogsQueue } from './DeleteOrganizationAuditLogs.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'user.delete'; const WorkerName = 'DeleteUserWorker'; diff --git a/controlplane/src/core/workers/ReactivateOrganization.ts b/controlplane/src/core/workers/ReactivateOrganization.ts index f24761aa6e..853c416310 100644 --- a/controlplane/src/core/workers/ReactivateOrganization.ts +++ b/controlplane/src/core/workers/ReactivateOrganization.ts @@ -1,9 +1,9 @@ import { ConnectionOptions, Job, JobsOptions } from 'bullmq'; import pino from 'pino'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import { DB } from '../../db/index.js'; import { DeleteOrganizationQueue } from './DeleteOrganization.js'; import { BaseQueue, BaseWorker } from './base/index.js'; -import { DB } from 'src/db/index.js'; const QueueName = 'organization.reactivate'; const WorkerName = 'ReactivateOrganizationWorker';