diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts index 128c54841..12327fbaa 100644 --- a/packages/service-core/src/entry/commands/compact-action.ts +++ b/packages/service-core/src/entry/commands/compact-action.ts @@ -21,12 +21,19 @@ const HEAP_LIMIT = v8.getHeapStatistics().heap_size_limit; const COMPACT_MEMORY_LIMIT_MB = Math.min(HEAP_LIMIT / 1024 / 1024 - 128, 1024); export function registerCompactAction(program: Command) { - const compactCommand = program.command(COMMAND_NAME); + const compactCommand = program + .command(COMMAND_NAME) + .option(`-b, --buckets [buckets]`, 'Bucket or bucket definition name (optional, comma-separate multiple names)'); wrapConfigCommand(compactCommand); return compactCommand.description('Compact storage').action(async (options) => { - logger.info('Compacting storage...'); + const buckets = options.buckets?.split(','); + if (buckets != null) { + logger.info('Compacting storage for all buckets...'); + } else { + logger.info(`Compacting storage for ${buckets.join(', ')}...`); + } const runnerConfig = extractRunnerOptions(options); const configuration = await utils.loadConfig(runnerConfig); logger.info('Successfully loaded configuration...'); @@ -46,7 +53,7 @@ export function registerCompactAction(program: Command) { } using p = bucketStorage.getInstance(active); logger.info('Performing compaction...'); - await p.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB }); + await p.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB, compactBuckets: buckets }); logger.info('Successfully compacted storage.'); } catch (e) { logger.error(`Failed to compact: ${e.toString()}`); diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 5c6b98fe3..96f16ad56 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -472,6 +472,8 @@ export interface CompactOptions { * If specified, compact only the specific buckets. * * If not specified, compacts all buckets. + * + * These can be individual bucket names, or bucket definition names. */ compactBuckets?: string[]; } diff --git a/packages/service-core/src/storage/mongo/MongoCompactor.ts b/packages/service-core/src/storage/mongo/MongoCompactor.ts index 07c946bae..102e1daf0 100644 --- a/packages/service-core/src/storage/mongo/MongoCompactor.ts +++ b/packages/service-core/src/storage/mongo/MongoCompactor.ts @@ -94,17 +94,33 @@ export class MongoCompactor { let currentState: CurrentBucketState | null = null; + let bucketLower: string | MinKey; + let bucketUpper: string | MaxKey; + + if (bucket == null) { + bucketLower = new MinKey(); + bucketUpper = new MaxKey(); + } else if (bucket.includes('[')) { + // Exact bucket name + bucketLower = bucket; + bucketUpper = bucket; + } else { + // Bucket definition name + bucketLower = `${bucket}[`; + bucketUpper = `${bucket}[\uFFFF`; + } + // Constant lower bound const lowerBound: BucketDataKey = { g: this.group_id, - b: bucket ?? (new MinKey() as any), + b: bucketLower as string, o: new MinKey() as any }; // Upper bound is adjusted for each batch let upperBound: BucketDataKey = { g: this.group_id, - b: bucket ?? (new MaxKey() as any), + b: bucketUpper as string, o: new MaxKey() as any };