Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/bold-bears-like.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@aws-amplify/datastore': minor
---

Add per-model `syncPageSize` and `maxRecordsToSync` configuration via optional third parameter to `syncExpression()`. This allows configuring sync page size on a per-model basis instead of only globally, so models with large records can use a smaller page size without penalizing sync performance for all other models.
106 changes: 106 additions & 0 deletions packages/datastore/__tests__/connectivityHandling.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,112 @@ describe('DataStore sync engine', () => {
}
`);
});

describe('per-model sync configuration', () => {
/**
* These tests verify that per-model syncPageSize and maxRecordsToSync
* overrides (passed as the 3rd argument to syncExpression) are correctly
* applied to sync queries, while models without overrides fall back to
* the global defaults.
*
* graphqlService.requests accumulates across the entire test lifecycle
* (initial sync from beforeEach, data generation, resync). We snapshot
* the request count before resyncWith() and only inspect requests made
* after that point.
*/

const getSyncRequestsAfter = (
tableName: string,
afterIndex: number,
) =>
graphqlService.requests.slice(afterIndex).filter(
r =>
r.operation === 'query' &&
r.type === 'sync' &&
r.tableName === tableName,
);

test('per-model syncPageSize overrides global default', async () => {
(DataStore as any).amplifyConfig.syncPageSize = 1000;
(DataStore as any).amplifyConfig.maxRecordsToSync = 10000;

const requestsBefore = graphqlService.requests.length;
await resyncWith([
syncExpression(Post, () => Predicates.ALL, {
syncPageSize: 50,
}),
]);

const postSyncRequests = getSyncRequestsAfter('Post', requestsBefore);
expect(postSyncRequests.length).toBeGreaterThan(0);
expect(postSyncRequests[0].variables.limit).toBe(50);
});

test('model without per-model config uses global syncPageSize', async () => {
(DataStore as any).amplifyConfig.syncPageSize = 500;
(DataStore as any).amplifyConfig.maxRecordsToSync = 10000;

const requestsBefore = graphqlService.requests.length;
await resyncWith([
syncExpression(Post, () => Predicates.ALL, {
syncPageSize: 50,
}),
]);

// Comment has no per-model config, should use global
const commentSyncRequests = getSyncRequestsAfter(
'Comment',
requestsBefore,
);
expect(commentSyncRequests.length).toBeGreaterThan(0);
expect(commentSyncRequests[0].variables.limit).toBe(500);
});

test('multiple models with different per-model syncPageSize', async () => {
(DataStore as any).amplifyConfig.syncPageSize = 1000;
(DataStore as any).amplifyConfig.maxRecordsToSync = 10000;

const requestsBefore = graphqlService.requests.length;
await resyncWith([
syncExpression(Post, () => Predicates.ALL, {
syncPageSize: 75,
}),
syncExpression(Model, () => Predicates.ALL, {
syncPageSize: 200,
}),
]);

const postSyncRequests = getSyncRequestsAfter('Post', requestsBefore);
expect(postSyncRequests.length).toBeGreaterThan(0);
expect(postSyncRequests[0].variables.limit).toBe(75);

const modelSyncRequests = getSyncRequestsAfter(
'Model',
requestsBefore,
);
expect(modelSyncRequests.length).toBeGreaterThan(0);
expect(modelSyncRequests[0].variables.limit).toBe(200);
});

test('per-model maxRecordsToSync caps limit below syncPageSize', async () => {
(DataStore as any).amplifyConfig.syncPageSize = 1000;
(DataStore as any).amplifyConfig.maxRecordsToSync = 10000;

const requestsBefore = graphqlService.requests.length;
await resyncWith([
syncExpression(Post, () => Predicates.ALL, {
syncPageSize: 1000,
maxRecordsToSync: 25,
}),
]);

const postSyncRequests = getSyncRequestsAfter('Post', requestsBefore);
expect(postSyncRequests.length).toBeGreaterThan(0);
// limit = Math.min(maxRecordsToSync - 0, syncPageSize) = Math.min(25, 1000) = 25
expect(postSyncRequests[0].variables.limit).toBe(25);
});

});
});

describe('error handling', () => {
Expand Down
56 changes: 56 additions & 0 deletions packages/datastore/__tests__/syncExpression.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { syncExpression, PerModelSyncConfig } from '../src/types';
import { Predicates } from '../src/predicates';

// Minimal fake model constructor — syncExpression only stores the reference,
// it doesn't inspect it.
const FakeModel: any = function FakeModel() {};
FakeModel.copyOf = () => {};

describe('syncExpression', () => {
test('returns syncConfig when provided', async () => {
const config: PerModelSyncConfig = { syncPageSize: 50 };
const result = await syncExpression(
FakeModel,
() => Predicates.ALL,
config,
);

expect(result.syncConfig).toEqual({ syncPageSize: 50 });
expect(result.modelConstructor).toBe(FakeModel);
});

test('returns syncConfig with both fields', async () => {
const config: PerModelSyncConfig = {
syncPageSize: 100,
maxRecordsToSync: 500,
};
const result = await syncExpression(
FakeModel,
() => Predicates.ALL,
config,
);

expect(result.syncConfig).toEqual({
syncPageSize: 100,
maxRecordsToSync: 500,
});
});

test('syncConfig is undefined when omitted (backward compatible)', async () => {
const result = await syncExpression(FakeModel, () => Predicates.ALL);

expect(result.syncConfig).toBeUndefined();
expect(result.modelConstructor).toBe(FakeModel);
expect(result.conditionProducer).toBeDefined();
});

test('syncConfig is undefined when explicitly passed as undefined', async () => {
const result = await syncExpression(
FakeModel,
() => Predicates.ALL,
undefined,
);

expect(result.syncConfig).toBeUndefined();
});
});
43 changes: 34 additions & 9 deletions packages/datastore/src/datastore/datastore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import {
NonModelTypeConstructor,
ObserveQueryOptions,
PaginationInput,
PerModelSyncConfig,
PersistentModel,
PersistentModelConstructor,
PersistentModelMetaData,
Expand Down Expand Up @@ -1551,7 +1552,10 @@ class DataStore {
aws_appsync_graphqlEndpoint,
);

this.syncPredicates = await this.processSyncExpressions();
const { syncPredicates, perModelSyncConfig } =
await this.processSyncExpressions();
this.syncPredicates = syncPredicates;
this.amplifyConfig.perModelSyncConfig = perModelSyncConfig;
this.sync = new SyncEngine(
schema,
namespaceResolver,
Expand Down Expand Up @@ -2675,22 +2679,40 @@ class DataStore {

/**
* Examines the configured `syncExpressions` and produces a WeakMap of
* SchemaModel -> predicate to use during sync.
* SchemaModel -> predicate to use during sync, and a WeakMap of
* SchemaModel -> per-model sync config.
*/
private async processSyncExpressions(): Promise<
WeakMap<SchemaModel, ModelPredicate<any> | null>
> {
private async processSyncExpressions(): Promise<{
syncPredicates: WeakMap<SchemaModel, ModelPredicate<any> | null>;
perModelSyncConfig: WeakMap<SchemaModel, PerModelSyncConfig>;
}> {
if (!this.syncExpressions || !this.syncExpressions.length) {
return new WeakMap<SchemaModel, ModelPredicate<any>>();
return {
syncPredicates: new WeakMap<SchemaModel, ModelPredicate<any>>(),
perModelSyncConfig: new WeakMap<SchemaModel, PerModelSyncConfig>(),
};
}

const perModelSyncConfig = new WeakMap<SchemaModel, PerModelSyncConfig>();

const syncPredicates = await Promise.all(
this.syncExpressions.map(
async (
syncExpression: SyncExpression,
): Promise<[SchemaModel, ModelPredicate<any> | null]> => {
const { modelConstructor, conditionProducer } = await syncExpression;
const modelDefinition = getModelDefinition(modelConstructor)!;
const { modelConstructor, conditionProducer, syncConfig } =
await syncExpression;
const modelDefinition = getModelDefinition(modelConstructor);

if (!modelDefinition) {
throw new Error(
`Invalid model provided to syncExpression: ${modelConstructor?.name}; unable to find model definition.`,
);
}

if (syncConfig) {
perModelSyncConfig.set(modelDefinition, syncConfig);
}

// conditionProducer is either a predicate, e.g. (c) => c.field.eq(1)
// OR a function/promise that returns a predicate
Expand All @@ -2714,7 +2736,10 @@ class DataStore {
),
);

return this.weakMapFromEntries(syncPredicates);
return {
syncPredicates: this.weakMapFromEntries(syncPredicates),
perModelSyncConfig,
};
}

private async unwrapPromise<T extends PersistentModel>(
Expand Down
12 changes: 11 additions & 1 deletion packages/datastore/src/sync/processors/sync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,11 @@ class SyncProcessor {
start(
typesLastSync: Map<SchemaModel, [string, number]>,
): Observable<SyncModelPage> {
const { maxRecordsToSync, syncPageSize } = this.amplifyConfig;
const {
maxRecordsToSync: defaultMaxRecordsToSync,
syncPageSize: defaultSyncPageSize,
perModelSyncConfig,
} = this.amplifyConfig;
const parentPromises = new Map<string, Promise<void>>();
const observable = new Observable<SyncModelPage>(observer => {
const sortedTypesLastSyncs = Object.values(this.schema.namespaces).reduce(
Expand Down Expand Up @@ -394,6 +398,12 @@ class SyncProcessor {
let recordsReceived = 0;
const filter = this.graphqlFilterFromPredicate(modelDefinition);

const modelSyncConfig = perModelSyncConfig?.get(modelDefinition);
const syncPageSize =
modelSyncConfig?.syncPageSize ?? defaultSyncPageSize;
const maxRecordsToSync =
modelSyncConfig?.maxRecordsToSync ?? defaultMaxRecordsToSync;

const parents = this.schema.namespaces[
namespace
].modelTopologicalOrdering!.get(modelDefinition.name);
Expand Down
10 changes: 10 additions & 0 deletions packages/datastore/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1093,9 +1093,15 @@ export type ModelAuthModes = Record<
Record<ModelOperation, GraphQLAuthMode[]>
>;

export interface PerModelSyncConfig {
syncPageSize?: number;
maxRecordsToSync?: number;
}

export type SyncExpression = Promise<{
modelConstructor: any;
conditionProducer(c?: any): any;
syncConfig?: PerModelSyncConfig;
}>;

/*
Expand Down Expand Up @@ -1156,6 +1162,7 @@ type ConditionProducer<T extends PersistentModel, A extends Option<T>> = (
*
* @param modelConstructor The Model from the schema.
* @param conditionProducer A function that builds a condition object that can describe how to filter the model.
* @param syncConfig Optional per-model sync configuration (e.g., `syncPageSize`, `maxRecordsToSync`).
* @returns An sync expression object that can be attached to the DataStore `syncExpressions` configuration property.
*/
export async function syncExpression<
Expand All @@ -1164,13 +1171,16 @@ export async function syncExpression<
>(
modelConstructor: PersistentModelConstructor<T>,
conditionProducer: ConditionProducer<T, A>,
syncConfig?: PerModelSyncConfig,
): Promise<{
modelConstructor: PersistentModelConstructor<T>;
conditionProducer: ConditionProducer<T, A>;
syncConfig?: PerModelSyncConfig;
}> {
return {
modelConstructor,
conditionProducer,
syncConfig,
};
}

Expand Down