Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/funny-pianos-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-core': patch
---

Fix "operation exceeded time limit" error
7 changes: 7 additions & 0 deletions packages/service-core/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ export const MONGO_SOCKET_TIMEOUT_MS = 60_000;
*/
export const MONGO_OPERATION_TIMEOUT_MS = 30_000;

/**
* Same as above, but specifically for clear operations.
*
* These are retried when reaching the timeout.
*/
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000;

export function createMongoClient(config: configFile.PowerSyncConfig['storage']) {
return new mongo.MongoClient(config.uri, {
auth: {
Expand Down
32 changes: 26 additions & 6 deletions packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './m
import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, mapOpEntry, readSingleBatch, serializeLookup } from './util.js';
import { logger } from '@powersync/lib-services-framework';
import * as timers from 'timers/promises';

export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
private readonly db: PowerSyncMongo;
Expand Down Expand Up @@ -438,10 +440,28 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
}

async clear(): Promise<void> {
while (true) {
try {
await this.clearIteration();
return;
} catch (e: unknown) {
if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') {
logger.info(
`Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.`
);
await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
continue;
} else {
throw e;
}
}
}
}

private async clearIteration(): Promise<void> {
// Individual operations here may time out with the maxTimeMS option.
// It is expected to still make progress, and continue on the next try.

// TODO: Transactional?
await this.db.sync_rules.updateOne(
{
_id: this.group_id
Expand All @@ -455,33 +475,33 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage {
no_checkpoint_before: null
}
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
await this.db.bucket_data.deleteMany(
{
_id: idPrefixFilter<BucketDataKey>({ g: this.group_id }, ['b', 'o'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
await this.db.bucket_parameters.deleteMany(
{
key: idPrefixFilter<SourceKey>({ g: this.group_id }, ['t', 'k'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.current_data.deleteMany(
{
_id: idPrefixFilter<SourceKey>({ g: this.group_id }, ['t', 'k'])
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);

await this.db.source_tables.deleteMany(
{
group_id: this.group_id
},
{ maxTimeMS: db.mongo.MONGO_OPERATION_TIMEOUT_MS }
{ maxTimeMS: db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS }
);
}

Expand Down
Loading