Skip to content

Commit 3b77c8e

Browse files
authored
perf(cloud_auth): Use batches for migrations (#387)
When seeding the database or inserting entities across multiple tables, use batches to improve performance. This is required for Turso since transactions have short time limits and can easily timeout when not using batches.
1 parent 91fe006 commit 3b77c8e

File tree

4 files changed

+180
-84
lines changed

4 files changed

+180
-84
lines changed

services/celest_cloud_auth/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
## 0.3.2+2
22

33
- fix: Migration of Cloud Auth tables
4+
- perf: Use batches for migrations
45

56
## 0.3.2+1
67

services/celest_cloud_auth/lib/src/database/auth_database_accessors.dart

Lines changed: 177 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import 'package:celest_cloud_auth/src/context.dart';
1010
import 'package:celest_cloud_auth/src/database/auth_database.steps.dart';
1111
import 'package:celest_cloud_auth/src/database/auth_database_accessors.drift.dart';
1212
import 'package:celest_cloud_auth/src/database/schema/cedar.drift.dart';
13+
import 'package:celest_cloud_auth/src/database/schema/cloud_auth_projects.drift.dart';
1314
import 'package:celest_cloud_auth/src/database/schema/cloud_auth_users.drift.dart';
1415
import 'package:celest_cloud_auth/src/util/typeid.dart';
1516
import 'package:celest_core/_internal.dart' show kDebugMode;
@@ -207,27 +208,24 @@ class CloudAuthDatabaseAccessors extends DatabaseAccessor<GeneratedDatabase>
207208
for (final type in allCedarTypes)
208209
CedarTypesCompanion.insert(fqn: type),
209210
]);
210-
for (final entity in allCedarEntities.values) {
211-
b.insert(
212-
cedarEntities,
211+
b.insertAllOnConflictUpdate(cedarEntities, [
212+
for (final entity in allCedarEntities.values)
213213
CedarEntitiesCompanion.insert(
214214
entityType: entity.uid.type,
215215
entityId: entity.uid.id,
216216
attributeJson: drift.Value(entity.attributes),
217217
),
218-
);
219-
for (final parent in entity.parents) {
220-
b.insert(
221-
cedarRelationships,
218+
]);
219+
b.insertAllOnConflictUpdate(cedarRelationships, [
220+
for (final entity in allCedarEntities.values)
221+
for (final parent in entity.parents)
222222
CedarRelationshipsCompanion.insert(
223223
entityType: entity.uid.type,
224224
entityId: entity.uid.id,
225225
parentType: parent.type,
226226
parentId: parent.id,
227227
),
228-
);
229-
}
230-
}
228+
]);
231229
});
232230
await upsertPolicySet(corePolicySet.merge(additionalCedarPolicies));
233231
});
@@ -395,7 +393,7 @@ class CloudAuthDatabaseAccessors extends DatabaseAccessor<GeneratedDatabase>
395393
PolicySet? additionalCedarPolicies,
396394
}) async {
397395
await _withoutForeignKeys(() async {
398-
if (details.wasCreated) {
396+
if (details.wasCreated || details.hadUpgrade) {
399397
await seed(
400398
additionalCedarTypes: additionalCedarTypes,
401399
additionalCedarEntities: additionalCedarEntities,
@@ -535,7 +533,7 @@ class CloudAuthDatabaseAccessors extends DatabaseAccessor<GeneratedDatabase>
535533
/// Returns the new, effective policy set for the project.
536534
Future<void> upsertProject({
537535
ResolvedProject? project,
538-
}) {
536+
}) async {
539537
project ??= context.project;
540538
_logger.finer('Upserting project: ${project.projectId}');
541539

@@ -554,109 +552,206 @@ class CloudAuthDatabaseAccessors extends DatabaseAccessor<GeneratedDatabase>
554552
_logger.finer('Project AST is up-to-date. Skipping AST upsert.');
555553
return;
556554
}
557-
await createEntity(Entity(uid: project.uid));
558-
for (final api in project.apis.values) {
559-
_logger.finer('Upserting API: ${api.apiId}');
560-
await cloudAuthProjectsDrift.upsertApi(
561-
projectId: project.projectId,
562-
apiId: api.apiId,
563-
resolvedAst: api,
564-
etag: _etag(api.toProto()),
565-
);
566-
await createEntity(Entity(uid: api.uid, parents: [project.uid]));
567-
for (final function in api.functions.values) {
568-
_logger.finer('Upserting function: ${function.functionId}');
569-
await cloudAuthProjectsDrift.upsertFunction(
570-
apiId: api.apiId,
571-
functionId: function.functionId,
572-
resolvedAst: function,
573-
etag: _etag(function.toProto()),
555+
556+
await batch((b) async {
557+
await createEntity(Entity(uid: project!.uid), b);
558+
for (final api in project.apis.values) {
559+
_logger.finer('Upserting API: ${api.apiId}');
560+
b.insert<CloudAuthApis, CloudAuthApi>(
561+
cloudAuthApis,
562+
CloudAuthApisCompanion.insert(
563+
projectId: project.projectId,
564+
apiId: api.apiId,
565+
resolvedAst: api,
566+
etag: _etag(api.toProto()),
567+
),
568+
onConflict: DoUpdate.withExcluded(
569+
(_, excluded) => CloudAuthApisCompanion.custom(
570+
projectId: excluded.projectId,
571+
resolvedAst: excluded.resolvedAst,
572+
etag: excluded.etag,
573+
),
574+
target: [cloudAuthApis.apiId],
575+
),
574576
);
575-
await createEntity(Entity(uid: function.uid, parents: [api.uid]));
577+
await createEntity(Entity(uid: api.uid, parents: [project.uid]), b);
578+
for (final function in api.functions.values) {
579+
_logger.finer('Upserting function: ${function.functionId}');
580+
b.insert<CloudAuthFunctions, CloudAuthFunction>(
581+
cloudAuthFunctions,
582+
CloudAuthFunctionsCompanion.insert(
583+
apiId: api.apiId,
584+
functionId: function.functionId,
585+
resolvedAst: function,
586+
etag: _etag(function.toProto()),
587+
),
588+
onConflict: DoUpdate.withExcluded(
589+
(_, excluded) => CloudAuthFunctionsCompanion.custom(
590+
apiId: excluded.apiId,
591+
resolvedAst: excluded.resolvedAst,
592+
etag: excluded.etag,
593+
),
594+
target: [cloudAuthFunctions.functionId],
595+
),
596+
);
597+
await createEntity(
598+
Entity(uid: function.uid, parents: [api.uid]),
599+
b,
600+
);
601+
}
576602
}
577-
}
578603

579-
final differ = _ProjectAuthDiff();
580-
project.acceptWithArg(differ, oldProject?.resolvedAst);
581-
for (final linkId in differ.removedTemplateLinks) {
582-
_logger.finer('Deleting policy template link: $linkId');
583-
await cedarDrift.deletePolicyTemplateLink(policyId: linkId);
584-
}
585-
for (final templateId in differ.removedTemplateIds) {
586-
_logger.finer('Deleting policy template: $templateId');
587-
await cedarDrift.deletePolicyTemplate(templateId: templateId);
588-
}
589-
for (final policyId in differ.removedPolicyIds) {
590-
_logger.finer('Deleting policy: $policyId');
591-
await cedarDrift.deletePolicy(policyId: policyId);
592-
}
593-
await upsertPolicySet(corePolicySet.merge(differ.newPolicySet));
604+
final differ = _ProjectAuthDiff();
605+
project.acceptWithArg(differ, oldProject?.resolvedAst);
606+
for (final linkId in differ.removedTemplateLinks) {
607+
_logger.finer('Deleting policy template link: $linkId');
608+
b.deleteWhere(
609+
cedarDrift.cedarPolicyTemplateLinks,
610+
(link) => link.id.equals(linkId),
611+
);
612+
}
613+
for (final templateId in differ.removedTemplateIds) {
614+
_logger.finer('Deleting policy template: $templateId');
615+
b.deleteWhere(
616+
cedarDrift.cedarPolicyTemplates,
617+
(template) => template.templateId.equals(templateId),
618+
);
619+
}
620+
for (final policyId in differ.removedPolicyIds) {
621+
_logger.finer('Deleting policy: $policyId');
622+
b.deleteWhere(
623+
cedarDrift.cedarPolicies,
624+
(policy) => policy.id.equals(policyId),
625+
);
626+
}
627+
await upsertPolicySet(corePolicySet.merge(differ.newPolicySet), b);
628+
});
594629
});
595630
}
596631

597632
/// Upserts a Cedar [PolicySet] into the database.
598633
///
599634
/// Returns the new, effective [PolicySet].
600-
Future<void> upsertPolicySet(PolicySet policySet) async {
601-
await transaction(() async {
635+
Future<void> upsertPolicySet(PolicySet policySet, [Batch? batch]) async {
636+
await _withBatch(batch, (b) async {
602637
for (final policy in policySet.policies.entries) {
603638
_logger.finer('Upserting policy: ${policy.key}');
604-
await cedarDrift.upsertPolicy(
605-
id: typeId('pol'),
606-
policyId: policy.key,
607-
policy: policy.value,
608-
enforcementLevel: 1,
639+
b.insert<CedarPolicies, CedarPolicy>(
640+
cedarDrift.cedarPolicies,
641+
CedarPoliciesCompanion.insert(
642+
id: typeId('pol'),
643+
policyId: policy.key,
644+
policy: policy.value,
645+
enforcementLevel: drift.Value(1),
646+
),
647+
onConflict: DoUpdate.withExcluded(
648+
(_, excluded) => CedarPoliciesCompanion.custom(
649+
policy: excluded.policy,
650+
enforcementLevel: excluded.enforcementLevel,
651+
),
652+
target: [cedarPolicies.policyId],
653+
),
609654
);
610655
}
656+
611657
for (final template in policySet.templates.entries) {
612658
_logger.finer('Upserting policy template: ${template.key}');
613-
await cedarDrift.upsertPolicyTemplate(
614-
id: typeId('polt'),
615-
templateId: template.key,
616-
template: template.value,
659+
b.insert<CedarPolicyTemplates, CedarPolicyTemplate>(
660+
cedarDrift.cedarPolicyTemplates,
661+
CedarPolicyTemplatesCompanion.insert(
662+
id: typeId('polt'),
663+
templateId: template.key,
664+
template: template.value,
665+
),
666+
onConflict: DoUpdate.withExcluded(
667+
(_, excluded) => CedarPolicyTemplatesCompanion.custom(
668+
template: excluded.template,
669+
),
670+
target: [cedarPolicyTemplates.templateId],
671+
),
617672
);
618673
}
674+
619675
for (final link in policySet.templateLinks) {
620676
_logger.finer(
621677
'Upserting policy template link: ${link.shortDisplayString}',
622678
);
623-
624-
await cedarDrift.upsertPolicyTemplateLink(
625-
id: typeId('polk'),
626-
policyId: link.newId,
627-
templateId: link.templateId,
628-
principalType: link.values[SlotId.principal]?.type,
629-
principalId: link.values[SlotId.principal]?.id,
630-
resourceType: link.values[SlotId.resource]?.type,
631-
resourceId: link.values[SlotId.resource]?.id,
632-
enforcementLevel: 1,
679+
b.insert<CedarPolicyTemplateLinks, CedarPolicyTemplateLink>(
680+
cedarDrift.cedarPolicyTemplateLinks,
681+
CedarPolicyTemplateLinksCompanion.insert(
682+
id: typeId('polk'),
683+
policyId: link.newId,
684+
templateId: link.templateId,
685+
principalType: drift.Value.absentIfNull(
686+
link.values[SlotId.principal]?.type,
687+
),
688+
principalId: drift.Value.absentIfNull(
689+
link.values[SlotId.principal]?.id,
690+
),
691+
resourceType: drift.Value.absentIfNull(
692+
link.values[SlotId.resource]?.type,
693+
),
694+
resourceId: drift.Value.absentIfNull(
695+
link.values[SlotId.resource]?.id,
696+
),
697+
enforcementLevel: drift.Value(1),
698+
),
699+
onConflict: DoUpdate.withExcluded(
700+
(_, excluded) => CedarPolicyTemplateLinksCompanion.custom(
701+
principalType: excluded.principalType,
702+
principalId: excluded.principalId,
703+
resourceType: excluded.resourceType,
704+
resourceId: excluded.resourceId,
705+
enforcementLevel: excluded.enforcementLevel,
706+
),
707+
target: [cedarPolicyTemplateLinks.policyId],
708+
),
633709
);
634710
}
635711
});
636712
_effectivePolicySetCache.invalidate();
637713
}
638714

639715
/// Creates a new [entity] in the database.
640-
Future<void> createEntity(Entity entity) async {
716+
Future<void> createEntity(Entity entity, [Batch? batch]) async {
641717
_logger.finer(
642718
'Creating entity: ${entity.uid} with parents: ${entity.parents}',
643719
);
644-
await cedarDrift.createType(
645-
fqn: entity.uid.type,
646-
);
647-
await cedarDrift.createEntity(
648-
entityType: entity.uid.type,
649-
entityId: entity.uid.id,
650-
attributeJson: entity.attributes,
651-
);
652-
for (final parent in entity.parents) {
653-
await cedarDrift.createRelationship(
654-
entityType: entity.uid.type,
655-
entityId: entity.uid.id,
656-
parentType: parent.type,
657-
parentId: parent.id,
720+
await _withBatch(batch, (b) async {
721+
b.insertAllOnConflictUpdate(
722+
cedarTypes,
723+
[CedarTypesCompanion.insert(fqn: entity.uid.type)],
658724
);
725+
b.insertAllOnConflictUpdate(cedarEntities, [
726+
CedarEntitiesCompanion.insert(
727+
entityType: entity.uid.type,
728+
entityId: entity.uid.id,
729+
attributeJson: drift.Value(entity.attributes),
730+
),
731+
]);
732+
b.insertAllOnConflictUpdate(cedarRelationships, [
733+
for (final parent in entity.parents)
734+
CedarRelationshipsCompanion.insert(
735+
entityType: entity.uid.type,
736+
entityId: entity.uid.id,
737+
parentType: parent.type,
738+
parentId: parent.id,
739+
),
740+
]);
741+
});
742+
}
743+
744+
Future<void> _withBatch(
745+
Batch? batch,
746+
Future<void> Function(Batch) action,
747+
) async {
748+
if (batch != null) {
749+
await action(batch);
750+
return;
659751
}
752+
await db.batch((b) async {
753+
await action(b);
754+
});
660755
}
661756

662757
/// Computes the transitive closure for an [AuthorizationRequest].

services/celest_cloud_auth/lib/src/database/schema/cloud_auth_core.drift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ deleteCork:
348348
WHERE cork_id = :cork_id;
349349

350350
-- Revokes all cloud_auth_corks under the given entity.
351-
deletecloud_auth_corksForEntity(
351+
deleteCorksForEntity(
352352
:bearer_type AS TEXT OR NULL,
353353
:bearer_id AS TEXT OR NULL,
354354
:audience_type AS TEXT OR NULL,

services/celest_cloud_auth/lib/src/database/schema/cloud_auth_core.drift.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2496,7 +2496,7 @@ class CloudAuthCoreDrift extends i7.ModularAccessor {
24962496
);
24972497
}
24982498

2499-
i8.Future<List<i2.Uint8List>> deletecloud_auth_corksForEntity(
2499+
i8.Future<List<i2.Uint8List>> deleteCorksForEntity(
25002500
{String? bearerType,
25012501
String? bearerId,
25022502
String? audienceType,

0 commit comments

Comments
 (0)