diff --git a/dev/docker-compose.min.yaml b/dev/docker-compose.min.yaml
index fb5878dcb30..7e524c460e9 100644
--- a/dev/docker-compose.min.yaml
+++ b/dev/docker-compose.min.yaml
@@ -143,6 +143,7 @@ services:
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - MODEL_ENABLED=*
       - ACCOUNTS_URL=http://huly.local:3000
+      - ACCOUNTS_DB_URL=${DB_CR_URL}
       - BRANDING_PATH=/var/cfg/branding.json
       - BACKUP_STORAGE=${BACKUP_STORAGE_CONFIG}
       - BACKUP_BUCKET=${BACKUP_BUCKET_NAME}
diff --git a/dev/docker-compose.yaml b/dev/docker-compose.yaml
index 0898467e1cb..b14f005522f 100644
--- a/dev/docker-compose.yaml
+++ b/dev/docker-compose.yaml
@@ -197,6 +197,7 @@ services:
       - STATS_URL=http://huly.local:4900
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - ACCOUNTS_URL=http://huly.local:3000
+      - ACCOUNTS_DB_URL=${DB_CR_URL}
       - BRANDING_PATH=/var/cfg/branding.json
       - BACKUP_STORAGE=${BACKUP_STORAGE_CONFIG}
       - BACKUP_BUCKET=${BACKUP_BUCKET_NAME}
@@ -536,6 +537,7 @@ services:
     environment:
       - SECRET=secret
       - ACCOUNTS_URL=http://huly.local:3000
+      - ACCOUNTS_DB_URL=${DB_CR_URL}
      - STATS_URL=http://huly.local:4900
       - DB_URL=${DB_CR_URL}
       - REGION=cockroach
diff --git a/dev/tool/src/index.ts b/dev/tool/src/index.ts
index a4a62692a80..2df1a0733ad 100644
--- a/dev/tool/src/index.ts
+++ b/dev/tool/src/index.ts
@@ -916,7 +916,7 @@ export function devTool (
   program
     .command('backup <dirName> <workspace>')
-    .description('dump workspace transactions and minio resources')
+    .description('dump workspace transactions, blobs and accounts')
     .option('-i, --include <include>', 'A list of ; separated domain names to include during backup', '*')
     .option('-s, --skip <skip>', 'A list of ; separated domain names to skip during backup', '')
     .option('--full', 'Full recheck', false)
@@ -929,6 +929,7 @@ export function devTool (
     .option('-f, --force', 'Force backup', false)
     .option('-t, --timeout <timeout>', 'Connect timeout in seconds', '30')
     .option('-k, --keepSnapshots <keepSnapshots>', 'Keep snapshots for days', '14')
+    .option('-fv, --fullVerify', 'Full verification', false)
     .action(
       async (
         dirName: string,
@@ -942,6 +943,7 @@ export function devTool (
           contentTypes: string
          full: boolean
           keepSnapshots: string
+          fullVerify: boolean
         }
       ) => {
         const storage = await createFileBackupStorage(dirName)
@@ -980,7 +982,7 @@ export function devTool (
             return
           }
 
-          await backup(toolCtx, pipeline, wsIds, storage, {
+          await backup(toolCtx, pipeline, wsIds, storage, db, {
            force: cmd.force,
             include: cmd.include === '*' ? undefined : new Set(cmd.include.split(';').map((it) => it.trim())),
             skipDomains: (cmd.skip ?? '').split(';').map((it) => it.trim()),
@@ -991,7 +993,8 @@ export function devTool (
               .split(';')
               .map((it) => it.trim())
               .filter((it) => it.length > 0),
-            keepSnapshots: parseInt(cmd.keepSnapshots)
+            keepSnapshots: parseInt(cmd.keepSnapshots),
+            fullVerify: cmd.fullVerify
           })
         } catch (err: any) {
           toolCtx.error('Failed to backup workspace', { err, workspace })
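
Note: the backup command above now forwards an account database handle (db) into backup(...), and the hunks do not show where that handle is created in the tool. A minimal sketch of how such a handle could be opened and released around the call, reusing the getAccountDB helper and the ACCOUNTS_DB_URL variable introduced elsewhere in this patch (the wrapper name and the use of process.env here are assumptions, not part of the change):

import { type AccountDB, getAccountDB } from '@hcengineering/account'

// Hypothetical helper (not part of the patch): open the global account DB,
// run an operation against it, and always release the connection.
async function withAccountDb<T> (op: (db: AccountDB) => Promise<T>): Promise<T> {
  // getAccountDB also accepts an optional namespace argument (ACCOUNTS_NS), omitted here.
  const [db, closeDb] = await getAccountDB(process.env.ACCOUNTS_DB_URL ?? '')
  try {
    return await op(db)
  } finally {
    closeDb()
  }
}
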
diff --git a/qms-tests/docker-compose.yaml b/qms-tests/docker-compose.yaml
index d3958422274..0fbb8f8cd9e 100644
--- a/qms-tests/docker-compose.yaml
+++ b/qms-tests/docker-compose.yaml
@@ -122,6 +122,7 @@ services:
       - TRANSACTOR_URL=ws://transactor:3334;ws://huly.local:3334
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - ACCOUNTS_URL=http://account:3003
+      - ACCOUNTS_DB_URL=${DB_PG_URL}
       - BRANDING_PATH=/var/cfg/branding.json
     restart: unless-stopped
   front:
diff --git a/server/backup-service/src/config.ts b/server/backup-service/src/config.ts
index 6946da05112..e3b511ed87f 100644
--- a/server/backup-service/src/config.ts
+++ b/server/backup-service/src/config.ts
@@ -17,6 +17,8 @@ import { type BackupConfig } from '@hcengineering/server-backup'
 
 interface Config extends Omit<BackupConfig, 'Token'> {
   AccountsURL: string
+  AccountsDbURL: string
+  AccountsDbNS: string
 
   ServiceID: string
   Secret: string
@@ -36,6 +38,8 @@ interface Config extends Omit<BackupConfig, 'Token'> {
 
 const envMap: { [key in keyof Config]: string } = {
   AccountsURL: 'ACCOUNTS_URL',
+  AccountsDbURL: 'ACCOUNTS_DB_URL',
+  AccountsDbNS: 'ACCOUNTS_NS',
   ServiceID: 'SERVICE_ID',
   Secret: 'SECRET',
   BucketName: 'BUCKET_NAME',
@@ -53,6 +57,7 @@ const envMap: { [key in keyof Config]: string } = {
 
 const required: Array<keyof Config> = [
   'AccountsURL',
+  'AccountsDbURL',
   'Secret',
   'ServiceID',
   'BucketName',
@@ -64,6 +69,8 @@ const required: Array<keyof Config> = [
 export const config: () => Config = () => {
   const params: Partial<Config> = {
     AccountsURL: process.env[envMap.AccountsURL],
+    AccountsDbURL: process.env[envMap.AccountsDbURL],
+    AccountsDbNS: process.env[envMap.AccountsDbNS],
     Secret: process.env[envMap.Secret],
     BucketName: process.env[envMap.BucketName] ?? 'backups',
     ServiceID: process.env[envMap.ServiceID] ?? 'backup-service',
diff --git a/server/backup/package.json b/server/backup/package.json
index e563fd5b3f6..b184bc2f8ba 100644
--- a/server/backup/package.json
+++ b/server/backup/package.json
@@ -42,7 +42,9 @@
   "dependencies": {
     "@hcengineering/platform": "^0.6.11",
     "@hcengineering/core": "^0.6.32",
+    "@hcengineering/account": "^0.6.0",
     "@hcengineering/contact": "^0.6.24",
+    "@hcengineering/model-contact": "^0.6.1",
     "@hcengineering/client-resources": "^0.6.27",
     "@hcengineering/client": "^0.6.18",
     "@hcengineering/model": "^0.6.11",
diff --git a/server/backup/src/backup.ts b/server/backup/src/backup.ts
index 67a9d23d841..1bb041abba8 100644
--- a/server/backup/src/backup.ts
+++ b/server/backup/src/backup.ts
@@ -23,6 +23,7 @@ import core, {
   DOMAIN_TRANSIENT,
   DOMAIN_TX,
   MeasureContext,
+  PersonUuid,
   Ref,
   SortingOrder,
   toIdMap,
@@ -33,6 +34,9 @@ import core, {
   type TxCUD,
   type WorkspaceIds
 } from '@hcengineering/core'
+import { type Person as GlobalPerson, type SocialId, type AccountDB } from '@hcengineering/account'
+import contact, { type Person, type SocialIdentity, type SocialIdentityRef } from '@hcengineering/contact'
+import { DOMAIN_CHANNEL, DOMAIN_CONTACT } from '@hcengineering/model-contact'
 import { BlobClient } from '@hcengineering/server-client'
 import { BackupClientOps, createDummyStorageAdapter, estimateDocSize, type Pipeline } from '@hcengineering/server-core'
 import { deepEqual } from 'fast-equals'
@@ -45,14 +49,25 @@ import { join } from 'path'
 import { Pack, pack } from 'tar-stream'
 import { gunzipSync, gzipSync } from 'zlib'
 import { BackupStorage } from './storage'
-import { type BackupInfo, type BackupResult, type BackupSnapshot, type DomainData, type Snapshot } from './types'
+import {
+  BackupDocId,
+  type BackupInfo,
+  type BackupResult,
+  type BackupSnapshot,
+  type DomainData,
+  type Snapshot
+} from './types'
 import {
   checkBackupIntegrity,
+  chunkArray,
   compactBackup,
   doTrimHash,
   extendZero,
+  getObjectHash,
+  isAccountDomain,
   loadDigest,
   rebuildSizeInfo,
+  toAccountDomain,
   verifyDocsFromSnapshot,
   writeChanges
 } from './utils'
@@ -71,6 +86,7 @@ export async function backup (
   pipeline: Pipeline,
   wsIds: WorkspaceIds,
   storage: BackupStorage,
+  accountDb: AccountDB,
   options: {
     include?: Set<string>
     skipDomains: string[]
@@ -129,7 +145,7 @@ export async function backup (
 
   const tmpRoot = mkdtempSync('huly')
 
-  const forcedFullCheck = '3'
+  const forcedFullCheck = '4'
 
   try {
     let backupInfo: BackupInfo = {
@@ -147,6 +163,8 @@ export async function backup (
     }
 
     const blobInfo: Record = {}
+    const affectedPersons = new Set<PersonUuid>()
+    const affectedSocialIds = new Set<SocialIdentityRef>()
 
     // Version 0.6.2, format of digest file is changed to
 
@@ -183,11 +201,6 @@ export async function backup (
       backupInfo.domainHashes = {}
     }
 
-    if (backupInfo.domainHashes === undefined) {
-      // Migration
-      backupInfo.domainHashes = {}
-    }
-
     let fullCheck = options.fullVerify === true
 
     if (backupInfo.migrations.forcedFullCheck !== forcedFullCheck) {
@@ -212,6 +225,7 @@ export async function backup (
 
     ctx.warn('starting backup', { workspace: workspaceId })
 
+    let skipWorkspaceDomains = false
     if (!fullCheck) {
       lastTx = (
         await pipeline.findAll(
@@ -223,37 +237,41 @@ export async function backup (
       ).shift()
       if (lastTx !== undefined) {
         if (lastTx._id === backupInfo.lastTxId && !options.force) {
-          ctx.info('No transaction changes. Skipping backup.', { workspace: workspaceId })
-          result.result = false
-          return result
+          ctx.info('No transaction changes. Skipping workspace domains backup.', { workspace: workspaceId })
+          skipWorkspaceDomains = true
         }
       }
     }
 
     const blobClient = new BlobClient(pipeline.context.storageAdapter ?? createDummyStorageAdapter(), wsIds)
-
-    const domains = [
-      DOMAIN_BLOB,
-      DOMAIN_MODEL_TX,
-      DOMAIN_TX,
-      ...pipeline.context.hierarchy
-        .domains()
-        .filter(
-          (it) =>
-            it !== DOMAIN_TRANSIENT &&
-            it !== DOMAIN_MODEL &&
-            it !== DOMAIN_MODEL_TX &&
-            it !== DOMAIN_TX &&
-            it !== DOMAIN_BLOB &&
-            it !== ('fulltext-blob' as Domain) &&
-            !options.skipDomains.includes(it) &&
-            (options.include === undefined || options.include.has(it))
-        )
-    ]
+    const accountDomains = [toAccountDomain('person'), toAccountDomain('socialId')]
+    const domains = skipWorkspaceDomains
+      ? accountDomains
+      : [
+          DOMAIN_BLOB,
+          DOMAIN_MODEL_TX,
+          DOMAIN_TX,
+          ...pipeline.context.hierarchy
+            .domains()
+            .filter(
+              (it) =>
+                it !== DOMAIN_TRANSIENT &&
+                it !== DOMAIN_MODEL &&
+                it !== DOMAIN_MODEL_TX &&
+                it !== DOMAIN_TX &&
+                it !== DOMAIN_BLOB &&
+                it !== ('fulltext-blob' as Domain) &&
+                !options.skipDomains.includes(it) &&
+                (options.include === undefined || options.include.has(it))
+            ),
+          ...accountDomains
+        ]
 
     ctx.info('domains for dump', { domains: domains.length, workspace: workspaceId, url: wsIds.url })
 
-    backupInfo.lastTxId = '' // Clear until full backup will be complete
+    if (!skipWorkspaceDomains) {
+      backupInfo.lastTxId = '' // Clear until full backup will be complete
+    }
 
     const recheckSizes: string[] = []
 
@@ -298,7 +316,7 @@ export async function backup (
     async function loadChangesFromServer (
       ctx: MeasureContext,
       domain: Domain,
-      digest: Map<Ref<Doc>, string>,
+      digest: Map<BackupDocId, string>,
       changes: Snapshot,
       same: Map<Ref<Doc>, string>
     ): Promise<{ changed: number, needRetrieveChunks: RetriavableChunks[] }> {
@@ -321,6 +339,7 @@ export async function backup (
         }
       }
     }
+
     while (true) {
       try {
         const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(ctx, domain, idx))
@@ -817,6 +836,18 @@ export async function backup (
               processChanges(d, blobFiled)
             } else {
 
+              // Remember changes of Persons and SocialIdentities
+              // to process them later in account domains
+              if (domain === DOMAIN_CONTACT && d._class === contact.class.Person) {
+                const person = d as Person
+                if (person.personUuid !== undefined) {
+                  affectedPersons.add(person.personUuid)
+                }
+              } else if (domain === DOMAIN_CHANNEL && d._class === contact.class.SocialIdentity) {
+                const sid = d as SocialIdentity
+                affectedSocialIds.add(sid._id)
+              }
+
               const data = JSON.stringify(d)
               await new Promise<void>((resolve, reject) => {
                 _pack?.entry({ name: d._id + '.json' }, data, function (err) {
@@ -860,7 +891,257 @@ export async function backup (
       }
     }
 
+    async function processAccountDomain (
+      ctx: MeasureContext,
+      domain: Domain,
+      progress: (value: number) => Promise<void>
+    ): Promise<void> {
+      const isPersonDomain = domain === toAccountDomain('person')
+      let collection: 'person' | 'socialId'
+      let key: 'uuid' | '_id'
+      let getObjKey: (obj: any) => string
+      let affectedObjects: Set<string>
+
+      if (isPersonDomain) {
+        collection = 'person'
+        key = 'uuid'
+        getObjKey = (obj: GlobalPerson) => obj.uuid
+
+        if (fullCheck) {
+          let idx: number | undefined
+          while (true) {
+            const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(ctx, DOMAIN_CONTACT, idx))
+            idx = currentChunk.idx
+            const chunkDocs = await connection.loadDocs(
+              ctx,
+              DOMAIN_CONTACT,
+              currentChunk.docs.map((it) => it.id) as Ref<Doc>[]
+            )
+            for (const doc of chunkDocs) {
+              if (doc._class === contact.class.Person) {
+                const person = doc as Person
+                if (person.personUuid !== undefined) {
+                  affectedPersons.add(person.personUuid)
+                }
+              }
+            }
+            if (currentChunk.finished) {
+              break
+            }
+          }
+        }
+        affectedObjects = affectedPersons
+      } else {
+        collection = 'socialId'
+        key = '_id'
+        getObjKey = (obj: SocialId) => obj._id
+
+        if (fullCheck) {
+          let idx: number | undefined
+          while (true) {
+            const currentChunk = await ctx.with('loadChunk', {}, () => connection.loadChunk(ctx, DOMAIN_CHANNEL, idx))
+            idx = currentChunk.idx
+            const chunkDocs = await connection.loadDocs(
+              ctx,
+              DOMAIN_CHANNEL,
+              currentChunk.docs.map((it) => it.id) as Ref<Doc>[]
+            )
+            for (const doc of chunkDocs) {
+              if (doc._class === contact.class.SocialIdentity) {
+                const sid = doc as SocialIdentity
+                affectedSocialIds.add(sid._id)
+              }
+            }
+            if (currentChunk.finished) {
+              break
+            }
+          }
+        }
+        affectedObjects = affectedSocialIds
+      }
+
+      const processedChanges: Snapshot = {
+        added: new Map(),
+        updated: new Map(),
+        removed: []
+      }
+
+      let stIndex = 0
+      let snapshotIndex = 0
+      const domainInfo: DomainData = {
+        snapshots: [],
+        storage: [],
+        added: 0,
+        updated: 0,
+        removed: 0
+      }
+
+      // Load cumulative digest from existing snapshots
+      const digest = await ctx.with('load-digest', {}, (ctx) =>
+        loadDigest(ctx, storage, backupInfo.snapshots, domain, undefined, options.msg)
+      )
+
+      let _pack: Pack | undefined
+      let _packClose = async (): Promise<void> => {}
+      let addedDocuments = (): number => 0
+      let changed = false
+
+      if (progress !== undefined) {
+        await progress(0)
+      }
+
+      // 1. We need to include global records based on persons/socialIdentities info which are missing in digest
+      // 2. We need to check updates for all records present in digest
+      const batchSize = 1000
+      const toLoad = new Set([...digest.keys(), ...affectedObjects]) as Set<string>
+      if (toLoad.size === 0) {
+        ctx.info('No record updates')
+        return
+      }
+
+      const toLoadSorted = Array.from(toLoad).sort()
+      const chunks = chunkArray(toLoadSorted, batchSize)
+      for (const chunk of chunks) {
+        const objs = await accountDb[collection].find({
+          [key]: { $in: chunk, $gte: chunk[0], $lte: chunk[chunk.length - 1] }
+        })
+        for (const obj of objs) {
+          // check if the existing package needs to be dumped
+          if (addedDocuments() > dataBlobSize && _pack !== undefined) {
+            await _packClose()
+
+            try {
+              global.gc?.()
+            } catch (err) {}
+
+            snapshot.domains[domain] = domainInfo
+            domainInfo.added += processedChanges.added.size
+            domainInfo.updated += processedChanges.updated.size
+            domainInfo.removed += processedChanges.removed.length
+
+            snapshotIndex++
+            const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${extendZero(snapshotIndex)}.snp.gz`)
+            domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
+            await writeChanges(storage, snapshotFile, processedChanges)
+
+            processedChanges.added.clear()
+            processedChanges.removed = []
+            processedChanges.updated.clear()
+            changed = false
+            domainChanges++
+
+            await storage.writeFile(
+              infoFile,
+              gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel, memLevel: 9 })
+            )
+          }
+
+          // prepare new snapshot package if needed
+          if (_pack === undefined) {
+            _pack = pack()
+            stIndex++
+            const storageFile = join(backupIndex, `${domain}-data-${snapshot.date}-${extendZero(stIndex)}.tar.gz`)
+            domainInfo.storage = [...(domainInfo.storage ?? []), storageFile]
+            const tmpFile = join(tmpRoot, basename(storageFile) + '.tmp')
+            const tempFile = createWriteStream(tmpFile)
+            // const dataStream = await storage.write(storageFile)
+
+            const sizePass = new PassThrough()
+            let sz = 0
+            sizePass._transform = (chunk, encoding, cb) => {
+              // No transformation, just pass through data
+              sz += chunk.length
+              sizePass.push(chunk)
+              cb()
+            }
+
+            sizePass.pipe(tempFile)
+
+            const storageZip = createGzip({ level: defaultLevel, memLevel: 9 })
+            addedDocuments = () => sz
+            _pack.pipe(storageZip)
+            storageZip.pipe(sizePass)
+
+            _packClose = async () => {
+              await new Promise<void>((resolve) => {
+                tempFile.on('close', () => {
+                  resolve()
+                })
+                _pack?.finalize()
+              })
+
+              // We need to upload file to storage
+              ctx.info('>>>> upload pack', { storageFile, size: sz, url: wsIds.url, workspace: workspaceId })
+              await storage.writeFile(storageFile, createReadStream(tmpFile))
+              await rm(tmpFile)
+
+              _pack = undefined
+            }
+          }
+
+          // return early if canceled
+          if (canceled()) {
+            return
+          }
+
+          // add new document file to the snapshot package if needed
+          const newHash = getObjectHash(obj)
+          const objKey = getObjKey(obj)
+          let include = false
+
+          if (!digest.has(objKey)) {
+            // new record
+            processedChanges.added.set(objKey, newHash)
+            include = true
+          } else {
+            const oldHash = digest.get(objKey)
+
+            if (oldHash !== newHash) {
+              // updated record
+              processedChanges.updated.set(objKey, newHash)
+              include = true
+            }
+          }
+
+          if (include) {
+            const data = JSON.stringify(obj)
+            await new Promise<void>((resolve, reject) => {
+              _pack?.entry({ name: getObjKey(obj) + '.json' }, data, function (err) {
+                if (err != null) reject(err)
+                resolve()
+              })
+            })
+            changed = true
+          }
+        }
+      }
+
+      if (changed && _pack !== undefined) {
+        domainInfo.added += processedChanges.added.size
+        domainInfo.updated += processedChanges.updated.size
+        domainInfo.removed += processedChanges.removed.length
+        if (domainInfo.added + domainInfo.updated + domainInfo.removed > 0) {
+          snapshot.domains[domain] = domainInfo
+
+          snapshotIndex++
+          const snapshotFile = join(backupIndex, `${domain}-${snapshot.date}-${extendZero(snapshotIndex)}.snp.gz`)
+          domainInfo.snapshots = [...(domainInfo.snapshots ?? []), snapshotFile]
+          await writeChanges(storage, snapshotFile, processedChanges)
+        }
+
+        processedChanges.added.clear()
+        processedChanges.removed = []
+        processedChanges.updated.clear()
+        changed = false
+        await _packClose()
+        domainChanges++
+        // This will allow to retry in case of a critical error.
+        await storage.writeFile(infoFile, gzipSync(JSON.stringify(backupInfo, undefined, 2), { level: defaultLevel }))
+      }
+    }
+
     let domainProgress = 0
+
     for (const domain of domains) {
       if (canceled()) {
         break
@@ -877,8 +1158,9 @@ export async function backup (
       if (mm.old > mm.current + mm.current / 10) {
         ctx.info('memory-stats', { ...mm, workspace: workspaceId })
       }
+      const doProcessDomain = isAccountDomain(domain) ? processAccountDomain : processDomain
       await ctx.with('process-domain', { domain }, async (ctx) => {
-        await processDomain(
+        await doProcessDomain(
           ctx,
           domain,
           (value) =>
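
Note: the two new account domains are synthetic — they are addressed with an 'account.' prefix instead of a workspace domain, their digest is keyed by the global person uuid / social id value, and entries are hashed with the helpers added to server/backup/src/utils.ts further down in this patch. A small self-contained illustration of that keying (the person record below is hypothetical; the helpers mirror the ones in utils.ts):

import { createHash } from 'node:crypto'

// Mirrors toAccountDomain / getObjectHash / chunkArray from server/backup/src/utils.ts
const toAccountDomain = (domain: string): string => `account.${domain}`
const getObjectHash = (obj: Record<string, any>): string =>
  createHash('sha1').update(JSON.stringify(obj)).digest('hex')
const chunkArray = <T>(array: T[], size: number): T[][] => {
  const chunks: T[][] = []
  for (let i = 0; i < array.length; i += size) chunks.push(array.slice(i, i + size))
  return chunks
}

// A hypothetical global person record: its uuid becomes the digest key,
// a sha1 of its JSON becomes the digest value.
const person = { uuid: '00000000-0000-0000-0000-000000000001', firstName: 'Ada' }
console.log(toAccountDomain('person'))            // 'account.person'
console.log([person.uuid, getObjectHash(person)]) // one digest entry
console.log(chunkArray([person.uuid], 1000))      // account DB lookups are issued in batches of 1000
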
diff --git a/server/backup/src/restore.ts b/server/backup/src/restore.ts
index dcefb07b4a5..d441dba9c7b 100644
--- a/server/backup/src/restore.ts
+++ b/server/backup/src/restore.ts
@@ -37,7 +37,7 @@ import { extract } from 'tar-stream'
 import { createGunzip, gunzipSync } from 'zlib'
 import { BackupStorage } from './storage'
 import type { BackupInfo } from './types'
-import { doTrimHash, loadDigest, migradeBlobData } from './utils'
+import { doTrimHash, isAccountDomain, loadDigest, migradeBlobData } from './utils'
 export * from './storage'
 
 const dataUploadSize = 2 * 1024 * 1024
@@ -147,7 +147,7 @@ export async function restore (
         ctx.info('no changes in domain', { domain: c })
         return
       }
-      const changeset = await loadDigest(ctx, storage, snapshots, c, opt.date)
+      const changeset = (await loadDigest(ctx, storage, snapshots, c, opt.date)) as Map<Ref<Doc>, string>
       // We need to load full changeset from server
       const serverChangeset = new Map<Ref<Doc>, string>()
 
@@ -299,7 +299,7 @@ export async function restore (
         const d = s.domains[c]
         if (d !== undefined && docsToAdd.size > 0) {
-          const sDigest = await loadDigest(ctx, storage, [s], c)
+          const sDigest = (await loadDigest(ctx, storage, [s], c)) as Map<Ref<Doc>, string>
           const requiredDocs = new Map(Array.from(sDigest.entries()).filter(([it]) => docsToAdd.has(it)))
 
           let lastSendTime = Date.now()
@@ -487,6 +487,10 @@ export async function restore (
       }
     }
 
+    async function processAccountDomain (c: Domain): Promise<void> {
+      // TODO
+    }
+
     const limiter = new RateLimiter(opt.parallel ?? 1)
 
     try {
@@ -508,7 +512,8 @@ export async function restore (
       while (retry > 0) {
         retry--
         try {
-          await processDomain(c)
+          const doProcessDomain = isAccountDomain(c) ? processAccountDomain : processDomain
+          await doProcessDomain(c)
           if (delay > 1) {
             ctx.warn('retry-success', { retry, delay, workspaceId })
           }
diff --git a/server/backup/src/service.ts b/server/backup/src/service.ts
index c48f3e28740..3f4e57f4529 100644
--- a/server/backup/src/service.ts
+++ b/server/backup/src/service.ts
@@ -28,6 +28,7 @@ import {
   type WorkspaceIds,
   type WorkspaceInfoWithStatus
 } from '@hcengineering/core'
+import { getAccountDB } from '@hcengineering/account'
 import { getAccountClient } from '@hcengineering/server-client'
 import {
   type DbConfiguration,
@@ -43,6 +44,8 @@ import { restore } from './restore'
 
 export interface BackupConfig {
   AccountsURL: string
+  AccountsDbURL: string
+  AccountsDbNS?: string
   Token: string
 
   Interval: number // Timeout in seconds
@@ -320,27 +323,33 @@ class BackupWorker {
       if (pipeline === undefined) {
         throw new Error('Pipeline is undefined, cannot proceed with backup')
       }
+      const [accountDB, closeAccountDB] = await getAccountDB(this.config.AccountsDbURL, this.config.AccountsDbNS)
       const result = await ctx.with(
         'backup',
         {},
-        (ctx) =>
-          backup(ctx, pipeline as Pipeline, wsIds, storage, {
-            skipDomains: this.skipDomains,
-            force: true,
-            timeout: this.config.Timeout * 1000,
-            connectTimeout: 5 * 60 * 1000, // 5 minutes to,
-            keepSnapshots: this.config.KeepSnapshots,
-            blobDownloadLimit: this.downloadLimit,
-            skipBlobContentTypes: ['video/', 'audio/', 'image/'],
-            fullVerify: this.fullCheck,
-            progress: (progress) => {
-              return notify?.(progress) ?? Promise.resolve()
-            },
-            msg: {
-              workspaceUrl: ws.url,
-              workspaceUuid: ws.uuid
-            }
-          }),
+        async (ctx) => {
+          try {
+            return await backup(ctx, pipeline as Pipeline, wsIds, storage, accountDB, {
+              skipDomains: this.skipDomains,
+              force: true,
+              timeout: this.config.Timeout * 1000,
+              connectTimeout: 5 * 60 * 1000, // 5 minutes to,
+              keepSnapshots: this.config.KeepSnapshots,
+              blobDownloadLimit: this.downloadLimit,
+              skipBlobContentTypes: ['video/', 'audio/', 'image/'],
+              fullVerify: this.fullCheck,
+              progress: (progress) => {
+                return notify?.(progress) ?? Promise.resolve()
+              },
+              msg: {
+                workspaceUrl: ws.url,
+                workspaceUuid: ws.uuid
+              }
+            })
+          } finally {
+            closeAccountDB()
+          }
+        },
         { workspace: ws.uuid, url: ws.url }
       )
diff --git a/server/backup/src/types.ts b/server/backup/src/types.ts
index bc1caddb934..964f1d6e0e4 100644
--- a/server/backup/src/types.ts
+++ b/server/backup/src/types.ts
@@ -29,22 +29,24 @@ export interface BlobData extends Doc {
   base64Data: string // base64 encoded data
 }
 
+export type BackupDocId = Ref<Doc> | string
+
 /**
  * @public
  */
 export interface Snapshot {
-  added: Map<Ref<Doc>, string>
-  updated: Map<Ref<Doc>, string>
-  removed: Ref<Doc>[]
+  added: Map<BackupDocId, string>
+  updated: Map<BackupDocId, string>
+  removed: BackupDocId[]
 }
 
 /**
  * @public
 */
 export interface SnapshotV6 {
-  added: Record<Ref<Doc>, string>
-  updated: Record<Ref<Doc>, string>
-  removed: Ref<Doc>[]
+  added: Record<BackupDocId, string>
+  updated: Record<BackupDocId, string>
+  removed: BackupDocId[]
 }
 
 /**
diff --git a/server/backup/src/utils.ts b/server/backup/src/utils.ts
index 57ee384c1ec..3a1f36aec21 100644
--- a/server/backup/src/utils.ts
+++ b/server/backup/src/utils.ts
@@ -21,6 +21,7 @@ import core, {
   MeasureContext,
   MeasureMetricsContext,
   Ref,
+  type Space,
   type Blob
 } from '@hcengineering/core'
 import {
@@ -33,6 +34,7 @@ import {
   statSync,
   writeFileSync
 } from 'node:fs'
+import { createHash } from 'node:crypto'
 import { rm } from 'node:fs/promises'
 import { basename, dirname } from 'node:path'
 import { PassThrough, type Writable } from 'node:stream'
@@ -41,7 +43,16 @@ import { join } from 'path'
 import { extract, Pack, pack } from 'tar-stream'
 import { createGunzip, gunzipSync, gzipSync } from 'zlib'
 import { BackupStorage } from './storage'
-import type { BackupInfo, BackupResult, BackupSnapshot, BlobData, DomainData, Snapshot, SnapshotV6 } from './types'
+import type {
+  BackupDocId,
+  BackupInfo,
+  BackupResult,
+  BackupSnapshot,
+  BlobData,
+  DomainData,
+  Snapshot,
+  SnapshotV6
+} from './types'
 export * from './storage'
 
 const dataBlobSize = 250 * 1024 * 1024
@@ -926,8 +937,8 @@ export async function loadDigest (
   domain: Domain,
   date?: number,
   msg?: Record<string, any>
-): Promise<Map<Ref<Doc>, string>> {
-  const result = new Map<Ref<Doc>, string>()
+): Promise<Map<BackupDocId, string>> {
+  const result = new Map<BackupDocId, string>()
 
   for (const s of snapshots) {
     const d = s.domains[domain]
@@ -944,7 +955,7 @@ export async function loadDigest (
           result.set(k as Ref<Doc>, v)
         }
         for (const d of dChanges.removed) {
-          result.delete(d)
+          result.delete(d as Ref<Doc<Space>>)
         }
       } catch (err: any) {
         ctx.warn('failed to load digest', { snapshot: d.snapshot, ...(msg ?? {}) })
@@ -1233,7 +1244,7 @@ export async function verifyDocsFromSnapshot (
   d: DomainData,
   s: BackupSnapshot,
   storage: BackupStorage,
-  digest: Map<Ref<Doc>, string>,
+  digest: Map<BackupDocId, string>,
   verify: (docs: Doc[]) => Promise<void>,
   chunkSize: number
 ): Promise<{ modified: boolean, modifiedFiles: string[] }> {
@@ -1410,3 +1421,28 @@ export async function rebuildSizeInfo (
 
   await storage.writeFile(sizeFile, gzipSync(JSON.stringify(sizeInfo, undefined, 2), { level: defaultLevel }))
 }
+
+export function chunkArray<T> (array: T[], chunkSize: number): T[][] {
+  const chunks: T[][] = []
+  for (let i = 0; i < array.length; i += chunkSize) {
+    chunks.push(array.slice(i, i + chunkSize))
+  }
+  return chunks
+}
+
+// Using sha1 since we don't need a cryptographically secure hash, just a fast one
+export function getObjectHash (obj: Record<string, any>): string {
+  const h = createHash('sha1')
+  h.update(JSON.stringify(obj))
+  return h.digest('hex')
+}
+
+const accountPrefix = 'account.'
+
+export function toAccountDomain (domain: string): Domain {
+  return `${accountPrefix}${domain}` as Domain
+}
+
+export function isAccountDomain (domain: Domain): boolean {
+  return domain.startsWith(accountPrefix)
+}
diff --git a/server/workspace-service/src/index.ts b/server/workspace-service/src/index.ts
index 77d9070ef1b..38f70915f75 100644
--- a/server/workspace-service/src/index.ts
+++ b/server/workspace-service/src/index.ts
@@ -95,6 +95,12 @@ export function serveWorkspaceAccount (
   }
   setMetadata(serverClientPlugin.metadata.Endpoint, accountUri)
 
+  const accountDbUrl = process.env.ACCOUNTS_DB_URL
+  if (accountDbUrl === undefined) {
+    console.log('Please provide account db url')
+    process.exit(1)
+  }
+
   const serverSecret = process.env.SERVER_SECRET
   if (serverSecret === undefined) {
     console.log('Please provide server secret')
@@ -135,7 +141,8 @@ export function serveWorkspaceAccount (
     wsOperation,
     brandings,
     fulltextUrl,
-    accountUri
+    accountUri,
+    accountDbUrl
   )
 
   void worker
diff --git a/server/workspace-service/src/service.ts b/server/workspace-service/src/service.ts
index 7eda634038c..00a9e669deb 100644
--- a/server/workspace-service/src/service.ts
+++ b/server/workspace-service/src/service.ts
@@ -119,7 +119,8 @@ export class WorkspaceWorker {
     readonly operation: WorkspaceOperation,
     readonly brandings: BrandingMap,
     readonly fulltextUrl: string | undefined,
-    readonly accountsUrl: string
+    readonly accountsUrl: string,
+    readonly accountsDbUrl: string
   ) {}
 
   hasAvailableThread (): boolean {
@@ -605,12 +606,13 @@ export class WorkspaceWorker {
           workspace,
           opt.backup.backupStorage,
           {
+            AccountsURL: this.accountsUrl,
+            AccountsDbURL: this.accountsDbUrl,
             Token: token,
             BucketName: opt.backup.bucketName,
             CoolDown: 0,
             Timeout: 0,
             SkipWorkspaces: '',
-            AccountsURL: '',
             Interval: 0,
             Parallel: 1,
             KeepSnapshots: 7 * 12
diff --git a/tests/docker-compose.override.yaml b/tests/docker-compose.override.yaml
index 8f2fa5f5396..428486253b4 100644
--- a/tests/docker-compose.override.yaml
+++ b/tests/docker-compose.override.yaml
@@ -12,6 +12,7 @@ services:
     environment:
       - DB_URL=${DB_PG_URL}
       - STORAGE_CONFIG=${DATALAKE_STORAGE_CONFIG}
+      - ACCOUNTS_DB_URL=${DB_PG_URL}
   fulltext:
     environment:
       - DB_URL=${DB_PG_URL}
diff --git a/tests/docker-compose.yaml b/tests/docker-compose.yaml
index cb409daf1e1..2c47cf92a34 100644
--- a/tests/docker-compose.yaml
+++ b/tests/docker-compose.yaml
@@ -151,6 +151,7 @@ services:
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - REGION=
       - ACCOUNTS_URL=http://account:3003
+      - ACCOUNTS_DB_URL=${MONGO_URL}
       - BRANDING_PATH=/var/cfg/branding.json
       - STATS_URL=http://stats:4901
     restart: unless-stopped
diff --git a/ws-tests/docker-compose.yaml b/ws-tests/docker-compose.yaml
index 9291ec95755..00fd51e2138 100644
--- a/ws-tests/docker-compose.yaml
+++ b/ws-tests/docker-compose.yaml
@@ -203,6 +203,7 @@ services:
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - REGION=
       - ACCOUNTS_URL=http://huly.local:3003
+      - ACCOUNTS_DB_URL=${DB_EU_URL}
       - BRANDING_PATH=/var/cfg/branding.json
       - BACKUP_STORAGE=${BACKUP_STORAGE_CONFIG}
       - BACKUP_BUCKET=${BACKUP_BUCKET_NAME}
@@ -227,6 +228,7 @@ services:
       - STATS_URL=http://huly.local:4901
       - STORAGE_CONFIG=${STORAGE_CONFIG}
       - ACCOUNTS_URL=http://huly.local:3003
+      - ACCOUNTS_DB_URL=${DB_EU_URL}
       - BRANDING_PATH=/var/cfg/branding.json
       - BACKUP_STORAGE=${BACKUP_STORAGE_CONFIG}
       - BACKUP_BUCKET=${BACKUP_BUCKET_NAME}
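
Note: after this change every caller of backup(...) must supply an AccountDB handle as the fifth argument and may opt into the new fullVerify pass. A condensed sketch of the calling convention, modelled on server/backup/src/service.ts above; the option values, the loose typing of ctx/pipeline/wsIds/storage, and the re-export of backup from the @hcengineering/server-backup package are assumptions for illustration only:

import { getAccountDB } from '@hcengineering/account'
import { backup } from '@hcengineering/server-backup'

async function backupWithAccounts (ctx: any, pipeline: any, wsIds: any, storage: any): Promise<void> {
  // Open the global account DB (namespace argument omitted) and always close it afterwards.
  const [accountDb, closeAccountDb] = await getAccountDB(process.env.ACCOUNTS_DB_URL ?? '')
  try {
    await backup(ctx, pipeline, wsIds, storage, accountDb, {
      skipDomains: [],
      force: true,
      timeout: 0,
      connectTimeout: 5 * 60 * 1000,
      keepSnapshots: 7 * 12,
      fullVerify: false
    })
  } finally {
    closeAccountDb()
  }
}
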