diff --git a/controlplane/package.json b/controlplane/package.json index 093c623d75..846569c21c 100644 --- a/controlplane/package.json +++ b/controlplane/package.json @@ -62,7 +62,7 @@ "@wundergraph/protographic": "workspace:*", "axios": "^1.12.2", "axios-retry": "^4.5.0", - "bullmq": "^5.10.0", + "bullmq": "5.66.4", "cookie": "^0.7.2", "date-fns": "^3.6.0", "dotenv": "^16.4.5", @@ -75,7 +75,7 @@ "fastify-plugin": "^4.5.1", "fastify-raw-body": "^4.3.0", "graphql": "^16.9.0", - "ioredis": "^5.4.1", + "ioredis": "5.8.2", "isomorphic-dompurify": "^2.33.0", "jose": "^5.2.4", "lodash": "^4.17.21", diff --git a/controlplane/src/core/bufservices/organization/deleteOrganization.ts b/controlplane/src/core/bufservices/organization/deleteOrganization.ts index f9b36b759f..81a9881f96 100644 --- a/controlplane/src/core/bufservices/organization/deleteOrganization.ts +++ b/controlplane/src/core/bufservices/organization/deleteOrganization.ts @@ -25,116 +25,118 @@ export function deleteOrganization( const authContext = await opts.authenticator.authenticate(ctx.requestHeader); logger = enrichLogger(ctx, logger, authContext); - const orgRepo = new OrganizationRepository(logger, opts.db, opts.billingDefaultPlanId); - const auditLogRepo = new AuditLogRepository(opts.db); - const billingRepo = new BillingRepository(opts.db); - - const memberships = await orgRepo.memberships({ userId: authContext.userId }); - const orgCount = memberships.length; - - const org = await orgRepo.byId(authContext.organizationId); - if (!org) { - return { - response: { - code: EnumStatusCode.ERR_NOT_FOUND, - details: `Organization not found`, - }, - }; - } - - const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id); - if (subscription?.id) { - return { - response: { - code: EnumStatusCode.ERR, - details: 'The organization subscription must be cancelled before the organization is deleted.', - }, - }; - } - - const user = await orgRepo.getOrganizationMember({ - organizationID: authContext.organizationId, - userID: authContext.userId || req.userID, - }); + return opts.db.transaction(async (tx) => { + const orgRepo = new OrganizationRepository(logger, tx, opts.billingDefaultPlanId); + const auditLogRepo = new AuditLogRepository(tx); + const billingRepo = new BillingRepository(tx); + + const memberships = await orgRepo.memberships({ userId: authContext.userId }); + const orgCount = memberships.length; + + const org = await orgRepo.byId(authContext.organizationId); + if (!org) { + return { + response: { + code: EnumStatusCode.ERR_NOT_FOUND, + details: `Organization not found`, + }, + }; + } + + const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id); + if (subscription?.id) { + return { + response: { + code: EnumStatusCode.ERR, + details: 'The organization subscription must be cancelled before the organization is deleted.', + }, + }; + } + + const user = await orgRepo.getOrganizationMember({ + organizationID: authContext.organizationId, + userID: authContext.userId || req.userID, + }); - if (!user) { - return { - response: { - code: EnumStatusCode.ERR, - details: 'User is not a part of this organization.', - }, - }; - } + if (!user) { + return { + response: { + code: EnumStatusCode.ERR, + details: 'User is not a part of this organization.', + }, + }; + } + + // non admins cannot delete the organization + if (!user.rbac.isOrganizationAdmin) { + throw new UnauthorizedError(); + } + + // Minimum one organization is required for a user + if (orgCount <= 1) { + return { + response: { + code: EnumStatusCode.ERR_NOT_FOUND, + details: 'Minimum one organization is required for a user.', + }, + }; + } + + // If the organization deletion have already been queued we shouldn't do it again + if (org.deletion) { + return { + response: { + code: EnumStatusCode.OK, + }, + }; + } + + const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); + const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); + + const now = new Date(); + const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays); + + await orgRepo.queueOrganizationDeletion({ + organizationId: org.id, + queuedBy: authContext.userDisplayName, + deleteOrganizationQueue: opts.queues.deleteOrganizationQueue, + }); - // non admins cannot delete the organization - if (!user.rbac.isOrganizationAdmin) { - throw new UnauthorizedError(); - } + await auditLogRepo.addAuditLog({ + organizationId: org.id, + organizationSlug: authContext.organizationSlug, + auditAction: 'organization.deletion_queued', + action: 'queued_deletion', + actorId: authContext.userId, + auditableType: 'organization', + auditableDisplayName: org.name, + actorDisplayName: authContext.userDisplayName, + apiKeyName: authContext.apiKeyName, + actorType: authContext.auth === 'api_key' ? 'api_key' : 'user', + }); - // Minimum one organization is required for a user - if (orgCount <= 1) { - return { - response: { - code: EnumStatusCode.ERR_NOT_FOUND, - details: 'Minimum one organization is required for a user.', - }, - }; - } + if (opts.mailerClient && orgAdmins.length > 0) { + const intl = Intl.DateTimeFormat(undefined, { + dateStyle: 'medium', + timeStyle: 'short', + }); + + await opts.mailerClient.sendOrganizationDeletionQueuedEmail({ + receiverEmails: orgAdmins.map((m) => m.email), + organizationName: org.name, + userDisplayName: authContext.userDisplayName, + queuedOnDate: intl.format(now), + deletionDate: intl.format(oneMonthFromNow), + restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, + }); + } - // If the organization deletion have already been queued we shouldn't do it again - if (org.deletion) { return { response: { code: EnumStatusCode.OK, }, }; - } - - const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); - const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); - - const now = new Date(); - const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays); - - await orgRepo.queueOrganizationDeletion({ - organizationId: org.id, - queuedBy: authContext.userDisplayName, - deleteOrganizationQueue: opts.queues.deleteOrganizationQueue, - }); - - await auditLogRepo.addAuditLog({ - organizationId: org.id, - organizationSlug: authContext.organizationSlug, - auditAction: 'organization.deletion_queued', - action: 'queued_deletion', - actorId: authContext.userId, - auditableType: 'organization', - auditableDisplayName: org.name, - actorDisplayName: authContext.userDisplayName, - apiKeyName: authContext.apiKeyName, - actorType: authContext.auth === 'api_key' ? 'api_key' : 'user', }); - - if (opts.mailerClient && orgAdmins.length > 0) { - const intl = Intl.DateTimeFormat(undefined, { - dateStyle: 'medium', - timeStyle: 'short', - }); - - await opts.mailerClient.sendOrganizationDeletionQueuedEmail({ - receiverEmails: orgAdmins.map((m) => m.email), - organizationName: org.name, - userDisplayName: authContext.userDisplayName, - queuedOnDate: intl.format(now), - deletionDate: intl.format(oneMonthFromNow), - restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, - }); - } - - return { - response: { - code: EnumStatusCode.OK, - }, - }; }); } diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index a26f9e744f..b37deb8169 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -10,7 +10,7 @@ import { App } from 'octokit'; import { Worker } from 'bullmq'; import routes from './routes.js'; import fastifyHealth from './plugins/health.js'; -import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js'; +import fastifyMetrics from './plugins/metrics.js'; import fastifyDatabase from './plugins/database.js'; import fastifyClickHouse from './plugins/clickhouse.js'; import fastifyRedis from './plugins/redis.js'; @@ -53,6 +53,14 @@ import { createReactivateOrganizationWorker, ReactivateOrganizationQueue, } from './workers/ReactivateOrganizationWorker.js'; +import { + QueueInactiveOrganizationsDeletionQueue, + createQueueInactiveOrganizationsDeletionWorker, +} from './workers/QueueInactiveOrganizationsDeletionWorker.js'; +import { + createNotifyOrganizationDeletionQueuedWorker, + NotifyOrganizationDeletionQueuedQueue, +} from './workers/NotifyOrganizationDeletionQueuedWorker.js'; export interface BuildConfig { logger: LoggerOptions; @@ -394,6 +402,37 @@ export default async function build(opts: BuildConfig) { }), ); + const notifyOrganizationDeletionQueuedQueue = new NotifyOrganizationDeletionQueuedQueue( + logger, + fastify.redisForQueue, + ); + bullWorkers.push( + createNotifyOrganizationDeletionQueuedWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + mailer: mailerClient, + }), + ); + + const queueInactiveOrganizationsDeletionQueue = new QueueInactiveOrganizationsDeletionQueue( + logger, + fastify.redisForQueue, + ); + bullWorkers.push( + createQueueInactiveOrganizationsDeletionWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + realm: opts.keycloak.realm, + keycloak: keycloakClient, + deleteOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, + logger, + }), + ); + + await queueInactiveOrganizationsDeletionQueue.scheduleJob(); + // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { field: 'rawBody', diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index a374b860fd..c355b682f6 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -939,33 +939,32 @@ export class OrganizationRepository { return result[0]; } - public queueOrganizationDeletion(input: { + public async queueOrganizationDeletion(input: { organizationId: string; queuedBy?: string; deleteOrganizationQueue: DeleteOrganizationQueue; + deleteDelayInDays?: number; }) { - return this.db.transaction(async (tx) => { - const now = new Date(); - await tx - .update(schema.organizations) - .set({ - queuedForDeletionAt: now, - queuedForDeletionBy: input.queuedBy, - }) - .where(eq(schema.organizations.id, input.organizationId)); + const now = new Date(); + await this.db + .update(schema.organizations) + .set({ + queuedForDeletionAt: now, + queuedForDeletionBy: input.queuedBy, + }) + .where(eq(schema.organizations.id, input.organizationId)); - const deleteAt = addDays(now, delayForManualOrgDeletionInDays); - const delay = Number(deleteAt) - Number(now); + const deleteAt = addDays(now, input.deleteDelayInDays || delayForManualOrgDeletionInDays); + const delay = Number(deleteAt) - Number(now); - return await input.deleteOrganizationQueue.addJob( - { - organizationId: input.organizationId, - }, - { - delay, - }, - ); - }); + return await input.deleteOrganizationQueue.addJob( + { + organizationId: input.organizationId, + }, + { + delay, + }, + ); } public restoreOrganization(input: { organizationId: string; deleteOrganizationQueue: DeleteOrganizationQueue }) { diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmerWorker.ts index cb170f77e0..6e5edfd6ec 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmerWorker.ts @@ -131,7 +131,7 @@ export const createCacheWarmerWorker = (input: { concurrency: 10, }); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts index d1ee93cb3e..b403ea7034 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts @@ -125,7 +125,7 @@ export const createDeactivateOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts index 77465438c4..3f158bc01c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts @@ -99,7 +99,7 @@ export const createDeleteOrganizationAuditLogsWorker = (input: { ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, 'Job stalled'); + log.warn({ jobId: job }, 'Job stalled'); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index 2846b1a0d0..182ec6c76c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -151,7 +151,7 @@ export const createDeleteOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUserQueue.ts index e03e29a35b..0b5634c5e5 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUserQueue.ts @@ -140,7 +140,7 @@ export const createDeleteUserWorker = (input: { concurrency: 10, }); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts new file mode 100644 index 0000000000..953f791dcc --- /dev/null +++ b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts @@ -0,0 +1,139 @@ +import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import pino from 'pino'; +import * as schema from '../../db/schema.js'; +import Mailer from '../services/Mailer.js'; +import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import { IQueue, IWorker } from './Worker.js'; + +const QueueName = 'organization.notify-organization-deletion-queued-queue'; +const WorkerName = 'NotifyOrganizationDeletionQueuedQueue'; + +export interface NotifyOrganizationDeletionQueuedInput { + organizationId: string; + queuedAt: number; + deletesAt: number; +} + +export class NotifyOrganizationDeletionQueuedQueue implements IQueue { + private readonly queue: Queue; + private readonly logger: pino.Logger; + + constructor(log: pino.Logger, conn: ConnectionOptions) { + this.logger = log.child({ queue: QueueName }); + this.queue = new Queue(QueueName, { + connection: conn, + defaultJobOptions: { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, + }, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, 'Queue error'); + }); + } + + public addJob(job: NotifyOrganizationDeletionQueuedInput, opts?: Omit) { + return this.queue.add(job.organizationId, job, { + ...opts, + jobId: job.organizationId, + }); + } + + public removeJob(job: NotifyOrganizationDeletionQueuedInput) { + return this.queue.remove(job.organizationId); + } + + public getJob(job: NotifyOrganizationDeletionQueuedInput) { + return this.queue.getJob(job.organizationId); + } +} + +class NotifyOrganizationDeletionQueuedWorker implements IWorker { + constructor( + private input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + logger: pino.Logger; + mailer: Mailer | undefined; + }, + ) { + this.input.logger = input.logger.child({ worker: WorkerName }); + } + + public async handler(job: Job) { + try { + if (!this.input.mailer) { + throw new Error('Mailer service not configured'); + } + + const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); + const org = await orgRepo.byId(job.data.organizationId); + if (!org) { + // The organization has already been deleted + return; + } + + const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); + const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); + if (orgAdmins.length === 0) { + this.input.logger.warn({ organizationId: org.id }, 'No admins found for organization, skipping notification'); + return; + } + + const intl = Intl.DateTimeFormat(undefined, { + dateStyle: 'medium', + timeStyle: 'short', + }); + + await this.input.mailer.sendOrganizationDeletionQueuedEmail({ + receiverEmails: orgAdmins.map((m) => m.email), + organizationName: org.name, + userDisplayName: 'System', + queuedOnDate: intl.format(new Date(job.data.queuedAt)), + deletionDate: intl.format(new Date(job.data.deletesAt)), + restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, + }); + } catch (err) { + this.input.logger.error( + { jobId: job.id, organizationId: job.data.organizationId, err }, + `Failed to send organization deletion queued notification`, + ); + throw err; + } + } +} + +export const createNotifyOrganizationDeletionQueuedWorker = (input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + logger: pino.Logger; + mailer: Mailer | undefined; +}) => { + const log = input.logger.child({ worker: WorkerName }); + const worker = new Worker( + QueueName, + (job) => new NotifyOrganizationDeletionQueuedWorker(input).handler(job), + { + connection: input.redisConnection, + concurrency: 100, + }, + ); + worker.on('stalled', (job) => { + log.warn({ jobId: job }, `Job stalled`); + }); + worker.on('error', (err) => { + log.error(err, 'Worker error'); + }); + return worker; +}; diff --git a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts new file mode 100644 index 0000000000..4c5af664ee --- /dev/null +++ b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts @@ -0,0 +1,231 @@ +import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import pino from 'pino'; +import { addDays, startOfMonth, subDays } from 'date-fns'; +import { and, count, eq, gte, isNull, lt, or, sql } from 'drizzle-orm'; +import * as schema from '../../db/schema.js'; +import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import Keycloak from '../services/Keycloak.js'; +import { IQueue, IWorker } from './Worker.js'; +import type { NotifyOrganizationDeletionQueuedQueue } from './NotifyOrganizationDeletionQueuedWorker.js'; +import type { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; + +const QueueName = 'organization.queue-inactive-organizations-queue'; +const WorkerName = 'QueueInactiveOrganizationWorker'; + +// The number of days the organization needs to be inactive for before we consider it for deletion +const MIN_INACTIVITY_DAYS = 90; + +// How long should we wait before deleting the organization? +const DELAY_FOR_ORG_DELETION_IN_DAYS = 7; + +export interface QueueInactiveOrganizationsDeletionInput {} + +export class QueueInactiveOrganizationsDeletionQueue implements IQueue { + private readonly queue: Queue; + private readonly logger: pino.Logger; + + constructor(log: pino.Logger, conn: ConnectionOptions) { + this.logger = log.child({ queue: QueueName }); + this.queue = new Queue(QueueName, { + connection: conn, + defaultJobOptions: { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, + }, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, 'Queue error'); + }); + } + + public addJob( + _job: QueueInactiveOrganizationsDeletionInput, + _opts?: Omit, + ): Promise> { + throw new Error('This method should not be called directly.'); + } + + public removeJob(_job: QueueInactiveOrganizationsDeletionInput) { + return Promise.resolve(0); + } + + public getJob(_job: QueueInactiveOrganizationsDeletionInput): Promise> { + throw new Error('This method should not be called directly.'); + } + + public scheduleJob() { + return this.queue.upsertJobScheduler( + WorkerName, + { + pattern: '0 0 0 1 * *', + }, + { + name: '', + data: {}, + opts: {}, + }, + ); + } +} + +class QueueInactiveOrganizationsDeletionWorker implements IWorker { + private readonly orgRepo: OrganizationRepository; + + constructor( + private input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + realm: string; + keycloak: Keycloak; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; + logger: pino.Logger; + }, + ) { + this.input.logger = input.logger.child({ worker: WorkerName }); + this.orgRepo = new OrganizationRepository(this.input.logger, this.input.db); + } + + public async handler(_job: Job) { + const now = new Date(); + const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS)); + const deletesAt = addDays(now, DELAY_FOR_ORG_DELETION_IN_DAYS); + + // Retrieve all the organizations that only have a single user + const orgsWithSingleUser = await this.retrieveOrganizationsWithSingleUser(inactivityThreshold); + if (orgsWithSingleUser.length === 0) { + this.input.logger.debug('No organizations with single user found'); + return; + } + + // Process all the organizations with a single user + await this.input.keycloak.authenticateClient(); + for (const org of orgsWithSingleUser) { + if (!org.userId) { + // Should never be the case but to prevent TypeScript from complaining, we still need to ensure + // that the value exists + continue; + } + + // First, we check whether the organization has had any activity registered in the audit logs in the + // last `MIN_INACTIVITY_DAYS` days + const auditLogs = await this.input.db + .select({ count: count() }) + .from(schema.auditLogs) + .where(and(eq(schema.auditLogs.organizationId, org.id), gte(schema.auditLogs.createdAt, inactivityThreshold))) + .execute(); + + if (auditLogs.length > 0 && auditLogs[0].count > 0) { + // The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days, + // so we don't need to consider it for deletion + continue; + } + + // If the organization hasn't had any activity, we should check the last time the user logged in + try { + const userSessions = await this.input.keycloak.client.users.listSessions({ + id: org.userId, + realm: this.input.realm, + }); + + const numberOfSessionsRecentlyActive = userSessions.filter( + (sess) => (sess.lastAccess || sess.start) && new Date(sess.lastAccess || sess.start!) >= inactivityThreshold, + ).length; + + if (numberOfSessionsRecentlyActive > 0) { + // The user has been active at least once in the last `MIN_INACTIVITY_DAYS` days, so we don't need + // to consider it for deletion + continue; + } + } catch (error) { + // Failed to fetch the user sessions, skip for now + this.input.logger.error(error, 'Failed to retrieve user sessions'); + continue; + } + + // It seems like the organization (and the user) hasn't been active recently, flag the organization for deletion + this.input.logger.info(`Queuing organization "${org.slug}" for deletion at ${deletesAt.toISOString()}`); + await this.queueForDeletion(org.id, now, deletesAt); + } + } + + private async queueForDeletion(orgId: string, queuedAt: Date, deletesAt: Date) { + // Enqueue the organization deletion job + await this.orgRepo.queueOrganizationDeletion({ + organizationId: orgId, + queuedBy: undefined, + deleteOrganizationQueue: this.input.deleteOrganizationQueue, + deleteDelayInDays: DELAY_FOR_ORG_DELETION_IN_DAYS, + }); + + // Queue the organization deletion notification job + await this.input.notifyOrganizationDeletionQueuedQueue.addJob({ + organizationId: orgId, + queuedAt: Number(queuedAt), + deletesAt: Number(deletesAt), + }); + } + + private retrieveOrganizationsWithSingleUser(createdBefore: Date) { + return this.input.db + .select({ + id: schema.organizations.id, + slug: schema.organizations.slug, + userId: schema.organizations.createdBy, + plan: schema.organizationBilling.plan, + }) + .from(schema.organizations) + .innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id)) + .leftJoin(schema.organizationBilling, eq(schema.organizationBilling.organizationId, schema.organizations.id)) + .where( + and( + isNull(schema.organizations.queuedForDeletionAt), + eq(schema.organizations.isDeactivated, false), + lt(schema.organizations.createdAt, createdBefore), + or(isNull(schema.organizationBilling.plan), eq(schema.organizationBilling.plan, 'developer')), + ), + ) + .groupBy(schema.organizations.id, schema.organizationBilling.plan) + .having(sql`COUNT(${schema.organizationsMembers.id}) = 1`) + .execute(); + } +} + +export function createQueueInactiveOrganizationsDeletionWorker(input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + realm: string; + keycloak: Keycloak; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; + logger: pino.Logger; +}) { + const log = input.logger.child({ worker: WorkerName }); + const worker = new Worker( + QueueName, + (job) => new QueueInactiveOrganizationsDeletionWorker(input).handler(job), + { + connection: input.redisConnection, + concurrency: 10, + }, + ); + + worker.on('stalled', (job) => { + log.warn({ jobId: job }, `Job stalled`); + }); + worker.on('error', (err) => { + log.error(err, 'Worker error'); + }); + return worker; +} diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts index 24c20fdf1a..a6c5ffacbc 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts @@ -113,7 +113,7 @@ export const createReactivateOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 37700ad9ff..f738ba81cc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -523,8 +523,8 @@ importers: specifier: ^4.5.0 version: 4.5.0(axios@1.12.2) bullmq: - specifier: ^5.10.0 - version: 5.10.0 + specifier: 5.66.4 + version: 5.66.4 cookie: specifier: ^0.7.2 version: 0.7.2 @@ -562,8 +562,8 @@ importers: specifier: 16.9.0 version: 16.9.0(patch_hash=hafdlc54qtxpqvetpefk646rly) ioredis: - specifier: ^5.4.1 - version: 5.4.1 + specifier: 5.8.2 + version: 5.8.2 isomorphic-dompurify: specifier: ^2.33.0 version: 2.33.0 @@ -4171,8 +4171,8 @@ packages: '@types/node': optional: true - '@ioredis/commands@1.2.0': - resolution: {integrity: sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==} + '@ioredis/commands@1.4.0': + resolution: {integrity: sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==} '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} @@ -7847,9 +7847,6 @@ packages: '@types/node-fetch@2.6.11': resolution: {integrity: sha512-24xFj9R5+rfQJLRyM56qh+wnVSYhyXC2tkoBndtY0U+vubqNsYXGjufB2nn8Q6gt0LrARwL6UBtMCSVCwl4B1g==} - '@types/node@18.19.130': - resolution: {integrity: sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==} - '@types/node@18.19.21': resolution: {integrity: sha512-2Q2NeB6BmiTFQi4DHBzncSoq/cJMLDdhPaAoJFnFCyD9a8VPZRf7a1GAwp1Edb7ROaZc5Jz/tnZyL6EsWMRaqw==} @@ -8605,8 +8602,8 @@ packages: builtins@5.0.1: resolution: {integrity: sha512-qwVpFEHNfhYJIzNRBvd2C1kyo6jz3ZSMPyyuR47OPdiKWlbYnZNyDWuyR175qDnAJLiCo5fBBqPb3RiXgWlkOQ==} - bullmq@5.10.0: - resolution: {integrity: sha512-bu8lGvgXLwMhBrKeE0ntH+IWBZFVISJBm5njYtPL8u9Qm3AekEONinKRftiIP66r/Y45ljxg9dp9EahoJ6L9wA==} + bullmq@5.66.4: + resolution: {integrity: sha512-y2VRk2z7d1YNI2JQDD7iThoD0X/0iZZ3VEp8lqT5s5U0XDl9CIjXp1LQgmE9EKy6ReHtzmYXS1f328PnUbZGtQ==} bun-types@1.2.12: resolution: {integrity: sha512-tvWMx5vPqbRXgE8WUZI94iS1xAYs8bkqESR9cxBB1Wi+urvfTrF1uzuDgBHFAdO0+d2lmsbG3HmeKMvUyj6pWA==} @@ -10984,8 +10981,8 @@ packages: invariant@2.2.4: resolution: {integrity: sha512-phJfQVBuaJM5raOpJjSfkiD6BpbCE4Ns//LaXl6wGYtUBY83nWS6Rf9tXm2e8VaK60JEjYldbPif/A2B1C2gNA==} - ioredis@5.4.1: - resolution: {integrity: sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==} + ioredis@5.8.2: + resolution: {integrity: sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==} engines: {node: '>=12.22.0'} ip-address@9.0.5: @@ -12020,8 +12017,8 @@ packages: resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==} hasBin: true - msgpackr@1.10.1: - resolution: {integrity: sha512-r5VRLv9qouXuLiIBrLpl2d5ZvPt8svdQTl5/vMvE4nzDMyEX4sgW5yWhuBBj5UmgwOTWj8CIdSXn5sAfsHAWIQ==} + msgpackr@1.11.5: + resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} msw@2.2.11: resolution: {integrity: sha512-XtIoewF7XWLT0a39Ftkazt9PprBA1bxHZ4CSlomN74sCBJOJU2w5VwLmGlswwsOBhGoF7jovt6bxrSIESxA1KA==} @@ -15125,7 +15122,7 @@ snapshots: '@apm-js-collab/tracing-hooks@0.3.1': dependencies: '@apm-js-collab/code-transformer': 0.8.2 - debug: 4.3.7 + debug: 4.4.1 module-details-from-path: 1.0.4 transitivePeerDependencies: - supports-color @@ -16095,7 +16092,7 @@ snapshots: '@babel/traverse': 7.28.0 '@babel/types': 7.28.2 convert-source-map: 2.0.0 - debug: 4.3.7 + debug: 4.4.1 gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -16255,7 +16252,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.22.6 '@babel/parser': 7.28.0 '@babel/types': 7.23.6 - debug: 4.3.7 + debug: 4.4.1 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -16267,7 +16264,7 @@ snapshots: '@babel/parser': 7.28.0 '@babel/template': 7.27.2 '@babel/types': 7.28.2 - debug: 4.3.7 + debug: 4.4.1 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -16280,7 +16277,7 @@ snapshots: '@babel/parser': 7.28.0 '@babel/template': 7.27.2 '@babel/types': 7.28.2 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -18532,7 +18529,7 @@ snapshots: optionalDependencies: '@types/node': 22.17.0 - '@ioredis/commands@1.2.0': {} + '@ioredis/commands@1.4.0': {} '@isaacs/cliui@8.0.2': dependencies: @@ -23108,11 +23105,6 @@ snapshots: '@types/node': 20.12.12 form-data: 4.0.4 - '@types/node@18.19.130': - dependencies: - undici-types: 5.26.5 - optional: true - '@types/node@18.19.21': dependencies: undici-types: 5.26.5 @@ -23293,7 +23285,7 @@ snapshots: dependencies: '@typescript-eslint/typescript-estree': 5.62.0(typescript@5.5.2) '@typescript-eslint/utils': 5.62.0(eslint@8.57.1)(typescript@5.5.2) - debug: 4.3.7 + debug: 4.4.1 eslint: 8.57.1 tsutils: 3.21.0(typescript@5.5.2) optionalDependencies: @@ -23307,7 +23299,7 @@ snapshots: dependencies: '@typescript-eslint/types': 5.62.0 '@typescript-eslint/visitor-keys': 5.62.0 - debug: 4.3.7 + debug: 4.4.1 globby: 11.1.0 is-glob: 4.0.3 semver: 7.7.1 @@ -23339,7 +23331,7 @@ snapshots: '@typescript/vfs@1.5.0': dependencies: - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -24020,7 +24012,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.3.7 + debug: 4.4.1 http-errors: 2.0.0 iconv-lite: 0.7.1 on-finished: 2.4.1 @@ -24126,21 +24118,21 @@ snapshots: dependencies: semver: 7.7.1 - bullmq@5.10.0: + bullmq@5.66.4: dependencies: cron-parser: 4.9.0 - ioredis: 5.4.1 - msgpackr: 1.10.1 + ioredis: 5.8.2 + msgpackr: 1.11.5 node-abort-controller: 3.1.1 - semver: 7.7.1 - tslib: 2.6.2 - uuid: 9.0.1 + semver: 7.7.3 + tslib: 2.8.1 + uuid: 11.1.0 transitivePeerDependencies: - supports-color bun-types@1.2.12: dependencies: - '@types/node': 18.19.130 + '@types/node': 20.12.12 optional: true bun-types@1.2.3: @@ -25168,7 +25160,7 @@ snapshots: base64id: 2.0.0 cookie: 0.7.2 cors: 2.8.5 - debug: 4.3.7 + debug: 4.4.1 engine.io-parser: 5.2.3 ws: 8.17.1 transitivePeerDependencies: @@ -25567,7 +25559,7 @@ snapshots: eslint-import-resolver-node@0.3.7: dependencies: - debug: 4.3.7 + debug: 4.4.1 is-core-module: 2.12.1 resolve: 1.22.8 transitivePeerDependencies: @@ -25611,7 +25603,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@5.62.0(eslint@8.57.1)(typescript@5.5.2))(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.57.1): dependencies: - debug: 4.3.7 + debug: 4.4.1 optionalDependencies: '@typescript-eslint/parser': 5.62.0(eslint@8.57.1)(typescript@5.5.2) eslint: 8.57.1 @@ -25622,7 +25614,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@5.62.0(eslint@8.57.1)(typescript@5.5.2))(eslint-import-resolver-typescript@3.5.5)(eslint@8.57.1): dependencies: - debug: 4.3.7 + debug: 4.4.1 optionalDependencies: '@typescript-eslint/parser': 5.62.0(eslint@8.57.1)(typescript@5.5.2) eslint: 8.57.1 @@ -26153,7 +26145,7 @@ snapshots: finalhandler@2.1.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 encodeurl: 2.0.0 escape-html: 1.0.3 on-finished: 2.4.1 @@ -26815,7 +26807,7 @@ snapshots: http-proxy-agent@7.0.2: dependencies: agent-base: 7.1.4 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -26970,11 +26962,11 @@ snapshots: dependencies: loose-envify: 1.4.0 - ioredis@5.4.1: + ioredis@5.8.2: dependencies: - '@ioredis/commands': 1.2.0 + '@ioredis/commands': 1.4.0 cluster-key-slot: 1.1.2 - debug: 4.3.7 + debug: 4.4.1 denque: 2.1.0 lodash.defaults: 4.2.0 lodash.isarguments: 3.1.0 @@ -27252,7 +27244,7 @@ snapshots: jest-worker@27.5.1: dependencies: - '@types/node': 20.3.1 + '@types/node': 20.12.12 merge-stream: 2.0.0 supports-color: 10.2.0 @@ -28198,7 +28190,7 @@ snapshots: '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2 optional: true - msgpackr@1.10.1: + msgpackr@1.11.5: optionalDependencies: msgpackr-extract: 3.0.2 @@ -29773,7 +29765,7 @@ snapshots: require-in-the-middle@7.5.2: dependencies: - debug: 4.3.7 + debug: 4.4.1 module-details-from-path: 1.0.3 resolve: 1.22.10 transitivePeerDependencies: @@ -29926,7 +29918,7 @@ snapshots: router@2.2.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 depd: 2.0.0 is-promise: 4.0.0 parseurl: 1.3.3 @@ -30016,12 +30008,11 @@ snapshots: semver@7.7.1: {} - semver@7.7.3: - optional: true + semver@7.7.3: {} send@1.2.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1 @@ -30199,7 +30190,7 @@ snapshots: socket.io-adapter@2.5.5: dependencies: - debug: 4.3.7 + debug: 4.4.1 ws: 8.17.1 transitivePeerDependencies: - bufferutil @@ -30209,7 +30200,7 @@ snapshots: socket.io-parser@4.2.4: dependencies: '@socket.io/component-emitter': 3.1.2 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -30218,7 +30209,7 @@ snapshots: accepts: 1.3.8 base64id: 2.0.0 cors: 2.8.5 - debug: 4.3.7 + debug: 4.4.1 engine.io: 6.6.4 socket.io-adapter: 2.5.5 socket.io-parser: 4.2.4