Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4804e55
feat: create script to enqueue inactive organizations for deletion
wilsonrivera Dec 16, 2025
a0c978a
chore: update script and tests
wilsonrivera Dec 18, 2025
ea9ecda
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 18, 2025
e82d01a
chore: linting and tests
wilsonrivera Dec 18, 2025
67c0a22
chore: remove non-null assertion
wilsonrivera Dec 18, 2025
9b98d21
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 19, 2025
2394618
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 22, 2025
df04df0
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 5, 2026
27dcdf9
chore: implement as organization cleanup as cron job
wilsonrivera Jan 7, 2026
d7cd912
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 7, 2026
b24bb91
chore: fix test
wilsonrivera Jan 7, 2026
e1c4068
Merge remote-tracking branch 'origin/wilson/eng-7753-delete-inactive-…
wilsonrivera Jan 7, 2026
8ac01c0
chore: fix commented `continue`
wilsonrivera Jan 7, 2026
4cf6c72
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 7, 2026
499ec46
chore: update schedule to run once a month
wilsonrivera Jan 7, 2026
d0b68fb
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 8, 2026
9911158
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 16, 2026
cee6641
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 20, 2026
70b005c
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 20, 2026
f709e35
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 23, 2026
0a2bfb8
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 216 additions & 0 deletions controlplane/src/bin/db-cleanup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import 'dotenv/config';

import process from 'node:process';
import { subDays, startOfMonth, addDays } from 'date-fns';
import postgres from 'postgres';
import { drizzle, PostgresJsDatabase } from 'drizzle-orm/postgres-js';
import { and, count, eq, gte, isNull, lt, or, sql } from 'drizzle-orm';
import { pino } from 'pino';
import { buildDatabaseConnectionConfig } from '../core/plugins/database.js';
import * as schema from '../db/schema.js';
import { DeleteOrganizationQueue } from '../core/workers/DeleteOrganizationWorker.js';
import { createRedisConnections } from '../core/plugins/redis.js';
import { OrganizationRepository } from '../core/repositories/OrganizationRepository.js';
import { NotifyOrganizationDeletionQueuedQueue } from '../core/workers/NotifyOrganizationDeletionQueuedWorker.js';
import { getConfig } from './get-config.js';

// Number of concurrent tasks. We'll allocate the same number of database connections + 1, so keep this number reasonable
const MAX_DEGREE_OF_PARALLELISM = 5;

// How many organizations to retrieve from the database to migrate in a transaction. This is used to not load
// all organizations at once and perform the migration in buckets
const ORGANIZATIONS_PER_BUCKET = 100;

// The number of days the organization needs to be inactive for before we consider it for deletion
const MIN_INACTIVITY_DAYS = 90;

// How long should we wait before deleting the organization?
const DELAY_FOR_ORG_DELETION_IN_DAYS = 7;

const { databaseConnectionUrl, databaseTlsCa, databaseTlsCert, databaseTlsKey, redis } = getConfig();

try {
const connectionConfig = await buildDatabaseConnectionConfig({
tls:
databaseTlsCa || databaseTlsCert || databaseTlsKey
? {
ca: databaseTlsCa,
cert: databaseTlsCert,
key: databaseTlsKey,
}
: undefined,
});
const queryConnection = postgres(databaseConnectionUrl, {
...connectionConfig,
max: MAX_DEGREE_OF_PARALLELISM + 1,
});

// Initialize the Redis connection
const { redisQueue, redisWorker } = await createRedisConnections({
host: redis.host!,
port: Number(redis.port),
password: redis.password,
tls: redis.tls,
});

await redisQueue.connect();
await redisWorker.connect();
await redisWorker.ping();
await redisQueue.ping();

try {
const logger = pino();
const db = drizzle(queryConnection, { schema: { ...schema } });

await queueOrganizationsForDeletion({
db,
deleteOrganizationQueue: new DeleteOrganizationQueue(logger, redisQueue),
notifyOrganizationDeletionQueuedQueue: new NotifyOrganizationDeletionQueuedQueue(logger, redisQueue),
});
} finally {
redisQueue.disconnect();
redisWorker.disconnect();

// Close the database connection
await queryConnection.end({
timeout: 1,
});
}

// eslint-disable-next-line unicorn/no-process-exit
process.exit(0);
} catch (err: any) {
console.error(err);
// eslint-disable-next-line unicorn/no-process-exit
process.exit(1);
}

function chunkArray<T>(data: T[]): T[][] {
// @ts-ignore
if (MAX_DEGREE_OF_PARALLELISM === 1) {
return [data];
}

const chunks: T[][] = [];
const organizationsPerChunk = Math.ceil(ORGANIZATIONS_PER_BUCKET / MAX_DEGREE_OF_PARALLELISM);
for (let i = 0; i < data.length; i += organizationsPerChunk) {
chunks.push(data.slice(i, i + organizationsPerChunk));
}

return chunks;
}

async function queueOrganizationsForDeletion({
db,
deleteOrganizationQueue,
notifyOrganizationDeletionQueuedQueue,
}: {
db: PostgresJsDatabase<typeof schema>;
deleteOrganizationQueue: DeleteOrganizationQueue;
notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue;
}) {
// First, retrieve all the organizations that only have a single user and have not had any activity registered in
// the audit log for the last `MIN_INACTIVITY_DAYS` days
const now = new Date();
const inactivityThreshold = startOfMonth(subDays(now, MIN_INACTIVITY_DAYS));

console.log(`Retrieving organizations...`);
const organizations = await db
.select({
id: schema.organizations.id,
slug: schema.organizations.slug,
userId: schema.organizations.createdBy,
plan: schema.organizationBilling.plan,
})
.from(schema.organizations)
.innerJoin(schema.organizationsMembers, eq(schema.organizationsMembers.organizationId, schema.organizations.id))
.leftJoin(schema.organizationBilling, eq(schema.organizationBilling.organizationId, schema.organizations.id))
.where(
and(
isNull(schema.organizations.queuedForDeletionAt),
eq(schema.organizations.isDeactivated, false),
lt(schema.organizations.createdAt, inactivityThreshold),
or(
isNull(schema.organizationBilling.plan),
eq(schema.organizationBilling.plan, 'developer'),
),
),
)
.groupBy(schema.organizations.id, schema.organizationBilling.plan)
.having(sql`COUNT(${schema.organizationsMembers.id}) = 1`)
.execute();

if (organizations.length === 0) {
console.log('No organizations found with a single member');
return;
}

console.log(`${organizations.length} organizations with a single member found`);

// Break the organizations in chunk so we can have some degree of parallelism
console.log('Processing organizations...');
await Promise.all(
chunkArray(organizations).map((chunk) =>
db.transaction((tx) => {
return processChunkOfOrganizations({
organizations: chunk,
db: tx,
inactivityThreshold,
deleteOrganizationQueue,
notifyOrganizationDeletionQueuedQueue,
});
}),
),
);

console.log('Done!');
}

async function processChunkOfOrganizations({
organizations,
db,
inactivityThreshold,
deleteOrganizationQueue,
notifyOrganizationDeletionQueuedQueue,
}: {
organizations: { id: string; slug: string; userId: string | null }[];
db: PostgresJsDatabase<typeof schema>;
inactivityThreshold: Date;
deleteOrganizationQueue: DeleteOrganizationQueue;
notifyOrganizationDeletionQueuedQueue: NotifyOrganizationDeletionQueuedQueue;
}) {
const queuedAt = new Date();
const deletesAt = addDays(queuedAt, DELAY_FOR_ORG_DELETION_IN_DAYS);

const orgRepo = new OrganizationRepository(pino(), db, undefined);
for (const org of organizations) {
const auditLogs = await db
.select({ count: count() })
.from(schema.auditLogs)
.where(and(eq(schema.auditLogs.organizationId, org.id), gte(schema.auditLogs.createdAt, inactivityThreshold)))
.execute();

if (auditLogs.length > 0 && auditLogs[0].count > 0) {
// The organization has had activity registered in the audit, at least once in the last `MIN_INACTIVITY_DAYS` days,
// so we don't need to consider it for deletion
continue;
}

console.log(`\tEnqueuing organization "${org.slug}" for deletion at ${deletesAt.toISOString()}...`);

// Enqueue the organization deletion job
await orgRepo.queueOrganizationDeletion({
organizationId: org.id,
queuedBy: undefined,
deleteOrganizationQueue,
deleteDelayInDays: DELAY_FOR_ORG_DELETION_IN_DAYS,
});

// Queue the organization deletion notification job
await notifyOrganizationDeletionQueuedQueue.addJob({
organizationId: org.id,
queuedAt: queuedAt.getTime(),
deletesAt: deletesAt.getTime(),
});
}
}
Loading
Loading