From 4804e55fe4ed15b915442cc920694a3f7509476f Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 16 Dec 2025 18:32:07 -0500 Subject: [PATCH 1/8] feat: create script to enqueue inactive organizations for deletion --- controlplane/src/bin/db-cleanup.ts | 206 ++++++++++++++++++ controlplane/src/core/build-server.ts | 20 +- .../repositories/OrganizationRepository.ts | 3 +- controlplane/src/core/routes.ts | 2 + .../NotifyOrganizationDeletionQueuedWorker.ts | 134 ++++++++++++ 5 files changed, 363 insertions(+), 2 deletions(-) create mode 100644 controlplane/src/bin/db-cleanup.ts create mode 100644 controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts diff --git a/controlplane/src/bin/db-cleanup.ts b/controlplane/src/bin/db-cleanup.ts new file mode 100644 index 0000000000..9fd8702469 --- /dev/null +++ b/controlplane/src/bin/db-cleanup.ts @@ -0,0 +1,206 @@ +import 'dotenv/config'; + +import process from 'node:process'; +import { subDays, startOfMonth, addDays } from 'date-fns'; +import postgres from 'postgres'; +import { drizzle, PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import { and, count, eq, gte, isNull, lt, sql } from 'drizzle-orm'; +import { pino } from 'pino'; +import { buildDatabaseConnectionConfig } from '../core/plugins/database.js'; +import * as schema from '../db/schema.js'; +import { DeleteOrganizationQueue } from '../core/workers/DeleteOrganizationWorker.js'; +import { createRedisConnections } from '../core/plugins/redis.js'; +import { OrganizationRepository } from '../core/repositories/OrganizationRepository.js'; +import { NotifyOrganizationDeletionQueuedQueue } from '../core/workers/NotifyOrganizationDeletionQueuedWorker.js'; +import { getConfig } from './get-config.js'; + +// Number of concurrent tasks. We'll allocate the same number of database connections + 1, so keep this number reasonable +const MAX_DEGREE_OF_PARALLELISM = 5; + +// How many organizations to retrieve from the database to migrate in a transaction. This is used to not load +// all organizations at once and perform the migration in buckets +const ORGANIZATIONS_PER_BUCKET = 100; + +// The number of days the organization needs to be inactive for before we consider it for deletion +const MIN_INACTIVITY_DAYS = 90; + +// How long should we wait before deleting the organization? +const DELAY_FOR_ORG_DELETION_IN_DAYS = 7; + +const { databaseConnectionUrl, databaseTlsCa, databaseTlsCert, databaseTlsKey, redis } = getConfig(); + +try { + const connectionConfig = await buildDatabaseConnectionConfig({ + tls: + databaseTlsCa || databaseTlsCert || databaseTlsKey + ? { + ca: databaseTlsCa, + cert: databaseTlsCert, + key: databaseTlsKey, + } + : undefined, + }); + const queryConnection = postgres(databaseConnectionUrl, { + ...connectionConfig, + max: MAX_DEGREE_OF_PARALLELISM + 1, + }); + + // Initialize the Redis connection + const { redisQueue, redisWorker } = await createRedisConnections({ + host: redis.host!, + port: Number(redis.port), + password: redis.password, + tls: redis.tls, + }); + + await redisQueue.connect(); + await redisWorker.connect(); + await redisWorker.ping(); + await redisQueue.ping(); + + try { + const logger = pino(); + const db = drizzle(queryConnection, { schema: { ...schema } }); + + await queueOrganizationsForDeletion({ + db, + deleteOrganizationQueue: new DeleteOrganizationQueue(logger, redisQueue), + notifyOrganizationDeletionQueuedQueue: new NotifyOrganizationDeletionQueuedQueue(logger, redisQueue), + }); + } finally { + redisQueue.disconnect(); + redisWorker.disconnect(); + + // Close the database connection + await queryConnection.end({ + timeout: 1, + }); + } + + // eslint-disable-next-line unicorn/no-process-exit + process.exit(0); +} catch (err: any) { + console.error(err); + // eslint-disable-next-line unicorn/no-process-exit + process.exit(1); +} + +function chunkArray(data: T[]): T[][] { + // @ts-ignore + if (MAX_DEGREE_OF_PARALLELISM === 1) { + return [data]; + } + + const chunks: T[][] = []; + const organizationsPerChunk = Math.ceil(ORGANIZATIONS_PER_BUCKET / MAX_DEGREE_OF_PARALLELISM); + for (let i = 0; i < data.length; i += organizationsPerChunk) { + chunks.push(data.slice(i, i + organizationsPerChunk)); + } + + return chunks; +} + +async function queueOrganizationsForDeletion({ + db, + deleteOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, +}: { + db: PostgresJsDatabase; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; +}) { + // First, retrieve all the organizations that only have a single user and have not had any activity registered in + // the audit log for the last `MIN_INACTIVITY_DAYS` days + const now = new Date(); + const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS)); + + console.log(`Retrieving organizations with a single member...`); + const organizations = await db + .select({ + id: schema.organizations.id, + slug: schema.organizations.slug, + userId: schema.organizations.createdBy, + }) + .from(schema.organizations) + .innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id)) + .where( + and(isNull(schema.organizations.queuedForDeletionAt), lt(schema.organizations.createdAt, inactivityThreshold)), + ) + .groupBy(schema.organizations.id) + .having(sql`COUNT(${schema.organizationsMembers.id}) = 1`) + .execute(); + + if (organizations.length === 0) { + console.log('No organizations found with a single member'); + return; + } + + console.log(`${organizations.length} organizations with a single member found`); + + // Break the organizations in chunk so we can have some degree of parallelism + console.log('Processing organizations...'); + await Promise.all( + chunkArray(organizations).map((chunk) => + db.transaction((tx) => { + return processChunkOfOrganizations({ + organizations: chunk, + db: tx, + inactivityThreshold, + deleteOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, + }); + }), + ), + ); + + console.log('Done!'); +} + +async function processChunkOfOrganizations({ + organizations, + db, + inactivityThreshold, + deleteOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, +}: { + organizations: { id: string; slug: string; userId: string | null }[]; + db: PostgresJsDatabase; + inactivityThreshold: Date; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; +}) { + const queuedAt = new Date(); + const deletesAt = addDays(queuedAt, DELAY_FOR_ORG_DELETION_IN_DAYS); + + const orgRepo = new OrganizationRepository(pino(), db, undefined); + for (const org of organizations) { + const auditLogs = await db + .select({ count: count() }) + .from(schema.auditLogs) + .where(and(eq(schema.auditLogs.organizationId, org.id), gte(schema.auditLogs.createdAt, inactivityThreshold))) + .execute(); + + if (auditLogs.length > 0 && auditLogs[0].count > 0) { + // The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days, + // so we don't need to consider it for deletion + continue; + } + + console.log(`\tEnqueuing organization "${org.slug}" for deletion at ${deletesAt.toISOString()}...`); + + // Enqueue the organization deletion job + await orgRepo.queueOrganizationDeletion({ + organizationId: org.id, + queuedBy: undefined, + deleteOrganizationQueue, + deleteDelayInDays: DELAY_FOR_ORG_DELETION_IN_DAYS, + }); + + // Queue the organization deletion notification job + await notifyOrganizationDeletionQueuedQueue.addJob({ + organizationId: org.id, + queuedAt: queuedAt.getTime(), + deletesAt: deletesAt.getTime(), + }); + } +} diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index a26f9e744f..ff2398a251 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -10,7 +10,7 @@ import { App } from 'octokit'; import { Worker } from 'bullmq'; import routes from './routes.js'; import fastifyHealth from './plugins/health.js'; -import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js'; +import fastifyMetrics from './plugins/metrics.js'; import fastifyDatabase from './plugins/database.js'; import fastifyClickHouse from './plugins/clickhouse.js'; import fastifyRedis from './plugins/redis.js'; @@ -53,6 +53,10 @@ import { createReactivateOrganizationWorker, ReactivateOrganizationQueue, } from './workers/ReactivateOrganizationWorker.js'; +import { + createNotifyOrganizationDeletionQueuedWorker, + NotifyOrganizationDeletionQueuedQueue, +} from './workers/NotifyOrganizationDeletionQueuedWorker.js'; export interface BuildConfig { logger: LoggerOptions; @@ -394,6 +398,19 @@ export default async function build(opts: BuildConfig) { }), ); + const notifyOrganizationDeletionQueuedQueue = new NotifyOrganizationDeletionQueuedQueue( + logger, + fastify.redisForQueue, + ); + bullWorkers.push( + createNotifyOrganizationDeletionQueuedWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + logger, + mailer: mailerClient, + }), + ); + // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { field: 'rawBody', @@ -499,6 +516,7 @@ export default async function build(opts: BuildConfig) { deactivateOrganizationQueue, reactivateOrganizationQueue, deleteUserQueue, + notifyOrganizationDeletionQueuedQueue, }, stripeSecretKey: opts.stripe?.secret, admissionWebhookJWTSecret: opts.admissionWebhook.secret, diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index a374b860fd..703d109750 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -943,6 +943,7 @@ export class OrganizationRepository { organizationId: string; queuedBy?: string; deleteOrganizationQueue: DeleteOrganizationQueue; + deleteDelayInDays?: number; }) { return this.db.transaction(async (tx) => { const now = new Date(); @@ -954,7 +955,7 @@ export class OrganizationRepository { }) .where(eq(schema.organizations.id, input.organizationId)); - const deleteAt = addDays(now, delayForManualOrgDeletionInDays); + const deleteAt = addDays(now, input.deleteDelayInDays || delayForManualOrgDeletionInDays); const delay = Number(deleteAt) - Number(now); return await input.deleteOrganizationQueue.addJob( diff --git a/controlplane/src/core/routes.ts b/controlplane/src/core/routes.ts index d08fe04f42..52dd845ac7 100644 --- a/controlplane/src/core/routes.ts +++ b/controlplane/src/core/routes.ts @@ -21,6 +21,7 @@ import { DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWor import { DeleteUserQueue } from './workers/DeleteUserQueue.js'; import { ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogsWorker.js'; +import { NotifyOrganizationDeletionQueuedQueue } from './workers/NotifyOrganizationDeletionQueuedWorker.js'; export interface RouterOptions { db: PostgresJsDatabase; @@ -48,6 +49,7 @@ export interface RouterOptions { deactivateOrganizationQueue: DeactivateOrganizationQueue; reactivateOrganizationQueue: ReactivateOrganizationQueue; deleteUserQueue: DeleteUserQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; }; stripeSecretKey?: string; cdnBaseUrl: string; diff --git a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts new file mode 100644 index 0000000000..ca702a0a15 --- /dev/null +++ b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts @@ -0,0 +1,134 @@ +import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import pino from 'pino'; +import * as schema from '../../db/schema.js'; +import Mailer from '../services/Mailer.js'; +import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import { IQueue, IWorker } from './Worker.js'; + +const QueueName = 'organization.send-deletion-queued-message'; +const WorkerName = 'SendOrganizationDeletionQueuedWorker'; + +export interface NotifyOrganizationDeletionQueuedInput { + organizationId: string; + queuedAt: number; + deletesAt: number; +} + +export class NotifyOrganizationDeletionQueuedQueue implements IQueue { + private readonly queue: Queue; + private readonly logger: pino.Logger; + + constructor(log: pino.Logger, conn: ConnectionOptions) { + this.logger = log.child({ queue: QueueName }); + this.queue = new Queue(QueueName, { + connection: conn, + defaultJobOptions: { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, + }, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, 'Queue error'); + }); + } + + public addJob(job: NotifyOrganizationDeletionQueuedInput, opts?: Omit) { + return this.queue.add(job.organizationId, job, { + ...opts, + jobId: job.organizationId, + }); + } + + public removeJob(job: NotifyOrganizationDeletionQueuedInput) { + return this.queue.remove(job.organizationId); + } + + public getJob(job: NotifyOrganizationDeletionQueuedInput) { + return this.queue.getJob(job.organizationId); + } +} + +class NotifyOrganizationDeletionQueuedWorker implements IWorker { + constructor( + private input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + logger: pino.Logger; + mailer: Mailer | undefined; + }, + ) { + this.input.logger = input.logger.child({ worker: WorkerName }); + } + + public async handler(job: Job) { + try { + if (!this.input.mailer) { + throw new Error('Mailer service not configured'); + } + + const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); + const org = await orgRepo.byId(job.data.organizationId); + if (!org) { + throw new Error('Organization not found'); + } + + const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); + const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); + + const intl = Intl.DateTimeFormat(undefined, { + dateStyle: 'medium', + timeStyle: 'short', + }); + + await this.input.mailer.sendOrganizationDeletionQueuedEmail({ + receiverEmails: orgAdmins.map((m) => m.email), + organizationName: org.name, + userDisplayName: 'System', + queuedOnDate: intl.format(new Date(job.data.queuedAt)), + deletionDate: intl.format(new Date(job.data.deletesAt)), + restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, + }); + } catch (err) { + this.input.logger.error( + { jobId: job.id, organizationId: job.data.organizationId, err }, + `Failed to send organization deletion queued notification`, + ); + throw err; + } + } +} + +export const createNotifyOrganizationDeletionQueuedWorker = (input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + logger: pino.Logger; + mailer: Mailer | undefined; +}) => { + const log = input.logger.child({ worker: WorkerName }); + const worker = new Worker( + QueueName, + (job) => new NotifyOrganizationDeletionQueuedWorker(input).handler(job), + { + connection: input.redisConnection, + concurrency: 10, + }, + ); + worker.on('stalled', (job) => { + log.warn({ joinId: job }, `Job stalled`); + }); + worker.on('error', (err) => { + log.error(err, 'Worker error'); + }); + return worker; +}; From a0c978ac1bf7aac3f2a4f74cd280f6f488fa80d1 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Thu, 18 Dec 2025 15:52:46 -0500 Subject: [PATCH 2/8] chore: update script and tests --- controlplane/src/bin/db-cleanup.ts | 18 +- .../organization/deleteOrganization.ts | 204 +++++++++--------- .../repositories/OrganizationRepository.ts | 40 ++-- .../src/core/workers/CacheWarmerWorker.ts | 2 +- .../workers/DeactivateOrganizationWorker.ts | 2 +- .../DeleteOrganizationAuditLogsWorker.ts | 2 +- .../core/workers/DeleteOrganizationWorker.ts | 2 +- .../src/core/workers/DeleteUserQueue.ts | 2 +- .../NotifyOrganizationDeletionQueuedWorker.ts | 2 +- .../workers/ReactivateOrganizationWorker.ts | 2 +- controlplane/test/test-util.ts | 3 + 11 files changed, 146 insertions(+), 133 deletions(-) diff --git a/controlplane/src/bin/db-cleanup.ts b/controlplane/src/bin/db-cleanup.ts index 9fd8702469..4e4c07ecd1 100644 --- a/controlplane/src/bin/db-cleanup.ts +++ b/controlplane/src/bin/db-cleanup.ts @@ -4,7 +4,7 @@ import process from 'node:process'; import { subDays, startOfMonth, addDays } from 'date-fns'; import postgres from 'postgres'; import { drizzle, PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import { and, count, eq, gte, isNull, lt, sql } from 'drizzle-orm'; +import { and, count, eq, gte, isNull, lt, or, sql } from 'drizzle-orm'; import { pino } from 'pino'; import { buildDatabaseConnectionConfig } from '../core/plugins/database.js'; import * as schema from '../db/schema.js'; @@ -114,19 +114,29 @@ async function queueOrganizationsForDeletion({ const now = new Date(); const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS)); - console.log(`Retrieving organizations with a single member...`); + console.log(`Retrieving organizations...`); const organizations = await db .select({ id: schema.organizations.id, slug: schema.organizations.slug, userId: schema.organizations.createdBy, + plan: schema.organizationBilling.plan, }) .from(schema.organizations) .innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id)) + .leftJoin(schema.organizationBilling, eq(schema.organizationBilling.organizationId, schema.organizations.id)) .where( - and(isNull(schema.organizations.queuedForDeletionAt), lt(schema.organizations.createdAt, inactivityThreshold)), + and( + isNull(schema.organizations.queuedForDeletionAt), + eq(schema.organizations.isDeactivated, false), + lt(schema.organizations.createdAt, inactivityThreshold), + or( + isNull(schema.organizationBilling.plan), + eq(schema.organizationBilling.plan, 'developer'), + ), + ), ) - .groupBy(schema.organizations.id) + .groupBy(schema.organizations.id, schema.organizationBilling.plan) .having(sql`COUNT(${schema.organizationsMembers.id}) = 1`) .execute(); diff --git a/controlplane/src/core/bufservices/organization/deleteOrganization.ts b/controlplane/src/core/bufservices/organization/deleteOrganization.ts index f9b36b759f..81a9881f96 100644 --- a/controlplane/src/core/bufservices/organization/deleteOrganization.ts +++ b/controlplane/src/core/bufservices/organization/deleteOrganization.ts @@ -25,116 +25,118 @@ export function deleteOrganization( const authContext = await opts.authenticator.authenticate(ctx.requestHeader); logger = enrichLogger(ctx, logger, authContext); - const orgRepo = new OrganizationRepository(logger, opts.db, opts.billingDefaultPlanId); - const auditLogRepo = new AuditLogRepository(opts.db); - const billingRepo = new BillingRepository(opts.db); - - const memberships = await orgRepo.memberships({ userId: authContext.userId }); - const orgCount = memberships.length; - - const org = await orgRepo.byId(authContext.organizationId); - if (!org) { - return { - response: { - code: EnumStatusCode.ERR_NOT_FOUND, - details: `Organization not found`, - }, - }; - } - - const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id); - if (subscription?.id) { - return { - response: { - code: EnumStatusCode.ERR, - details: 'The organization subscription must be cancelled before the organization is deleted.', - }, - }; - } - - const user = await orgRepo.getOrganizationMember({ - organizationID: authContext.organizationId, - userID: authContext.userId || req.userID, - }); + return opts.db.transaction(async (tx) => { + const orgRepo = new OrganizationRepository(logger, tx, opts.billingDefaultPlanId); + const auditLogRepo = new AuditLogRepository(tx); + const billingRepo = new BillingRepository(tx); + + const memberships = await orgRepo.memberships({ userId: authContext.userId }); + const orgCount = memberships.length; + + const org = await orgRepo.byId(authContext.organizationId); + if (!org) { + return { + response: { + code: EnumStatusCode.ERR_NOT_FOUND, + details: `Organization not found`, + }, + }; + } + + const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id); + if (subscription?.id) { + return { + response: { + code: EnumStatusCode.ERR, + details: 'The organization subscription must be cancelled before the organization is deleted.', + }, + }; + } + + const user = await orgRepo.getOrganizationMember({ + organizationID: authContext.organizationId, + userID: authContext.userId || req.userID, + }); - if (!user) { - return { - response: { - code: EnumStatusCode.ERR, - details: 'User is not a part of this organization.', - }, - }; - } + if (!user) { + return { + response: { + code: EnumStatusCode.ERR, + details: 'User is not a part of this organization.', + }, + }; + } + + // non admins cannot delete the organization + if (!user.rbac.isOrganizationAdmin) { + throw new UnauthorizedError(); + } + + // Minimum one organization is required for a user + if (orgCount <= 1) { + return { + response: { + code: EnumStatusCode.ERR_NOT_FOUND, + details: 'Minimum one organization is required for a user.', + }, + }; + } + + // If the organization deletion have already been queued we shouldn't do it again + if (org.deletion) { + return { + response: { + code: EnumStatusCode.OK, + }, + }; + } + + const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); + const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); + + const now = new Date(); + const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays); + + await orgRepo.queueOrganizationDeletion({ + organizationId: org.id, + queuedBy: authContext.userDisplayName, + deleteOrganizationQueue: opts.queues.deleteOrganizationQueue, + }); - // non admins cannot delete the organization - if (!user.rbac.isOrganizationAdmin) { - throw new UnauthorizedError(); - } + await auditLogRepo.addAuditLog({ + organizationId: org.id, + organizationSlug: authContext.organizationSlug, + auditAction: 'organization.deletion_queued', + action: 'queued_deletion', + actorId: authContext.userId, + auditableType: 'organization', + auditableDisplayName: org.name, + actorDisplayName: authContext.userDisplayName, + apiKeyName: authContext.apiKeyName, + actorType: authContext.auth === 'api_key' ? 'api_key' : 'user', + }); - // Minimum one organization is required for a user - if (orgCount <= 1) { - return { - response: { - code: EnumStatusCode.ERR_NOT_FOUND, - details: 'Minimum one organization is required for a user.', - }, - }; - } + if (opts.mailerClient && orgAdmins.length > 0) { + const intl = Intl.DateTimeFormat(undefined, { + dateStyle: 'medium', + timeStyle: 'short', + }); + + await opts.mailerClient.sendOrganizationDeletionQueuedEmail({ + receiverEmails: orgAdmins.map((m) => m.email), + organizationName: org.name, + userDisplayName: authContext.userDisplayName, + queuedOnDate: intl.format(now), + deletionDate: intl.format(oneMonthFromNow), + restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, + }); + } - // If the organization deletion have already been queued we shouldn't do it again - if (org.deletion) { return { response: { code: EnumStatusCode.OK, }, }; - } - - const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); - const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); - - const now = new Date(); - const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays); - - await orgRepo.queueOrganizationDeletion({ - organizationId: org.id, - queuedBy: authContext.userDisplayName, - deleteOrganizationQueue: opts.queues.deleteOrganizationQueue, - }); - - await auditLogRepo.addAuditLog({ - organizationId: org.id, - organizationSlug: authContext.organizationSlug, - auditAction: 'organization.deletion_queued', - action: 'queued_deletion', - actorId: authContext.userId, - auditableType: 'organization', - auditableDisplayName: org.name, - actorDisplayName: authContext.userDisplayName, - apiKeyName: authContext.apiKeyName, - actorType: authContext.auth === 'api_key' ? 'api_key' : 'user', }); - - if (opts.mailerClient && orgAdmins.length > 0) { - const intl = Intl.DateTimeFormat(undefined, { - dateStyle: 'medium', - timeStyle: 'short', - }); - - await opts.mailerClient.sendOrganizationDeletionQueuedEmail({ - receiverEmails: orgAdmins.map((m) => m.email), - organizationName: org.name, - userDisplayName: authContext.userDisplayName, - queuedOnDate: intl.format(now), - deletionDate: intl.format(oneMonthFromNow), - restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`, - }); - } - - return { - response: { - code: EnumStatusCode.OK, - }, - }; }); } diff --git a/controlplane/src/core/repositories/OrganizationRepository.ts b/controlplane/src/core/repositories/OrganizationRepository.ts index 703d109750..c355b682f6 100644 --- a/controlplane/src/core/repositories/OrganizationRepository.ts +++ b/controlplane/src/core/repositories/OrganizationRepository.ts @@ -939,34 +939,32 @@ export class OrganizationRepository { return result[0]; } - public queueOrganizationDeletion(input: { + public async queueOrganizationDeletion(input: { organizationId: string; queuedBy?: string; deleteOrganizationQueue: DeleteOrganizationQueue; deleteDelayInDays?: number; }) { - return this.db.transaction(async (tx) => { - const now = new Date(); - await tx - .update(schema.organizations) - .set({ - queuedForDeletionAt: now, - queuedForDeletionBy: input.queuedBy, - }) - .where(eq(schema.organizations.id, input.organizationId)); + const now = new Date(); + await this.db + .update(schema.organizations) + .set({ + queuedForDeletionAt: now, + queuedForDeletionBy: input.queuedBy, + }) + .where(eq(schema.organizations.id, input.organizationId)); - const deleteAt = addDays(now, input.deleteDelayInDays || delayForManualOrgDeletionInDays); - const delay = Number(deleteAt) - Number(now); + const deleteAt = addDays(now, input.deleteDelayInDays || delayForManualOrgDeletionInDays); + const delay = Number(deleteAt) - Number(now); - return await input.deleteOrganizationQueue.addJob( - { - organizationId: input.organizationId, - }, - { - delay, - }, - ); - }); + return await input.deleteOrganizationQueue.addJob( + { + organizationId: input.organizationId, + }, + { + delay, + }, + ); } public restoreOrganization(input: { organizationId: string; deleteOrganizationQueue: DeleteOrganizationQueue }) { diff --git a/controlplane/src/core/workers/CacheWarmerWorker.ts b/controlplane/src/core/workers/CacheWarmerWorker.ts index cb170f77e0..6e5edfd6ec 100644 --- a/controlplane/src/core/workers/CacheWarmerWorker.ts +++ b/controlplane/src/core/workers/CacheWarmerWorker.ts @@ -131,7 +131,7 @@ export const createCacheWarmerWorker = (input: { concurrency: 10, }); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts index d1ee93cb3e..b403ea7034 100644 --- a/controlplane/src/core/workers/DeactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeactivateOrganizationWorker.ts @@ -125,7 +125,7 @@ export const createDeactivateOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts index 77465438c4..3f158bc01c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationAuditLogsWorker.ts @@ -99,7 +99,7 @@ export const createDeleteOrganizationAuditLogsWorker = (input: { ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, 'Job stalled'); + log.warn({ jobId: job }, 'Job stalled'); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteOrganizationWorker.ts b/controlplane/src/core/workers/DeleteOrganizationWorker.ts index 2846b1a0d0..182ec6c76c 100644 --- a/controlplane/src/core/workers/DeleteOrganizationWorker.ts +++ b/controlplane/src/core/workers/DeleteOrganizationWorker.ts @@ -151,7 +151,7 @@ export const createDeleteOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/DeleteUserQueue.ts b/controlplane/src/core/workers/DeleteUserQueue.ts index e03e29a35b..0b5634c5e5 100644 --- a/controlplane/src/core/workers/DeleteUserQueue.ts +++ b/controlplane/src/core/workers/DeleteUserQueue.ts @@ -140,7 +140,7 @@ export const createDeleteUserWorker = (input: { concurrency: 10, }); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts index ca702a0a15..0243cccf34 100644 --- a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts +++ b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts @@ -125,7 +125,7 @@ export const createNotifyOrganizationDeletionQueuedWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts index 24c20fdf1a..a6c5ffacbc 100644 --- a/controlplane/src/core/workers/ReactivateOrganizationWorker.ts +++ b/controlplane/src/core/workers/ReactivateOrganizationWorker.ts @@ -113,7 +113,7 @@ export const createReactivateOrganizationWorker = (input: { }, ); worker.on('stalled', (job) => { - log.warn({ joinId: job }, `Job stalled`); + log.warn({ jobId: job }, `Job stalled`); }); worker.on('error', (err) => { log.error(err, 'Worker error'); diff --git a/controlplane/test/test-util.ts b/controlplane/test/test-util.ts index 1b3fa8aa86..b958208946 100644 --- a/controlplane/test/test-util.ts +++ b/controlplane/test/test-util.ts @@ -44,6 +44,7 @@ import { DeactivateOrganizationQueue } from '../src/core/workers/DeactivateOrgan import { DeleteUserQueue } from '../src/core/workers/DeleteUserQueue.js'; import { ReactivateOrganizationQueue } from '../src/core/workers/ReactivateOrganizationWorker.js'; import { DeleteOrganizationAuditLogsQueue } from '../src/core/workers/DeleteOrganizationAuditLogsWorker.js'; +import { NotifyOrganizationDeletionQueuedQueue } from "../src/core/workers/NotifyOrganizationDeletionQueuedWorker.js"; export const DEFAULT_ROUTER_URL = 'http://localhost:3002'; export const DEFAULT_SUBGRAPH_URL_ONE = 'http://localhost:4001'; @@ -151,6 +152,7 @@ export const SetupTest = async function ({ const deactivateOrganizationQueue = new DeactivateOrganizationQueue(log, server.redisForQueue); const deleteUserQueue = new DeleteUserQueue(log, server.redisForQueue); const reactivateOrganizationQueue = new ReactivateOrganizationQueue(log, server.redisForQueue); + const notifyOrganizationDeletionQueuedQueue = new NotifyOrganizationDeletionQueuedQueue(log, server.redisForQueue); const blobStorage = new InMemoryBlobStorage(); await server.register(fastifyConnectPlugin, { @@ -181,6 +183,7 @@ export const SetupTest = async function ({ deactivateOrganizationQueue, reactivateOrganizationQueue, deleteUserQueue, + notifyOrganizationDeletionQueuedQueue, }, }), }); From e82d01af7f2ce19475f41d3d222752d0d8648049 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Thu, 18 Dec 2025 16:06:41 -0500 Subject: [PATCH 3/8] chore: linting and tests --- controlplane/src/bin/db-cleanup.ts | 9 +++------ .../workers/NotifyOrganizationDeletionQueuedWorker.ts | 4 ++++ controlplane/test/test-util.ts | 1 + 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/controlplane/src/bin/db-cleanup.ts b/controlplane/src/bin/db-cleanup.ts index 4e4c07ecd1..0835869e4e 100644 --- a/controlplane/src/bin/db-cleanup.ts +++ b/controlplane/src/bin/db-cleanup.ts @@ -130,10 +130,7 @@ async function queueOrganizationsForDeletion({ isNull(schema.organizations.queuedForDeletionAt), eq(schema.organizations.isDeactivated, false), lt(schema.organizations.createdAt, inactivityThreshold), - or( - isNull(schema.organizationBilling.plan), - eq(schema.organizationBilling.plan, 'developer'), - ), + or(isNull(schema.organizationBilling.plan), eq(schema.organizationBilling.plan, 'developer')), ), ) .groupBy(schema.organizations.id, schema.organizationBilling.plan) @@ -209,8 +206,8 @@ async function processChunkOfOrganizations({ // Queue the organization deletion notification job await notifyOrganizationDeletionQueuedQueue.addJob({ organizationId: org.id, - queuedAt: queuedAt.getTime(), - deletesAt: deletesAt.getTime(), + queuedAt: Number(queuedAt), + deletesAt: Number(deletesAt), }); } } diff --git a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts index 0243cccf34..b236bdadc9 100644 --- a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts +++ b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts @@ -85,6 +85,10 @@ class NotifyOrganizationDeletionQueuedWorker implements IWorker { const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin); + if (orgAdmins.length === 0) { + this.input.logger.warn({ organizationId: org.id }, 'No admins found for organization, skipping notification'); + return; + } const intl = Intl.DateTimeFormat(undefined, { dateStyle: 'medium', diff --git a/controlplane/test/test-util.ts b/controlplane/test/test-util.ts index b958208946..b7493b2642 100644 --- a/controlplane/test/test-util.ts +++ b/controlplane/test/test-util.ts @@ -416,6 +416,7 @@ export const SetupTest = async function ({ deactivateOrganizationQueue, deleteUserQueue, reactivateOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, }, }; }; From 67c0a22bb1e998e4a299bdc5899665bff192cbcf Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Thu, 18 Dec 2025 16:30:33 -0500 Subject: [PATCH 4/8] chore: remove non-null assertion --- controlplane/src/bin/db-cleanup.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/bin/db-cleanup.ts b/controlplane/src/bin/db-cleanup.ts index 0835869e4e..675e91b439 100644 --- a/controlplane/src/bin/db-cleanup.ts +++ b/controlplane/src/bin/db-cleanup.ts @@ -47,7 +47,7 @@ try { // Initialize the Redis connection const { redisQueue, redisWorker } = await createRedisConnections({ - host: redis.host!, + host: redis.host, port: Number(redis.port), password: redis.password, tls: redis.tls, From 27dcdf93bf1b2d5f3b183cb58123575e2da7b368 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 6 Jan 2026 21:25:04 -0500 Subject: [PATCH 5/8] chore: implement as organization cleanup as cron job --- controlplane/package.json | 4 +- controlplane/src/bin/db-cleanup.ts | 213 ---------------- controlplane/src/core/build-server.ts | 23 +- controlplane/src/core/routes.ts | 2 - .../NotifyOrganizationDeletionQueuedWorker.ts | 9 +- ...ueueInactiveOrganizationsDeletionWorker.ts | 231 ++++++++++++++++++ pnpm-lock.yaml | 103 ++++---- 7 files changed, 307 insertions(+), 278 deletions(-) delete mode 100644 controlplane/src/bin/db-cleanup.ts create mode 100644 controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts diff --git a/controlplane/package.json b/controlplane/package.json index 74c650fe15..8ae76a1855 100644 --- a/controlplane/package.json +++ b/controlplane/package.json @@ -62,7 +62,7 @@ "@wundergraph/protographic": "workspace:*", "axios": "^1.12.2", "axios-retry": "^4.5.0", - "bullmq": "^5.10.0", + "bullmq": "5.66.4", "cookie": "^0.7.2", "date-fns": "^3.6.0", "dotenv": "^16.4.5", @@ -75,7 +75,7 @@ "fastify-plugin": "^4.5.1", "fastify-raw-body": "^4.3.0", "graphql": "^16.9.0", - "ioredis": "^5.4.1", + "ioredis": "5.8.2", "isomorphic-dompurify": "^2.33.0", "jose": "^5.2.4", "lodash": "^4.17.21", diff --git a/controlplane/src/bin/db-cleanup.ts b/controlplane/src/bin/db-cleanup.ts deleted file mode 100644 index 675e91b439..0000000000 --- a/controlplane/src/bin/db-cleanup.ts +++ /dev/null @@ -1,213 +0,0 @@ -import 'dotenv/config'; - -import process from 'node:process'; -import { subDays, startOfMonth, addDays } from 'date-fns'; -import postgres from 'postgres'; -import { drizzle, PostgresJsDatabase } from 'drizzle-orm/postgres-js'; -import { and, count, eq, gte, isNull, lt, or, sql } from 'drizzle-orm'; -import { pino } from 'pino'; -import { buildDatabaseConnectionConfig } from '../core/plugins/database.js'; -import * as schema from '../db/schema.js'; -import { DeleteOrganizationQueue } from '../core/workers/DeleteOrganizationWorker.js'; -import { createRedisConnections } from '../core/plugins/redis.js'; -import { OrganizationRepository } from '../core/repositories/OrganizationRepository.js'; -import { NotifyOrganizationDeletionQueuedQueue } from '../core/workers/NotifyOrganizationDeletionQueuedWorker.js'; -import { getConfig } from './get-config.js'; - -// Number of concurrent tasks. We'll allocate the same number of database connections + 1, so keep this number reasonable -const MAX_DEGREE_OF_PARALLELISM = 5; - -// How many organizations to retrieve from the database to migrate in a transaction. This is used to not load -// all organizations at once and perform the migration in buckets -const ORGANIZATIONS_PER_BUCKET = 100; - -// The number of days the organization needs to be inactive for before we consider it for deletion -const MIN_INACTIVITY_DAYS = 90; - -// How long should we wait before deleting the organization? -const DELAY_FOR_ORG_DELETION_IN_DAYS = 7; - -const { databaseConnectionUrl, databaseTlsCa, databaseTlsCert, databaseTlsKey, redis } = getConfig(); - -try { - const connectionConfig = await buildDatabaseConnectionConfig({ - tls: - databaseTlsCa || databaseTlsCert || databaseTlsKey - ? { - ca: databaseTlsCa, - cert: databaseTlsCert, - key: databaseTlsKey, - } - : undefined, - }); - const queryConnection = postgres(databaseConnectionUrl, { - ...connectionConfig, - max: MAX_DEGREE_OF_PARALLELISM + 1, - }); - - // Initialize the Redis connection - const { redisQueue, redisWorker } = await createRedisConnections({ - host: redis.host, - port: Number(redis.port), - password: redis.password, - tls: redis.tls, - }); - - await redisQueue.connect(); - await redisWorker.connect(); - await redisWorker.ping(); - await redisQueue.ping(); - - try { - const logger = pino(); - const db = drizzle(queryConnection, { schema: { ...schema } }); - - await queueOrganizationsForDeletion({ - db, - deleteOrganizationQueue: new DeleteOrganizationQueue(logger, redisQueue), - notifyOrganizationDeletionQueuedQueue: new NotifyOrganizationDeletionQueuedQueue(logger, redisQueue), - }); - } finally { - redisQueue.disconnect(); - redisWorker.disconnect(); - - // Close the database connection - await queryConnection.end({ - timeout: 1, - }); - } - - // eslint-disable-next-line unicorn/no-process-exit - process.exit(0); -} catch (err: any) { - console.error(err); - // eslint-disable-next-line unicorn/no-process-exit - process.exit(1); -} - -function chunkArray(data: T[]): T[][] { - // @ts-ignore - if (MAX_DEGREE_OF_PARALLELISM === 1) { - return [data]; - } - - const chunks: T[][] = []; - const organizationsPerChunk = Math.ceil(ORGANIZATIONS_PER_BUCKET / MAX_DEGREE_OF_PARALLELISM); - for (let i = 0; i < data.length; i += organizationsPerChunk) { - chunks.push(data.slice(i, i + organizationsPerChunk)); - } - - return chunks; -} - -async function queueOrganizationsForDeletion({ - db, - deleteOrganizationQueue, - notifyOrganizationDeletionQueuedQueue, -}: { - db: PostgresJsDatabase; - deleteOrganizationQueue: DeleteOrganizationQueue; - notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; -}) { - // First, retrieve all the organizations that only have a single user and have not had any activity registered in - // the audit log for the last `MIN_INACTIVITY_DAYS` days - const now = new Date(); - const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS)); - - console.log(`Retrieving organizations...`); - const organizations = await db - .select({ - id: schema.organizations.id, - slug: schema.organizations.slug, - userId: schema.organizations.createdBy, - plan: schema.organizationBilling.plan, - }) - .from(schema.organizations) - .innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id)) - .leftJoin(schema.organizationBilling, eq(schema.organizationBilling.organizationId, schema.organizations.id)) - .where( - and( - isNull(schema.organizations.queuedForDeletionAt), - eq(schema.organizations.isDeactivated, false), - lt(schema.organizations.createdAt, inactivityThreshold), - or(isNull(schema.organizationBilling.plan), eq(schema.organizationBilling.plan, 'developer')), - ), - ) - .groupBy(schema.organizations.id, schema.organizationBilling.plan) - .having(sql`COUNT(${schema.organizationsMembers.id}) = 1`) - .execute(); - - if (organizations.length === 0) { - console.log('No organizations found with a single member'); - return; - } - - console.log(`${organizations.length} organizations with a single member found`); - - // Break the organizations in chunk so we can have some degree of parallelism - console.log('Processing organizations...'); - await Promise.all( - chunkArray(organizations).map((chunk) => - db.transaction((tx) => { - return processChunkOfOrganizations({ - organizations: chunk, - db: tx, - inactivityThreshold, - deleteOrganizationQueue, - notifyOrganizationDeletionQueuedQueue, - }); - }), - ), - ); - - console.log('Done!'); -} - -async function processChunkOfOrganizations({ - organizations, - db, - inactivityThreshold, - deleteOrganizationQueue, - notifyOrganizationDeletionQueuedQueue, -}: { - organizations: { id: string; slug: string; userId: string | null }[]; - db: PostgresJsDatabase; - inactivityThreshold: Date; - deleteOrganizationQueue: DeleteOrganizationQueue; - notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; -}) { - const queuedAt = new Date(); - const deletesAt = addDays(queuedAt, DELAY_FOR_ORG_DELETION_IN_DAYS); - - const orgRepo = new OrganizationRepository(pino(), db, undefined); - for (const org of organizations) { - const auditLogs = await db - .select({ count: count() }) - .from(schema.auditLogs) - .where(and(eq(schema.auditLogs.organizationId, org.id), gte(schema.auditLogs.createdAt, inactivityThreshold))) - .execute(); - - if (auditLogs.length > 0 && auditLogs[0].count > 0) { - // The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days, - // so we don't need to consider it for deletion - continue; - } - - console.log(`\tEnqueuing organization "${org.slug}" for deletion at ${deletesAt.toISOString()}...`); - - // Enqueue the organization deletion job - await orgRepo.queueOrganizationDeletion({ - organizationId: org.id, - queuedBy: undefined, - deleteOrganizationQueue, - deleteDelayInDays: DELAY_FOR_ORG_DELETION_IN_DAYS, - }); - - // Queue the organization deletion notification job - await notifyOrganizationDeletionQueuedQueue.addJob({ - organizationId: org.id, - queuedAt: Number(queuedAt), - deletesAt: Number(deletesAt), - }); - } -} diff --git a/controlplane/src/core/build-server.ts b/controlplane/src/core/build-server.ts index ff2398a251..b37deb8169 100644 --- a/controlplane/src/core/build-server.ts +++ b/controlplane/src/core/build-server.ts @@ -53,6 +53,10 @@ import { createReactivateOrganizationWorker, ReactivateOrganizationQueue, } from './workers/ReactivateOrganizationWorker.js'; +import { + QueueInactiveOrganizationsDeletionQueue, + createQueueInactiveOrganizationsDeletionWorker, +} from './workers/QueueInactiveOrganizationsDeletionWorker.js'; import { createNotifyOrganizationDeletionQueuedWorker, NotifyOrganizationDeletionQueuedQueue, @@ -411,6 +415,24 @@ export default async function build(opts: BuildConfig) { }), ); + const queueInactiveOrganizationsDeletionQueue = new QueueInactiveOrganizationsDeletionQueue( + logger, + fastify.redisForQueue, + ); + bullWorkers.push( + createQueueInactiveOrganizationsDeletionWorker({ + redisConnection: fastify.redisForWorker, + db: fastify.db, + realm: opts.keycloak.realm, + keycloak: keycloakClient, + deleteOrganizationQueue, + notifyOrganizationDeletionQueuedQueue, + logger, + }), + ); + + await queueInactiveOrganizationsDeletionQueue.scheduleJob(); + // required to verify webhook payloads await fastify.register(import('fastify-raw-body'), { field: 'rawBody', @@ -516,7 +538,6 @@ export default async function build(opts: BuildConfig) { deactivateOrganizationQueue, reactivateOrganizationQueue, deleteUserQueue, - notifyOrganizationDeletionQueuedQueue, }, stripeSecretKey: opts.stripe?.secret, admissionWebhookJWTSecret: opts.admissionWebhook.secret, diff --git a/controlplane/src/core/routes.ts b/controlplane/src/core/routes.ts index 52dd845ac7..d08fe04f42 100644 --- a/controlplane/src/core/routes.ts +++ b/controlplane/src/core/routes.ts @@ -21,7 +21,6 @@ import { DeactivateOrganizationQueue } from './workers/DeactivateOrganizationWor import { DeleteUserQueue } from './workers/DeleteUserQueue.js'; import { ReactivateOrganizationQueue } from './workers/ReactivateOrganizationWorker.js'; import { DeleteOrganizationAuditLogsQueue } from './workers/DeleteOrganizationAuditLogsWorker.js'; -import { NotifyOrganizationDeletionQueuedQueue } from './workers/NotifyOrganizationDeletionQueuedWorker.js'; export interface RouterOptions { db: PostgresJsDatabase; @@ -49,7 +48,6 @@ export interface RouterOptions { deactivateOrganizationQueue: DeactivateOrganizationQueue; reactivateOrganizationQueue: ReactivateOrganizationQueue; deleteUserQueue: DeleteUserQueue; - notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; }; stripeSecretKey?: string; cdnBaseUrl: string; diff --git a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts index b236bdadc9..953f791dcc 100644 --- a/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts +++ b/controlplane/src/core/workers/NotifyOrganizationDeletionQueuedWorker.ts @@ -6,8 +6,8 @@ import Mailer from '../services/Mailer.js'; import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; import { IQueue, IWorker } from './Worker.js'; -const QueueName = 'organization.send-deletion-queued-message'; -const WorkerName = 'SendOrganizationDeletionQueuedWorker'; +const QueueName = 'organization.notify-organization-deletion-queued-queue'; +const WorkerName = 'NotifyOrganizationDeletionQueuedQueue'; export interface NotifyOrganizationDeletionQueuedInput { organizationId: string; @@ -80,7 +80,8 @@ class NotifyOrganizationDeletionQueuedWorker implements IWorker { const orgRepo = new OrganizationRepository(this.input.logger, this.input.db); const org = await orgRepo.byId(job.data.organizationId); if (!org) { - throw new Error('Organization not found'); + // The organization has already been deleted + return; } const organizationMembers = await orgRepo.getMembers({ organizationID: org.id }); @@ -125,7 +126,7 @@ export const createNotifyOrganizationDeletionQueuedWorker = (input: { (job) => new NotifyOrganizationDeletionQueuedWorker(input).handler(job), { connection: input.redisConnection, - concurrency: 10, + concurrency: 100, }, ); worker.on('stalled', (job) => { diff --git a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts new file mode 100644 index 0000000000..84c28c5d05 --- /dev/null +++ b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts @@ -0,0 +1,231 @@ +import { ConnectionOptions, Job, JobsOptions, Queue, Worker } from 'bullmq'; +import { PostgresJsDatabase } from 'drizzle-orm/postgres-js'; +import pino from 'pino'; +import { addDays, startOfMonth, subDays } from 'date-fns'; +import { and, count, eq, gte, isNull, lt, or, sql } from 'drizzle-orm'; +import * as schema from '../../db/schema.js'; +import { OrganizationRepository } from '../repositories/OrganizationRepository.js'; +import Keycloak from '../services/Keycloak.js'; +import { IQueue, IWorker } from './Worker.js'; +import type { NotifyOrganizationDeletionQueuedQueue } from './NotifyOrganizationDeletionQueuedWorker.js'; +import type { DeleteOrganizationQueue } from './DeleteOrganizationWorker.js'; + +const QueueName = 'organization.queue-inactive-organizations-queue'; +const WorkerName = 'QueueInactiveOrganizationWorker'; + +// The number of days the organization needs to be inactive for before we consider it for deletion +const MIN_INACTIVITY_DAYS = 90; + +// How long should we wait before deleting the organization? +const DELAY_FOR_ORG_DELETION_IN_DAYS = 7; + +export interface QueueInactiveOrganizationsDeletionInput {} + +export class QueueInactiveOrganizationsDeletionQueue implements IQueue { + private readonly queue: Queue; + private readonly logger: pino.Logger; + + constructor(log: pino.Logger, conn: ConnectionOptions) { + this.logger = log.child({ queue: QueueName }); + this.queue = new Queue(QueueName, { + connection: conn, + defaultJobOptions: { + removeOnComplete: { + age: 90 * 86_400, + }, + removeOnFail: { + age: 90 * 86_400, + }, + attempts: 6, + backoff: { + type: 'exponential', + delay: 112_000, + }, + }, + }); + + this.queue.on('error', (err) => { + this.logger.error(err, 'Queue error'); + }); + } + + public addJob( + _job: QueueInactiveOrganizationsDeletionInput, + _opts?: Omit, + ): Promise> { + throw new Error('This method should not be called directly.'); + } + + public removeJob(_job: QueueInactiveOrganizationsDeletionInput) { + return Promise.resolve(0); + } + + public getJob(_job: QueueInactiveOrganizationsDeletionInput): Promise> { + throw new Error('This method should not be called directly.'); + } + + public scheduleJob() { + return this.queue.upsertJobScheduler( + WorkerName, + { + pattern: '0 0 * * * *', + }, + { + name: '', + data: {}, + opts: {}, + }, + ); + } +} + +class QueueInactiveOrganizationsDeletionWorker implements IWorker { + private readonly orgRepo: OrganizationRepository; + + constructor( + private input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + realm: string; + keycloak: Keycloak; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; + logger: pino.Logger; + }, + ) { + this.input.logger = input.logger.child({ worker: WorkerName }); + this.orgRepo = new OrganizationRepository(this.input.logger, this.input.db); + } + + public async handler(_job: Job) { + const now = new Date(); + const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS)); + const deletesAt = addDays(now, DELAY_FOR_ORG_DELETION_IN_DAYS); + + // Retrieve all the organizations that only have a single user + const orgsWithSingleUser = await this.retrieveOrganizationsWithSingleUser(inactivityThreshold); + if (orgsWithSingleUser.length === 0) { + this.input.logger.debug('No organizations with single user found'); + return; + } + + // Process all the organizations with a single user + await this.input.keycloak.authenticateClient(); + for (const org of orgsWithSingleUser) { + if (!org.userId) { + // Should never be the case but to prevent TypeScript from complaining, we still need to ensure + // that the value exists + continue; + } + + // First, we check whether the organization has had any activity registered in the audit logs in the + // last `MIN_INACTIVITY_DAYS` days + const auditLogs = await this.input.db + .select({ count: count() }) + .from(schema.auditLogs) + .where(and(eq(schema.auditLogs.organizationId, org.id), gte(schema.auditLogs.createdAt, inactivityThreshold))) + .execute(); + + if (auditLogs.length > 0 && auditLogs[0].count > 0) { + // The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days, + // so we don't need to consider it for deletion + // continue; + } + + // If the organization hasn't had any activity, we should check the last time the user logged in + try { + const userSessions = await this.input.keycloak.client.users.listSessions({ + id: org.userId, + realm: this.input.realm, + }); + + const numberOfSessionsRecentlyActive = userSessions.filter( + (sess) => (sess.lastAccess || sess.start) && new Date(sess.lastAccess || sess.start!) >= inactivityThreshold, + ).length; + + if (numberOfSessionsRecentlyActive > 0) { + // The user has been active at least once in the last `MIN_INACTIVITY_DAYS` days, so we don't need + // to consider it for deletion + continue; + } + } catch (error) { + // Failed to fetch the user sessions, skip for now + this.input.logger.error(error, 'Failed to retrieve user sessions'); + continue; + } + + // It seems like the organization (and the user) hasn't been active recently, flag the organization for deletion + this.input.logger.info(`Queuing organization "${org.slug}" for deletion at ${deletesAt.toISOString()}`); + await this.queueForDeletion(org.id, now, deletesAt); + } + } + + private async queueForDeletion(orgId: string, queuedAt: Date, deletesAt: Date) { + // Enqueue the organization deletion job + await this.orgRepo.queueOrganizationDeletion({ + organizationId: orgId, + queuedBy: undefined, + deleteOrganizationQueue: this.input.deleteOrganizationQueue, + deleteDelayInDays: DELAY_FOR_ORG_DELETION_IN_DAYS, + }); + + // Queue the organization deletion notification job + await this.input.notifyOrganizationDeletionQueuedQueue.addJob({ + organizationId: orgId, + queuedAt: Number(queuedAt), + deletesAt: Number(deletesAt), + }); + } + + private retrieveOrganizationsWithSingleUser(createdBefore: Date) { + return this.input.db + .select({ + id: schema.organizations.id, + slug: schema.organizations.slug, + userId: schema.organizations.createdBy, + plan: schema.organizationBilling.plan, + }) + .from(schema.organizations) + .innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id)) + .leftJoin(schema.organizationBilling, eq(schema.organizationBilling.organizationId, schema.organizations.id)) + .where( + and( + isNull(schema.organizations.queuedForDeletionAt), + eq(schema.organizations.isDeactivated, false), + lt(schema.organizations.createdAt, createdBefore), + or(isNull(schema.organizationBilling.plan), eq(schema.organizationBilling.plan, 'developer')), + ), + ) + .groupBy(schema.organizations.id, schema.organizationBilling.plan) + .having(sql`COUNT(${schema.organizationsMembers.id}) = 1`) + .execute(); + } +} + +export function createQueueInactiveOrganizationsDeletionWorker(input: { + redisConnection: ConnectionOptions; + db: PostgresJsDatabase; + realm: string; + keycloak: Keycloak; + deleteOrganizationQueue: DeleteOrganizationQueue; + notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue; + logger: pino.Logger; +}) { + const log = input.logger.child({ worker: WorkerName }); + const worker = new Worker( + QueueName, + (job) => new QueueInactiveOrganizationsDeletionWorker(input).handler(job), + { + connection: input.redisConnection, + concurrency: 10, + }, + ); + + worker.on('stalled', (job) => { + log.warn({ jobId: job }, `Job stalled`); + }); + worker.on('error', (err) => { + log.error(err, 'Worker error'); + }); + return worker; +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 37700ad9ff..f738ba81cc 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -523,8 +523,8 @@ importers: specifier: ^4.5.0 version: 4.5.0(axios@1.12.2) bullmq: - specifier: ^5.10.0 - version: 5.10.0 + specifier: 5.66.4 + version: 5.66.4 cookie: specifier: ^0.7.2 version: 0.7.2 @@ -562,8 +562,8 @@ importers: specifier: 16.9.0 version: 16.9.0(patch_hash=hafdlc54qtxpqvetpefk646rly) ioredis: - specifier: ^5.4.1 - version: 5.4.1 + specifier: 5.8.2 + version: 5.8.2 isomorphic-dompurify: specifier: ^2.33.0 version: 2.33.0 @@ -4171,8 +4171,8 @@ packages: '@types/node': optional: true - '@ioredis/commands@1.2.0': - resolution: {integrity: sha512-Sx1pU8EM64o2BrqNpEO1CNLtKQwyhuXuqyfH7oGKCk+1a33d2r5saW8zNwm3j6BTExtjrv2BxTgzzkMwts6vGg==} + '@ioredis/commands@1.4.0': + resolution: {integrity: sha512-aFT2yemJJo+TZCmieA7qnYGQooOS7QfNmYrzGtsYd3g9j5iDP8AimYYAesf79ohjbLG12XxC4nG5DyEnC88AsQ==} '@isaacs/cliui@8.0.2': resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} @@ -7847,9 +7847,6 @@ packages: '@types/node-fetch@2.6.11': resolution: {integrity: sha512-24xFj9R5+rfQJLRyM56qh+wnVSYhyXC2tkoBndtY0U+vubqNsYXGjufB2nn8Q6gt0LrARwL6UBtMCSVCwl4B1g==} - '@types/node@18.19.130': - resolution: {integrity: sha512-GRaXQx6jGfL8sKfaIDD6OupbIHBr9jv7Jnaml9tB7l4v068PAOXqfcujMMo5PhbIs6ggR1XODELqahT2R8v0fg==} - '@types/node@18.19.21': resolution: {integrity: sha512-2Q2NeB6BmiTFQi4DHBzncSoq/cJMLDdhPaAoJFnFCyD9a8VPZRf7a1GAwp1Edb7ROaZc5Jz/tnZyL6EsWMRaqw==} @@ -8605,8 +8602,8 @@ packages: builtins@5.0.1: resolution: {integrity: sha512-qwVpFEHNfhYJIzNRBvd2C1kyo6jz3ZSMPyyuR47OPdiKWlbYnZNyDWuyR175qDnAJLiCo5fBBqPb3RiXgWlkOQ==} - bullmq@5.10.0: - resolution: {integrity: sha512-bu8lGvgXLwMhBrKeE0ntH+IWBZFVISJBm5njYtPL8u9Qm3AekEONinKRftiIP66r/Y45ljxg9dp9EahoJ6L9wA==} + bullmq@5.66.4: + resolution: {integrity: sha512-y2VRk2z7d1YNI2JQDD7iThoD0X/0iZZ3VEp8lqT5s5U0XDl9CIjXp1LQgmE9EKy6ReHtzmYXS1f328PnUbZGtQ==} bun-types@1.2.12: resolution: {integrity: sha512-tvWMx5vPqbRXgE8WUZI94iS1xAYs8bkqESR9cxBB1Wi+urvfTrF1uzuDgBHFAdO0+d2lmsbG3HmeKMvUyj6pWA==} @@ -10984,8 +10981,8 @@ packages: invariant@2.2.4: resolution: {integrity: sha512-phJfQVBuaJM5raOpJjSfkiD6BpbCE4Ns//LaXl6wGYtUBY83nWS6Rf9tXm2e8VaK60JEjYldbPif/A2B1C2gNA==} - ioredis@5.4.1: - resolution: {integrity: sha512-2YZsvl7jopIa1gaePkeMtd9rAcSjOOjPtpcLlOeusyO+XH2SK5ZcT+UCrElPP+WVIInh2TzeI4XW9ENaSLVVHA==} + ioredis@5.8.2: + resolution: {integrity: sha512-C6uC+kleiIMmjViJINWk80sOQw5lEzse1ZmvD+S/s8p8CWapftSaC+kocGTx6xrbrJ4WmYQGC08ffHLr6ToR6Q==} engines: {node: '>=12.22.0'} ip-address@9.0.5: @@ -12020,8 +12017,8 @@ packages: resolution: {integrity: sha512-SdzXp4kD/Qf8agZ9+iTu6eql0m3kWm1A2y1hkpTeVNENutaB0BwHlSvAIaMxwntmRUAUjon2V4L8Z/njd0Ct8A==} hasBin: true - msgpackr@1.10.1: - resolution: {integrity: sha512-r5VRLv9qouXuLiIBrLpl2d5ZvPt8svdQTl5/vMvE4nzDMyEX4sgW5yWhuBBj5UmgwOTWj8CIdSXn5sAfsHAWIQ==} + msgpackr@1.11.5: + resolution: {integrity: sha512-UjkUHN0yqp9RWKy0Lplhh+wlpdt9oQBYgULZOiFhV3VclSF1JnSQWZ5r9gORQlNYaUKQoR8itv7g7z1xDDuACA==} msw@2.2.11: resolution: {integrity: sha512-XtIoewF7XWLT0a39Ftkazt9PprBA1bxHZ4CSlomN74sCBJOJU2w5VwLmGlswwsOBhGoF7jovt6bxrSIESxA1KA==} @@ -15125,7 +15122,7 @@ snapshots: '@apm-js-collab/tracing-hooks@0.3.1': dependencies: '@apm-js-collab/code-transformer': 0.8.2 - debug: 4.3.7 + debug: 4.4.1 module-details-from-path: 1.0.4 transitivePeerDependencies: - supports-color @@ -16095,7 +16092,7 @@ snapshots: '@babel/traverse': 7.28.0 '@babel/types': 7.28.2 convert-source-map: 2.0.0 - debug: 4.3.7 + debug: 4.4.1 gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -16255,7 +16252,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.22.6 '@babel/parser': 7.28.0 '@babel/types': 7.23.6 - debug: 4.3.7 + debug: 4.4.1 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -16267,7 +16264,7 @@ snapshots: '@babel/parser': 7.28.0 '@babel/template': 7.27.2 '@babel/types': 7.28.2 - debug: 4.3.7 + debug: 4.4.1 globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -16280,7 +16277,7 @@ snapshots: '@babel/parser': 7.28.0 '@babel/template': 7.27.2 '@babel/types': 7.28.2 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -18532,7 +18529,7 @@ snapshots: optionalDependencies: '@types/node': 22.17.0 - '@ioredis/commands@1.2.0': {} + '@ioredis/commands@1.4.0': {} '@isaacs/cliui@8.0.2': dependencies: @@ -23108,11 +23105,6 @@ snapshots: '@types/node': 20.12.12 form-data: 4.0.4 - '@types/node@18.19.130': - dependencies: - undici-types: 5.26.5 - optional: true - '@types/node@18.19.21': dependencies: undici-types: 5.26.5 @@ -23293,7 +23285,7 @@ snapshots: dependencies: '@typescript-eslint/typescript-estree': 5.62.0(typescript@5.5.2) '@typescript-eslint/utils': 5.62.0(eslint@8.57.1)(typescript@5.5.2) - debug: 4.3.7 + debug: 4.4.1 eslint: 8.57.1 tsutils: 3.21.0(typescript@5.5.2) optionalDependencies: @@ -23307,7 +23299,7 @@ snapshots: dependencies: '@typescript-eslint/types': 5.62.0 '@typescript-eslint/visitor-keys': 5.62.0 - debug: 4.3.7 + debug: 4.4.1 globby: 11.1.0 is-glob: 4.0.3 semver: 7.7.1 @@ -23339,7 +23331,7 @@ snapshots: '@typescript/vfs@1.5.0': dependencies: - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -24020,7 +24012,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.3.7 + debug: 4.4.1 http-errors: 2.0.0 iconv-lite: 0.7.1 on-finished: 2.4.1 @@ -24126,21 +24118,21 @@ snapshots: dependencies: semver: 7.7.1 - bullmq@5.10.0: + bullmq@5.66.4: dependencies: cron-parser: 4.9.0 - ioredis: 5.4.1 - msgpackr: 1.10.1 + ioredis: 5.8.2 + msgpackr: 1.11.5 node-abort-controller: 3.1.1 - semver: 7.7.1 - tslib: 2.6.2 - uuid: 9.0.1 + semver: 7.7.3 + tslib: 2.8.1 + uuid: 11.1.0 transitivePeerDependencies: - supports-color bun-types@1.2.12: dependencies: - '@types/node': 18.19.130 + '@types/node': 20.12.12 optional: true bun-types@1.2.3: @@ -25168,7 +25160,7 @@ snapshots: base64id: 2.0.0 cookie: 0.7.2 cors: 2.8.5 - debug: 4.3.7 + debug: 4.4.1 engine.io-parser: 5.2.3 ws: 8.17.1 transitivePeerDependencies: @@ -25567,7 +25559,7 @@ snapshots: eslint-import-resolver-node@0.3.7: dependencies: - debug: 4.3.7 + debug: 4.4.1 is-core-module: 2.12.1 resolve: 1.22.8 transitivePeerDependencies: @@ -25611,7 +25603,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@5.62.0(eslint@8.57.1)(typescript@5.5.2))(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.57.1): dependencies: - debug: 4.3.7 + debug: 4.4.1 optionalDependencies: '@typescript-eslint/parser': 5.62.0(eslint@8.57.1)(typescript@5.5.2) eslint: 8.57.1 @@ -25622,7 +25614,7 @@ snapshots: eslint-module-utils@2.8.0(@typescript-eslint/parser@5.62.0(eslint@8.57.1)(typescript@5.5.2))(eslint-import-resolver-typescript@3.5.5)(eslint@8.57.1): dependencies: - debug: 4.3.7 + debug: 4.4.1 optionalDependencies: '@typescript-eslint/parser': 5.62.0(eslint@8.57.1)(typescript@5.5.2) eslint: 8.57.1 @@ -26153,7 +26145,7 @@ snapshots: finalhandler@2.1.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 encodeurl: 2.0.0 escape-html: 1.0.3 on-finished: 2.4.1 @@ -26815,7 +26807,7 @@ snapshots: http-proxy-agent@7.0.2: dependencies: agent-base: 7.1.4 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -26970,11 +26962,11 @@ snapshots: dependencies: loose-envify: 1.4.0 - ioredis@5.4.1: + ioredis@5.8.2: dependencies: - '@ioredis/commands': 1.2.0 + '@ioredis/commands': 1.4.0 cluster-key-slot: 1.1.2 - debug: 4.3.7 + debug: 4.4.1 denque: 2.1.0 lodash.defaults: 4.2.0 lodash.isarguments: 3.1.0 @@ -27252,7 +27244,7 @@ snapshots: jest-worker@27.5.1: dependencies: - '@types/node': 20.3.1 + '@types/node': 20.12.12 merge-stream: 2.0.0 supports-color: 10.2.0 @@ -28198,7 +28190,7 @@ snapshots: '@msgpackr-extract/msgpackr-extract-win32-x64': 3.0.2 optional: true - msgpackr@1.10.1: + msgpackr@1.11.5: optionalDependencies: msgpackr-extract: 3.0.2 @@ -29773,7 +29765,7 @@ snapshots: require-in-the-middle@7.5.2: dependencies: - debug: 4.3.7 + debug: 4.4.1 module-details-from-path: 1.0.3 resolve: 1.22.10 transitivePeerDependencies: @@ -29926,7 +29918,7 @@ snapshots: router@2.2.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 depd: 2.0.0 is-promise: 4.0.0 parseurl: 1.3.3 @@ -30016,12 +30008,11 @@ snapshots: semver@7.7.1: {} - semver@7.7.3: - optional: true + semver@7.7.3: {} send@1.2.0: dependencies: - debug: 4.3.7 + debug: 4.4.1 encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1 @@ -30199,7 +30190,7 @@ snapshots: socket.io-adapter@2.5.5: dependencies: - debug: 4.3.7 + debug: 4.4.1 ws: 8.17.1 transitivePeerDependencies: - bufferutil @@ -30209,7 +30200,7 @@ snapshots: socket.io-parser@4.2.4: dependencies: '@socket.io/component-emitter': 3.1.2 - debug: 4.3.7 + debug: 4.4.1 transitivePeerDependencies: - supports-color @@ -30218,7 +30209,7 @@ snapshots: accepts: 1.3.8 base64id: 2.0.0 cors: 2.8.5 - debug: 4.3.7 + debug: 4.4.1 engine.io: 6.6.4 socket.io-adapter: 2.5.5 socket.io-parser: 4.2.4 From b24bb91915f7b0783ca862b1c5148c735aaf9af3 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 6 Jan 2026 21:39:28 -0500 Subject: [PATCH 6/8] chore: fix test --- controlplane/test/test-util.ts | 4 ---- 1 file changed, 4 deletions(-) diff --git a/controlplane/test/test-util.ts b/controlplane/test/test-util.ts index b7493b2642..1b3fa8aa86 100644 --- a/controlplane/test/test-util.ts +++ b/controlplane/test/test-util.ts @@ -44,7 +44,6 @@ import { DeactivateOrganizationQueue } from '../src/core/workers/DeactivateOrgan import { DeleteUserQueue } from '../src/core/workers/DeleteUserQueue.js'; import { ReactivateOrganizationQueue } from '../src/core/workers/ReactivateOrganizationWorker.js'; import { DeleteOrganizationAuditLogsQueue } from '../src/core/workers/DeleteOrganizationAuditLogsWorker.js'; -import { NotifyOrganizationDeletionQueuedQueue } from "../src/core/workers/NotifyOrganizationDeletionQueuedWorker.js"; export const DEFAULT_ROUTER_URL = 'http://localhost:3002'; export const DEFAULT_SUBGRAPH_URL_ONE = 'http://localhost:4001'; @@ -152,7 +151,6 @@ export const SetupTest = async function ({ const deactivateOrganizationQueue = new DeactivateOrganizationQueue(log, server.redisForQueue); const deleteUserQueue = new DeleteUserQueue(log, server.redisForQueue); const reactivateOrganizationQueue = new ReactivateOrganizationQueue(log, server.redisForQueue); - const notifyOrganizationDeletionQueuedQueue = new NotifyOrganizationDeletionQueuedQueue(log, server.redisForQueue); const blobStorage = new InMemoryBlobStorage(); await server.register(fastifyConnectPlugin, { @@ -183,7 +181,6 @@ export const SetupTest = async function ({ deactivateOrganizationQueue, reactivateOrganizationQueue, deleteUserQueue, - notifyOrganizationDeletionQueuedQueue, }, }), }); @@ -416,7 +413,6 @@ export const SetupTest = async function ({ deactivateOrganizationQueue, deleteUserQueue, reactivateOrganizationQueue, - notifyOrganizationDeletionQueuedQueue, }, }; }; From 8ac01c097875340e81b2cf15346164cb9cbfb0a2 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Tue, 6 Jan 2026 21:42:22 -0500 Subject: [PATCH 7/8] chore: fix commented `continue` --- .../core/workers/QueueInactiveOrganizationsDeletionWorker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts index 84c28c5d05..b6766bd6f9 100644 --- a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts +++ b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts @@ -129,7 +129,7 @@ class QueueInactiveOrganizationsDeletionWorker implements IWorker { if (auditLogs.length > 0 && auditLogs[0].count > 0) { // The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days, // so we don't need to consider it for deletion - // continue; + continue; } // If the organization hasn't had any activity, we should check the last time the user logged in From 499ec46c72093c08b0512cdfc157830083efbb65 Mon Sep 17 00:00:00 2001 From: Wilson Rivera Date: Wed, 7 Jan 2026 11:16:58 -0500 Subject: [PATCH 8/8] chore: update schedule to run once a month --- .../core/workers/QueueInactiveOrganizationsDeletionWorker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts index b6766bd6f9..4c5af664ee 100644 --- a/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts +++ b/controlplane/src/core/workers/QueueInactiveOrganizationsDeletionWorker.ts @@ -68,7 +68,7 @@ export class QueueInactiveOrganizationsDeletionQueue implements IQueue