Skip to content

Commit bbca2a6

Browse files
committed
feat: reconcile orphan objects from admin endpoint
1 parent b54c395 commit bbca2a6

File tree

33 files changed

+1745
-10
lines changed

33 files changed

+1745
-10
lines changed

src/admin-app.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import { Registry } from 'prom-client'
44

55
const build = (opts: FastifyServerOptions = {}, appInstance?: FastifyInstance): FastifyInstance => {
66
const app = fastify(opts)
7+
app.register(plugins.signals)
78
app.register(plugins.adminTenantId)
89
app.register(plugins.logRequest({ excludeUrls: ['/status', '/metrics', '/health'] }))
910
app.register(routes.tenants, { prefix: 'tenants' })
11+
app.register(routes.objects, { prefix: 'tenants' })
1012
app.register(routes.migrations, { prefix: 'migrations' })
1113
app.register(routes.s3Credentials, { prefix: 's3' })
1214

src/config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ type StorageConfigType = {
2626
storageS3ForcePathStyle?: boolean
2727
storageS3Region: string
2828
storageS3ClientTimeout: number
29+
storageS3BackupBucket?: string
2930
isMultitenant: boolean
3031
jwtSecret: string
3132
jwtAlgorithm: string
@@ -286,6 +287,7 @@ export function getConfig(options?: { reload?: boolean }): StorageConfigType {
286287
'true',
287288
storageS3Region: getOptionalConfigFromEnv('STORAGE_S3_REGION', 'REGION') as string,
288289
storageS3ClientTimeout: Number(getOptionalConfigFromEnv('STORAGE_S3_CLIENT_TIMEOUT') || `0`),
290+
storageS3BackupBucket: getOptionalConfigFromEnv('STORAGE_S3_BACKUP_BUCKET'),
289291

290292
// DB - Migrations
291293
dbAnonRole: getOptionalConfigFromEnv('DB_ANON_ROLE') || 'anon',

src/http/plugins/db.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ export const db = fastifyPlugin(
9292

9393
interface DbSuperUserPluginOptions {
9494
disableHostCheck?: boolean
95+
maxConnections?: number
9596
}
9697

9798
export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
@@ -111,6 +112,7 @@ export const dbSuperUser = fastifyPlugin<DbSuperUserPluginOptions>(
111112
method: request.method,
112113
headers: request.headers,
113114
disableHostCheck: opts.disableHostCheck,
115+
maxConnections: opts.maxConnections,
114116
operation: () => request.operation?.type,
115117
})
116118
})

src/http/routes/admin/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
export { default as migrations } from './migrations'
22
export { default as tenants } from './tenants'
33
export { default as s3Credentials } from './s3'
4+
export { default as objects } from './objects'

src/http/routes/admin/objects.ts

Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
import { FastifyInstance, RequestGenericInterface } from 'fastify'
2+
import apiKey from '../../plugins/apikey'
3+
import { dbSuperUser, storage } from '../../plugins'
4+
import { ObjectScanner } from '@storage/scanner/scanner'
5+
import { FastifyReply } from 'fastify/types/reply'
6+
import { getConfig } from '../../../config'
7+
8+
const listOrphanedObjects = {
9+
description: 'List Orphaned Objects',
10+
params: {
11+
type: 'object',
12+
properties: {
13+
tenantId: { type: 'string' },
14+
bucketId: { type: 'string' },
15+
},
16+
required: ['tenantId', 'bucketId'],
17+
},
18+
query: {
19+
type: 'object',
20+
properties: {
21+
before: { type: 'string' },
22+
},
23+
},
24+
} as const
25+
26+
const syncOrphanedObjects = {
27+
description: 'Sync Orphaned Objects',
28+
params: {
29+
type: 'object',
30+
properties: {
31+
tenantId: { type: 'string' },
32+
bucketId: { type: 'string' },
33+
},
34+
required: ['tenantId', 'bucketId'],
35+
},
36+
body: {
37+
type: 'object',
38+
properties: {
39+
deleteDbKeys: { type: 'boolean' },
40+
deleteS3Keys: { type: 'boolean' },
41+
},
42+
},
43+
optional: ['deleteDbKeys', 'deleteS3Keys'],
44+
} as const
45+
46+
interface ListOrphanObjectsRequest extends RequestGenericInterface {
47+
Params: {
48+
tenantId: string
49+
bucketId: string
50+
}
51+
Querystring: {
52+
before?: string
53+
}
54+
}
55+
56+
interface SyncOrphanObjectsRequest extends RequestGenericInterface {
57+
Params: {
58+
tenantId: string
59+
bucketId: string
60+
}
61+
Body: {
62+
deleteDbKeys?: boolean
63+
deleteS3Keys?: boolean
64+
before?: string
65+
}
66+
}
67+
68+
const { storageS3BackupBucket } = getConfig()
69+
70+
export default async function routes(fastify: FastifyInstance) {
71+
fastify.register(apiKey)
72+
fastify.register(dbSuperUser, {
73+
disableHostCheck: true,
74+
maxConnections: 5,
75+
})
76+
fastify.register(storage)
77+
78+
fastify.get<ListOrphanObjectsRequest>(
79+
'/:tenantId/buckets/:bucketId/orphan-objects',
80+
{
81+
schema: listOrphanedObjects,
82+
},
83+
async (req, reply) => {
84+
const bucket = req.params.bucketId
85+
let before = req.query.before ? new Date(req.query.before as string) : undefined
86+
87+
if (before && isNaN(before.getTime())) {
88+
return reply.status(400).send({
89+
error: 'Invalid date format',
90+
})
91+
}
92+
if (!before) {
93+
before = new Date()
94+
before.setHours(before.getHours() - 1)
95+
}
96+
97+
const scanner = new ObjectScanner(req.storage)
98+
const orphanObjects = scanner.listOrphaned(bucket, {
99+
signal: req.signals.disconnect.signal,
100+
before: before,
101+
})
102+
103+
reply.header('Content-Type', 'application/json; charset=utf-8')
104+
105+
// Do not the connection time out, periodically send
106+
// a ping message to keep the connection alive
107+
const respPing = ping(reply)
108+
109+
try {
110+
for await (const result of orphanObjects) {
111+
if (result.value.length > 0) {
112+
respPing.update()
113+
reply.raw.write(
114+
JSON.stringify({
115+
...result,
116+
type: 'data',
117+
})
118+
)
119+
}
120+
}
121+
} catch (e) {
122+
throw e
123+
} finally {
124+
respPing.clear()
125+
reply.raw.end()
126+
}
127+
}
128+
)
129+
130+
fastify.delete<SyncOrphanObjectsRequest>(
131+
'/:tenantId/buckets/:bucketId/orphan-objects',
132+
{
133+
schema: syncOrphanedObjects,
134+
},
135+
async (req, reply) => {
136+
if (!req.body.deleteDbKeys && !req.body.deleteS3Keys) {
137+
return reply.status(400).send({
138+
error: 'At least one of deleteDbKeys or deleteS3Keys must be set to true',
139+
})
140+
}
141+
142+
if (!storageS3BackupBucket) {
143+
return reply.status(400).send({
144+
error: 'Backup bucket not configured',
145+
})
146+
}
147+
148+
const bucket = `${req.params.bucketId}`
149+
let before = req.body.before ? new Date(req.body.before as string) : undefined
150+
151+
if (!before) {
152+
before = new Date()
153+
before.setHours(before.getHours() - 1)
154+
}
155+
156+
const respPing = ping(reply)
157+
158+
try {
159+
const scanner = new ObjectScanner(req.storage)
160+
const result = scanner.deleteOrphans(bucket, {
161+
deleteDbKeys: req.body.deleteDbKeys,
162+
deleteS3Keys: req.body.deleteS3Keys,
163+
signal: req.signals.disconnect.signal,
164+
})
165+
166+
for await (const deleted of result) {
167+
respPing.update()
168+
reply.raw.write(
169+
JSON.stringify({
170+
...deleted,
171+
type: 'data',
172+
})
173+
)
174+
}
175+
} catch (e) {
176+
throw e
177+
} finally {
178+
respPing.clear()
179+
reply.raw.end()
180+
}
181+
}
182+
)
183+
}
184+
185+
// Occasionally write a ping message to the response stream
186+
function ping(reply: FastifyReply) {
187+
let lastSend = undefined as Date | undefined
188+
const clearPing = setInterval(() => {
189+
const fiveSecondsEarly = new Date()
190+
fiveSecondsEarly.setSeconds(fiveSecondsEarly.getSeconds() - 5)
191+
192+
if (!lastSend || (lastSend && lastSend < fiveSecondsEarly)) {
193+
lastSend = new Date()
194+
reply.raw.write(
195+
JSON.stringify({
196+
type: 'ping',
197+
})
198+
)
199+
}
200+
}, 1000 * 10)
201+
202+
return {
203+
clear: () => clearInterval(clearPing),
204+
update: () => {
205+
lastSend = new Date()
206+
},
207+
}
208+
}

src/internal/concurrency/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from './mutex'
22
export * from './async-abort-controller'
3+
export * from './merge-async-itertor'
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
type MergedYield<Gens extends Record<string, AsyncGenerator<any>>> = {
2+
[K in keyof Gens]: Gens[K] extends AsyncGenerator<infer V> ? { type: K; value: V } : never
3+
}[keyof Gens]
4+
5+
export async function* mergeAsyncGenerators<Gens extends Record<string, AsyncGenerator<any>>>(
6+
gens: Gens
7+
): AsyncGenerator<MergedYield<Gens>> {
8+
// Convert the input object into an array of [name, generator] tuples
9+
const entries = Object.entries(gens) as [keyof Gens, Gens[keyof Gens]][]
10+
11+
// Initialize an array to keep track of each generator's state
12+
const iterators = entries.map(([name, gen]) => ({
13+
name,
14+
iterator: gen[Symbol.asyncIterator](),
15+
done: false,
16+
}))
17+
18+
// Continue looping as long as at least one generator is not done
19+
while (iterators.some((it) => !it.done)) {
20+
// Prepare an array of promises to fetch the next value from each generator
21+
const nextPromises = iterators.map((it) =>
22+
it.done ? Promise.resolve({ done: true, value: undefined }) : it.iterator.next()
23+
)
24+
25+
// Await all the next() promises concurrently
26+
const results = await Promise.all(nextPromises)
27+
28+
// Iterate through the results and yield values with their corresponding names
29+
for (let i = 0; i < iterators.length; i++) {
30+
const it = iterators[i]
31+
const result = results[i]
32+
33+
if (!it.done && !result.done) {
34+
// Yield an object containing the generator's name and the yielded value
35+
yield { type: it.name, value: result.value } as MergedYield<Gens>
36+
}
37+
38+
if (!it.done && result.done) {
39+
// Mark the generator as done if it has no more values
40+
it.done = true
41+
}
42+
}
43+
}
44+
}

src/internal/database/client.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { ERRORS } from '@internal/errors'
66
interface ConnectionOptions {
77
host: string
88
tenantId: string
9+
maxConnections?: number
910
headers?: Record<string, string | undefined | string[]>
1011
method?: string
1112
path?: string

src/internal/database/connection.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export const searchPath = ['storage', 'public', 'extensions', ...dbSearchPath.sp
6969
export class TenantConnection {
7070
public readonly role: string
7171

72-
constructor(protected readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
72+
constructor(public readonly pool: Knex, protected readonly options: TenantConnectionOptions) {
7373
this.role = options.user.payload.role || 'anon'
7474
}
7575

@@ -101,7 +101,9 @@ export class TenantConnection {
101101
searchPath: isExternalPool ? undefined : searchPath,
102102
pool: {
103103
min: 0,
104-
max: isExternalPool ? 1 : options.maxConnections || databaseMaxConnections,
104+
max: isExternalPool
105+
? options.maxConnections || 1
106+
: options.maxConnections || databaseMaxConnections,
105107
acquireTimeoutMillis: databaseConnectionTimeout,
106108
idleTimeoutMillis: isExternalPool
107109
? options.idleTimeoutMillis || 100

src/internal/queue/event.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,10 @@ export class Event<T extends Omit<BasePayload, '$version'>> {
8787
}
8888

8989
static batchSend<T extends Event<any>[]>(messages: T) {
90+
if (!pgQueueEnable) {
91+
return Promise.all(messages.map((message) => message.send()))
92+
}
93+
9094
return Queue.getInstance().insert(
9195
messages.map((message) => {
9296
const sendOptions = (this.getQueueOptions(message.payload) as PgBoss.JobInsert) || {}

0 commit comments

Comments
 (0)