Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-pianos-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix "operation exceeded time limit" error
5 changes: 5 additions & 0 deletions .changeset/green-books-shout.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix storageStats error in metrics endpoint when collections don't exist.
5 changes: 0 additions & 5 deletions .changeset/short-bats-sin.md

This file was deleted.

6 changes: 0 additions & 6 deletions .changeset/stupid-maps-buy.md

This file was deleted.

5 changes: 0 additions & 5 deletions .changeset/wet-gorillas-remember.md

This file was deleted.

5 changes: 0 additions & 5 deletions .changeset/young-rockets-behave.md

This file was deleted.

10 changes: 10 additions & 0 deletions packages/service-core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# @powersync/service-core

## 0.8.5

### Patch Changes

- 1fd50a5: Fix checksum cache edge case with compacting
- aa4eb0a: Fix "JavaScript heap out of memory" on startup (slot health check)
- Updated dependencies [9e78ff1]
- Updated dependencies [0e16938]
- @powersync/[email protected]

## 0.8.4

### Patch Changes
Expand Down
2 changes: 1 addition & 1 deletion packages/service-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"publishConfig": {
"access": "public"
},
"version": "0.8.4",
"version": "0.8.5",
"main": "dist/index.js",
"license": "FSL-1.1-Apache-2.0",
"type": "module",
Expand Down
7 changes: 7 additions & 0 deletions packages/service-core/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,13 @@ export const MONGO_SOCKET_TIMEOUT_MS = 60_000;
*/
export const MONGO_OPERATION_TIMEOUT_MS = 30_000;

/**
* Same as above, but specifically for clear operations.
*
* Operations using this timeout are retried when the timeout is reached.
*/
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000;

export function createMongoClient(config: configFile.PowerSyncConfig['storage']) {
const normalized = normalizeMongoConfig(config);
return new mongo.MongoClient(normalized.uri, {
Expand Down
2 changes: 0 additions & 2 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -265,8 +265,6 @@ export interface SyncRulesBucketStorage extends DisposableObserverClient<SyncRul
*/
clear(): Promise<void>;

setSnapshotDone(lsn: string): Promise<void>;

autoActivate(): Promise<void>;

/**
Expand Down
35 changes: 26 additions & 9 deletions packages/service-core/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -331,39 +331,56 @@ export class MongoBucketStorage
}

async getStorageMetrics(): Promise<StorageMetrics> {
const ignoreNotExiting = (e: unknown) => {
if (e instanceof mongo.MongoServerError && e.codeName == 'NamespaceNotFound') {
// Collection doesn't exist - return 0
return [{ storageStats: { size: 0 } }];
} else {
return Promise.reject(e);
}
};

const active_sync_rules = await this.getActiveSyncRules({ defaultSchema: 'public' });
if (active_sync_rules == null) {
return {
operations_size_bytes: 0,
parameters_size_bytes: 0,
replication_size_bytes: 0
};
}
const operations_aggregate = await this.db.bucket_data

.aggregate([
{
$collStats: {
storageStats: {},
count: {}
storageStats: {}
}
}
])
.toArray();
.toArray()
.catch(ignoreNotExiting);

const parameters_aggregate = await this.db.bucket_parameters
.aggregate([
{
$collStats: {
storageStats: {},
count: {}
storageStats: {}
}
}
])
.toArray();
.toArray()
.catch(ignoreNotExiting);

const replication_aggregate = await this.db.current_data
.aggregate([
{
$collStats: {
storageStats: {},
count: {}
storageStats: {}
}
}
])
.toArray();
.toArray()
.catch(ignoreNotExiting);

return {
operations_size_bytes: operations_aggregate[0].storageStats.size,
Expand Down
47 changes: 26 additions & 21 deletions packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './m
import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, mapOpEntry, readSingleBatch, serializeLookup } from './util.js';
import { logger } from '@powersync/lib-services-framework';
import * as timers from 'timers/promises';

export class MongoSyncBucketStorage
extends DisposableObserver<SyncRulesBucketStorageListener>
Expand Down Expand Up @@ -459,10 +461,28 @@ export class MongoSyncBucketStorage
}

async clear(): Promise<void> {
while (true) {
try {
await this.clearIteration();
return;
} catch (e: unknown) {
if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') {
logger.info(
`Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.`
);
await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
continue;
} else {
throw e;
}
}
}
}

private async clearIteration(): Promise<void> {
// Individual operations here may time out with the maxTimeMS option.
// It is expected to still make progress, and continue on the next try.

// TODO: Transactional?
await this.db.sync_rules.updateOne(
{
_id: this.group_id
Expand All @@ -476,48 +496,33 @@ export class MongoSyncBucketStorage
no_checkpoint_before: null
}
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
await this.db.bucket_data.deleteMany(
{
_id: idPrefixFilter<BucketDataKey>({ g: this.group_id }, ['b', 'o'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
await this.db.bucket_parameters.deleteMany(
{
key: idPrefixFilter<SourceKey>({ g: this.group_id }, ['t', 'k'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.current_data.deleteMany(
{
_id: idPrefixFilter<SourceKey>({ g: this.group_id }, ['t', 'k'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.source_tables.deleteMany(
{
group_id: this.group_id
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
);
}

async setSnapshotDone(lsn: string): Promise<void> {
await this.db.sync_rules.updateOne(
{
_id: this.group_id
},
{
$set: {
snapshot_done: true,
persisted_lsn: lsn,
last_checkpoint_ts: new Date()
}
}
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
}

Expand Down
12 changes: 12 additions & 0 deletions packages/service-core/src/storage/mongo/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ export class PowerSyncMongo {
this.locks = this.db.collection('locks');
}

/**
* Clear all collections.
*/
async clear() {
await this.current_data.deleteMany({});
await this.bucket_data.deleteMany({});
Expand All @@ -73,4 +76,13 @@ export class PowerSyncMongo {
await this.instance.deleteOne({});
await this.locks.deleteMany({});
}

/**
* Drop the entire database.
*
* Primarily for tests.
*/
async drop() {
await this.db.dropDatabase();
}
}
22 changes: 22 additions & 0 deletions packages/service-core/test/src/data_storage.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1438,4 +1438,26 @@ bucket_definitions:
expect(errorCaught).true;
expect(isDisposed).true;
});

test('empty storage metrics', async () => {
const f = await factory({ dropAll: true });

const metrics = await f.getStorageMetrics();
expect(metrics).toEqual({
operations_size_bytes: 0,
parameters_size_bytes: 0,
replication_size_bytes: 0
});

const r = await f.configureSyncRules('bucket_definitions: {}');
const storage = f.getInstance(r.persisted_sync_rules!);
await storage.autoActivate();

const metrics2 = await f.getStorageMetrics();
expect(metrics2).toEqual({
operations_size_bytes: 0,
parameters_size_bytes: 0,
replication_size_bytes: 0
});
});
}
18 changes: 2 additions & 16 deletions packages/service-core/test/src/sync.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,7 @@ import { JSONBig } from '@powersync/service-jsonbig';
import { RequestParameters } from '@powersync/service-sync-rules';
import * as timers from 'timers/promises';
import { describe, expect, test } from 'vitest';
import {
BATCH_OPTIONS,
makeTestTable,
MONGO_STORAGE_FACTORY,
PARSE_OPTIONS,
StorageFactory,
ZERO_LSN
} from './util.js';
import { ParseSyncRulesOptions, StartBatchOptions } from '@/storage/BucketStorage.js';
import { BATCH_OPTIONS, makeTestTable, MONGO_STORAGE_FACTORY, PARSE_OPTIONS, StorageFactory } from './util.js';

describe('sync - mongodb', function () {
defineTests(MONGO_STORAGE_FACTORY);
Expand All @@ -38,8 +30,7 @@ function defineTests(factory: StorageFactory) {
content: BASIC_SYNC_RULES
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
const storage = f.getInstance(syncRules);
await storage.autoActivate();

const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => {
Expand Down Expand Up @@ -91,7 +82,6 @@ function defineTests(factory: StorageFactory) {
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
await storage.autoActivate();

const result = await storage.startBatch(BATCH_OPTIONS, async (batch) => {
Expand Down Expand Up @@ -136,7 +126,6 @@ function defineTests(factory: StorageFactory) {
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
await storage.autoActivate();

const stream = streamResponse({
Expand Down Expand Up @@ -164,7 +153,6 @@ function defineTests(factory: StorageFactory) {
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
await storage.autoActivate();

const stream = streamResponse({
Expand Down Expand Up @@ -226,7 +214,6 @@ function defineTests(factory: StorageFactory) {
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
await storage.autoActivate();

const exp = Date.now() / 1000 + 0.1;
Expand Down Expand Up @@ -265,7 +252,6 @@ function defineTests(factory: StorageFactory) {
});

const storage = await f.getInstance(syncRules);
await storage.setSnapshotDone(ZERO_LSN);
await storage.autoActivate();

await storage.startBatch(BATCH_OPTIONS, async (batch) => {
Expand Down
17 changes: 14 additions & 3 deletions packages/service-core/test/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,22 @@ await Metrics.initialise({
});
Metrics.getInstance().resetCounters();

export type StorageFactory = () => Promise<BucketStorageFactory>;
export interface StorageOptions {
/**
* By default, collections are only cleared.
* Setting this to true will drop the collections completely.
*/
dropAll?: boolean;
}
export type StorageFactory = (options?: StorageOptions) => Promise<BucketStorageFactory>;

export const MONGO_STORAGE_FACTORY: StorageFactory = async () => {
export const MONGO_STORAGE_FACTORY: StorageFactory = async (options?: StorageOptions) => {
const db = await connectMongo();
await db.clear();
if (options?.dropAll) {
await db.drop();
} else {
await db.clear();
}
return new MongoBucketStorage(db, { slot_name_prefix: 'test_' });
};

Expand Down
Loading
Loading