Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions libs/lib-mongodb/src/db/mongo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ export const MONGO_OPERATION_TIMEOUT_MS = 30_000;
*/
export const MONGO_CLEAR_OPERATION_TIMEOUT_MS = 5_000;

export function createMongoClient(config: BaseMongoConfigDecoded) {
export interface MongoConnectionOptions {
maxPoolSize: number;
}

export function createMongoClient(config: BaseMongoConfigDecoded, options?: MongoConnectionOptions) {
const normalized = normalizeMongoConfig(config);
return new mongo.MongoClient(normalized.uri, {
auth: {
Expand All @@ -48,7 +52,7 @@ export function createMongoClient(config: BaseMongoConfigDecoded) {
// Avoid too many connections:
// 1. It can overwhelm the source database.
// 2. Processing too many queries in parallel can cause the process to run out of memory.
maxPoolSize: 8,
maxPoolSize: options?.maxPoolSize ?? 8,

maxConnecting: 3,
maxIdleTimeMS: 60_000
Expand Down
4 changes: 3 additions & 1 deletion libs/lib-postgres/src/db/connection/DatabaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ export class DatabaseClient extends AbstractPostgresConnection<DatabaseClientLis
constructor(protected options: DatabaseClientOptions) {
super();
this.closed = false;
this.pool = pgwire.connectPgWirePool(options.config);
this.pool = pgwire.connectPgWirePool(options.config, {
maxSize: options.config.max_pool_size
});
this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (v, index) => {
// Only listen to notifications on a single (the first) connection
const notificationChannels = index == 0 ? options.notificationChannels : [];
Expand Down
12 changes: 9 additions & 3 deletions libs/lib-postgres/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import * as service_types from '@powersync/service-types';
import * as t from 'ts-codec';
import * as urijs from 'uri-js';

export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig {}
export interface NormalizedBasePostgresConnectionConfig extends jpgwire.NormalizedConnectionConfig {
max_pool_size: number;
}

export const POSTGRES_CONNECTION_TYPE = 'postgresql' as const;

Expand Down Expand Up @@ -42,7 +44,9 @@ export const BasePostgresConnectionConfig = t.object({
/**
* Prefix for the slot name. Defaults to "powersync_"
*/
slot_name_prefix: t.string.optional()
slot_name_prefix: t.string.optional(),

max_pool_size: t.number.optional()
});

export type BasePostgresConnectionConfig = t.Encoded<typeof BasePostgresConnectionConfig>;
Expand Down Expand Up @@ -125,7 +129,9 @@ export function normalizeConnectionConfig(options: BasePostgresConnectionConfigD
lookup,

client_certificate: options.client_certificate ?? undefined,
client_private_key: options.client_private_key ?? undefined
client_private_key: options.client_private_key ?? undefined,

max_pool_size: options.max_pool_size ?? 8
} satisfies NormalizedBasePostgresConnectionConfig;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ export class MongoStorageProvider implements storage.BucketStorageProvider {
}

const decodedConfig = MongoStorageConfig.decode(storage as any);
const client = lib_mongo.db.createMongoClient(decodedConfig);
const client = lib_mongo.db.createMongoClient(decodedConfig, {
maxPoolSize: resolvedConfig.storage.max_pool_size ?? 8
});

const database = new PowerSyncMongo(client, { database: resolvedConfig.storage.database });
const factory = new MongoBucketStorage(database, {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,6 @@ export class PowerSyncMongo {
}
}

export function createPowerSyncMongo(config: MongoStorageConfig) {
return new PowerSyncMongo(lib_mongo.createMongoClient(config), { database: config.database });
export function createPowerSyncMongo(config: MongoStorageConfig, options?: lib_mongo.MongoConnectionOptions) {
return new PowerSyncMongo(lib_mongo.createMongoClient(config, options), { database: config.database });
}
1 change: 1 addition & 0 deletions modules/module-postgres-storage/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ export type RequiredOperationBatchLimits = Required<OperationBatchLimits>;

export type NormalizedPostgresStorageConfig = pg_wire.NormalizedConnectionConfig & {
batch_limits: RequiredOperationBatchLimits;
max_pool_size: number;
};

export const normalizePostgresStorageConfig = (
Expand Down
35 changes: 25 additions & 10 deletions packages/service-core-tests/src/tests/register-sync-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ export const SYNC_SNAPSHOT_PATH = path.resolve(__dirname, '../__snapshots/sync.t
*/
export function registerSyncTests(factory: storage.TestStorageFactory) {
const tracker = new sync.RequestTracker();
const syncContext = new sync.SyncContext({
maxBuckets: 10,
maxParameterQueryResults: 10,
maxDataFetchConcurrency: 2
});

test('sync global data', async () => {
await using f = await factory();
Expand Down Expand Up @@ -67,6 +72,7 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
});

const stream = sync.streamResponse({
syncContext,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
Expand Down Expand Up @@ -128,7 +134,8 @@ bucket_definitions:
});

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -191,7 +198,8 @@ bucket_definitions:
});

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -276,7 +284,8 @@ bucket_definitions:
});

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand All @@ -302,7 +311,7 @@ bucket_definitions:
receivedCompletions++;
if (receivedCompletions == 1) {
// Trigger an empty bucket update.
await bucketStorage.createManagedWriteCheckpoint({user_id: '', heads: {'1': '1/0'}});
await bucketStorage.createManagedWriteCheckpoint({ user_id: '', heads: { '1': '1/0' } });
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
await batch.commit('1/0');
});
Expand Down Expand Up @@ -342,7 +351,8 @@ bucket_definitions:
});

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -371,7 +381,8 @@ bucket_definitions:
await bucketStorage.autoActivate();

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand All @@ -398,7 +409,8 @@ bucket_definitions:
await bucketStorage.autoActivate();

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -461,7 +473,8 @@ bucket_definitions:
const exp = Date.now() / 1000 + 0.1;

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -521,7 +534,8 @@ bucket_definitions:
});

const stream = sync.streamResponse({
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down Expand Up @@ -644,7 +658,8 @@ bucket_definitions:
});

const params: sync.SyncStreamParameters = {
bucketStorage: bucketStorage,
syncContext,
bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
Expand Down
1 change: 1 addition & 0 deletions packages/service-core/src/routes/RouterEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { SYNC_RULES_ROUTES } from './endpoints/sync-rules.js';
import { SYNC_STREAM_ROUTES } from './endpoints/sync-stream.js';
import { SocketRouteGenerator } from './router-socket.js';
import { RouteDefinition } from './router.js';
import { SyncContext } from '../sync/SyncContext.js';

export type RouterSetupResponse = {
onShutdown: () => Promise<void>;
Expand Down
3 changes: 2 additions & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
handler: async ({ context, params, responder, observer, initialN, signal: upstreamSignal }) => {
const { service_context } = context;
const { routerEngine } = service_context;
const { routerEngine, syncContext } = service_context;

// Create our own controller that we can abort directly
const controller = new AbortController();
Expand Down Expand Up @@ -73,6 +73,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
const tracker = new sync.RequestTracker();
try {
for await (const data of sync.streamResponse({
syncContext: syncContext,
bucketStorage: bucketStorage,
syncRules: syncRules,
params: {
Expand Down
3 changes: 2 additions & 1 deletion packages/service-core/src/routes/endpoints/sync-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export const syncStreamed = routeDefinition({
validator: schema.createTsCodecValidator(util.StreamingSyncRequest, { allowAdditional: true }),
handler: async (payload) => {
const { service_context } = payload.context;
const { routerEngine, storageEngine } = service_context;
const { routerEngine, storageEngine, syncContext } = service_context;
const headers = payload.request.headers;
const userAgent = headers['x-user-agent'] ?? headers['user-agent'];
const clientId = payload.params.client_id;
Expand Down Expand Up @@ -56,6 +56,7 @@ export const syncStreamed = routeDefinition({
sync.transformToBytesTracked(
sync.ndjson(
sync.streamResponse({
syncContext: syncContext,
bucketStorage,
syncRules: syncRules,
params,
Expand Down
36 changes: 31 additions & 5 deletions packages/service-core/src/sync/BucketChecksumState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import * as util from '../util/util-index.js';
import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework';
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
import { BucketSyncState } from './sync.js';
import { SyncContext } from './SyncContext.js';

export interface BucketChecksumStateOptions {
syncContext: SyncContext;
bucketStorage: BucketChecksumStateStorage;
syncRules: SqlSyncRules;
syncParams: RequestParameters;
Expand All @@ -20,6 +22,7 @@ export interface BucketChecksumStateOptions {
* Handles incrementally re-computing checkpoints.
*/
export class BucketChecksumState {
private readonly context: SyncContext;
private readonly bucketStorage: BucketChecksumStateStorage;

/**
Expand All @@ -43,8 +46,14 @@ export class BucketChecksumState {
private pendingBucketDownloads = new Set<string>();

constructor(options: BucketChecksumStateOptions) {
this.context = options.syncContext;
this.bucketStorage = options.bucketStorage;
this.parameterState = new BucketParameterState(options.bucketStorage, options.syncRules, options.syncParams);
this.parameterState = new BucketParameterState(
options.syncContext,
options.bucketStorage,
options.syncRules,
options.syncParams
);
this.bucketDataPositions = new Map();

for (let { name, after: start } of options.initialBucketPositions ?? []) {
Expand Down Expand Up @@ -73,6 +82,12 @@ export class BucketChecksumState {
});
}
this.bucketDataPositions = dataBucketsNew;
if (dataBucketsNew.size > this.context.maxBuckets) {
throw new ServiceError(
ErrorCode.PSYNC_S2305,
`Too many buckets: ${dataBucketsNew.size} (limit of ${this.context.maxBuckets})`
);
}

let checksumMap: util.ChecksumMap;
if (updatedBuckets != null) {
Expand Down Expand Up @@ -247,13 +262,20 @@ export interface CheckpointUpdate {
}

export class BucketParameterState {
private readonly context: SyncContext;
public readonly bucketStorage: BucketChecksumStateStorage;
public readonly syncRules: SqlSyncRules;
public readonly syncParams: RequestParameters;
private readonly querier: BucketParameterQuerier;
private readonly staticBuckets: Map<string, BucketDescription>;

constructor(bucketStorage: BucketChecksumStateStorage, syncRules: SqlSyncRules, syncParams: RequestParameters) {
constructor(
context: SyncContext,
bucketStorage: BucketChecksumStateStorage,
syncRules: SqlSyncRules,
syncParams: RequestParameters
) {
this.context = context;
this.bucketStorage = bucketStorage;
this.syncRules = syncRules;
this.syncParams = syncParams;
Expand All @@ -275,9 +297,13 @@ export class BucketParameterState {
return null;
}

if (update.buckets.length > 1000) {
// TODO: Limit number of buckets even before we get to this point
const error = new ServiceError(ErrorCode.PSYNC_S2305, `Too many buckets: ${update.buckets.length}`);
if (update.buckets.length > this.context.maxParameterQueryResults) {
// TODO: Limit number of results even before we get to this point
// This limit applies _before_ we get the unique set
const error = new ServiceError(
ErrorCode.PSYNC_S2305,
`Too many parameter query results: ${update.buckets.length} (limit of ${this.context.maxParameterQueryResults})`
);
logger.error(error.message, {
checkpoint: checkpoint,
user_id: this.syncParams.user_id,
Expand Down
36 changes: 36 additions & 0 deletions packages/service-core/src/sync/SyncContext.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { Semaphore, SemaphoreInterface, withTimeout } from 'async-mutex';

export interface SyncContextOptions {
maxBuckets: number;
maxParameterQueryResults: number;
maxDataFetchConcurrency: number;
}

/**
* Maximum duration to wait for the mutex to become available.
*
* This gives an explicit error if there are mutex issues, rather than just hanging.
*/
const MUTEX_ACQUIRE_TIMEOUT = 30_000;

/**
* Represents the context in which sync happens.
*
* This is global to all sync requests, not per request.
*/
export class SyncContext {
readonly maxBuckets: number;
readonly maxParameterQueryResults: number;

readonly syncSemaphore: SemaphoreInterface;

constructor(options: SyncContextOptions) {
this.maxBuckets = options.maxBuckets;
this.maxParameterQueryResults = options.maxParameterQueryResults;
this.syncSemaphore = withTimeout(
new Semaphore(options.maxDataFetchConcurrency),
MUTEX_ACQUIRE_TIMEOUT,
new Error(`Timeout while waiting for data`)
);
}
}
1 change: 1 addition & 0 deletions packages/service-core/src/sync/sync-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './safeRace.js';
export * from './sync.js';
export * from './util.js';
export * from './BucketChecksumState.js';
export * from './SyncContext.js';
Loading
Loading