@@ -7,12 +7,12 @@ import { BasicPgClient, Migration } from 'postgres-migrations/dist/types'
77import { validateMigrationHashes } from 'postgres-migrations/dist/validation'
88import { runMigration } from 'postgres-migrations/dist/run-migration'
99import { searchPath } from '../connection'
10- import { getTenantConfig , listTenantsToMigrate } from '../tenant'
10+ import { getTenantConfig , TenantMigrationStatus } from '../tenant'
1111import { multitenantKnex } from '../multitenant-db'
1212import { ProgressiveMigrations } from './progressive'
1313import { RunMigrationsOnTenants } from '@storage/events'
1414import { ERRORS } from '@internal/errors'
15- import { DBMigration } from '@internal/database '
15+ import { DBMigration } from './types '
1616
1717const {
1818 multitenantDatabaseUrl,
@@ -81,6 +81,86 @@ export async function lastMigrationName() {
8181 return migrations [ migrations . length - 1 ] . name as keyof typeof DBMigration
8282}
8383
84+ /**
85+ * List all tenants that needs to have the migrations run
86+ */
87+ export async function * listTenantsToMigrate ( signal : AbortSignal ) {
88+ let lastCursor = 0
89+
90+ while ( true ) {
91+ if ( signal . aborted ) {
92+ break
93+ }
94+
95+ const migrationVersion = await lastMigrationName ( )
96+
97+ const data = await multitenantKnex
98+ . table < { id : string ; cursor_id : number } > ( 'tenants' )
99+ . select ( 'id' , 'cursor_id' )
100+ . where ( 'cursor_id' , '>' , lastCursor )
101+ . where ( ( builder ) => {
102+ builder
103+ . where ( ( whereBuilder ) => {
104+ whereBuilder
105+ . where ( 'migrations_version' , '!=' , migrationVersion )
106+ . whereNotIn ( 'migrations_status' , [
107+ TenantMigrationStatus . FAILED ,
108+ TenantMigrationStatus . FAILED_STALE ,
109+ ] )
110+ } )
111+ . orWhere ( 'migrations_status' , null )
112+ } )
113+ . orderBy ( 'cursor_id' , 'asc' )
114+ . limit ( 200 )
115+
116+ if ( data . length === 0 ) {
117+ break
118+ }
119+
120+ lastCursor = data [ data . length - 1 ] . cursor_id
121+ yield data . map ( ( tenant ) => tenant . id )
122+ }
123+ }
124+
125+ /**
126+ * Update tenant migration version and status
127+ * @param tenantId
128+ * @param options
129+ */
130+ export async function updateTenantMigrationsState (
131+ tenantId : string ,
132+ options ?: { state : TenantMigrationStatus }
133+ ) {
134+ const migrationVersion = await lastMigrationName ( )
135+ const state = options ?. state || TenantMigrationStatus . COMPLETED
136+ return multitenantKnex
137+ . table ( 'tenants' )
138+ . where ( 'id' , tenantId )
139+ . update ( {
140+ migrations_version : [
141+ TenantMigrationStatus . FAILED ,
142+ TenantMigrationStatus . FAILED_STALE ,
143+ ] . includes ( state )
144+ ? undefined
145+ : migrationVersion ,
146+ migrations_status : state ,
147+ } )
148+ }
149+
150+ /**
151+ * Determine if a tenant has the migrations up to date
152+ * @param tenantId
153+ */
154+ export async function areMigrationsUpToDate ( tenantId : string ) {
155+ const latestMigrationVersion = await lastMigrationName ( )
156+ const tenant = await getTenantConfig ( tenantId )
157+
158+ return (
159+ latestMigrationVersion === tenant . migrationVersion &&
160+ tenant . migrationStatus === TenantMigrationStatus . COMPLETED
161+ )
162+ }
163+
84164export async function hasMissingSyncMigration ( tenantId : string ) {
85165 const { migrationVersion, migrationStatus } = await getTenantConfig ( tenantId )
86166 const migrations = await loadMigrationFilesCached ( './migrations/tenant' )
0 commit comments