Skip to content

Commit bb01efd

Browse files
committed
WIP: change event processing.
1 parent f4cf87f commit bb01efd

File tree

4 files changed

+154
-11
lines changed

4 files changed

+154
-11
lines changed

app/lib/account/backend.dart

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import 'package:meta/meta.dart';
1313
// ignore: import_of_legacy_library_into_null_safe
1414
import 'package:neat_cache/neat_cache.dart';
1515
import 'package:pub_dev/admin/models.dart';
16+
import 'package:pub_dev/service/change_event/change_event.dart';
1617

1718
import '../audit/models.dart';
1819
import '../frontend/request_context.dart';
@@ -462,7 +463,7 @@ class AccountBackend {
462463
if (user == null || user.isModerated || user.isDeleted) {
463464
throw AuthenticationException.failed();
464465
}
465-
final data = await withRetryTransaction(_db, (tx) async {
466+
final (data, change) = await withRetryTransaction(_db, (tx) async {
466467
final session = await tx.userSessions.lookupOrNull(sessionId);
467468
if (session == null || session.isExpired()) {
468469
throw AuthenticationException.failed('Session has been expired.');
@@ -473,7 +474,8 @@ class AccountBackend {
473474
oldUserId != user.userId) {
474475
// expire old session
475476
tx.delete(session.key);
476-
await cache.userSessionData(sessionId).purgeAndRepeat();
477+
final change =
478+
CapturedChange(ChangeAction.delete, UserSession, [sessionId]);
477479

478480
// create a new session
479481
final newSession = UserSession.init()
@@ -487,7 +489,7 @@ class AccountBackend {
487489
..authenticatedAt = now
488490
..expires = now.add(_sessionDuration);
489491
tx.insert(newSession);
490-
return SessionData.fromModel(newSession);
492+
return (SessionData.fromModel(newSession), change);
491493
} else {
492494
// only update the current one
493495
session
@@ -500,10 +502,13 @@ class AccountBackend {
500502
..authenticatedAt = now
501503
..expires = now.add(_sessionDuration);
502504
tx.insert(session);
503-
return SessionData.fromModel(session);
505+
return (SessionData.fromModel(session), null);
504506
}
505507
});
506508
await cache.userSessionData(data.sessionId).set(data);
509+
if (change != null) {
510+
changeEventAggregator.addChange(change);
511+
}
507512
return data;
508513
}
509514

@@ -698,7 +703,8 @@ class _UserSessionDataAccess {
698703
} on Exception catch (_) {
699704
// ignore if the entity has been already deleted concurrently
700705
}
701-
await cache.userSessionData(sessionId).purge();
706+
changeEventAggregator.addChange(
707+
CapturedChange(ChangeAction.delete, UserSession, [sessionId]));
702708
}
703709

704710
/// Removes the session data that has expiry before [ts].
@@ -719,3 +725,12 @@ class _UserSessionTransactionDataAcccess {
719725
return await _tx.lookupOrNull<UserSession>(key);
720726
}
721727
}
728+
729+
Iterable<TriggeredEvent> processUserSessionChange(CapturedChange change) sync* {
730+
if (change.entity == UserSession && change.action == ChangeAction.delete) {
731+
yield TriggeredEvent(
732+
[UserSession, ...change.keys],
733+
() => cache.userSessionData(change.keys.single as String).purge(),
734+
);
735+
}
736+
}

app/lib/package/backend.dart

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import 'package:pub_dev/package/api_export/api_exporter.dart';
2020
import 'package:pub_dev/package/tarball_storage.dart';
2121
import 'package:pub_dev/scorecard/backend.dart';
2222
import 'package:pub_dev/service/async_queue/async_queue.dart';
23+
import 'package:pub_dev/service/change_event/change_event.dart';
2324
import 'package:pub_dev/service/rate_limit/rate_limit.dart';
2425
import 'package:pub_dev/shared/versions.dart';
2526
import 'package:pub_dev/task/backend.dart';
@@ -480,9 +481,8 @@ class PackageBackend {
480481
options: optionsChanges,
481482
));
482483
});
483-
await purgePackageCache(package);
484-
await taskBackend.trackPackage(package);
485-
await apiExporter.synchronizePackage(package);
484+
changeEventAggregator
485+
.addChange(CapturedChange(ChangeAction.update, Package, [package]));
486486
}
487487

488488
/// Updates [options] on [package]/[version], assuming the current user
@@ -2096,3 +2096,22 @@ class _VersionTransactionDataAcccess {
20962096
return await _tx.query<PackageVersion>(pkgKey).run().toList();
20972097
}
20982098
}
2099+
2100+
Iterable<TriggeredEvent> processPackageChange(CapturedChange change) sync* {
2101+
if (change.entity != Package) return;
2102+
final package = change.keys.single as String;
2103+
if (change.action == ChangeAction.update) {
2104+
yield change.toTriggeredEvent(
2105+
['purge-cache'],
2106+
() => purgePackageCache(package),
2107+
);
2108+
yield change.toTriggeredEvent(
2109+
['track-task'],
2110+
() => taskBackend.trackPackage(package),
2111+
);
2112+
yield change.toTriggeredEvent(
2113+
['export-package'],
2114+
() => apiExporter.synchronizePackage(package),
2115+
);
2116+
}
2117+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'package:gcloud/service_scope.dart' as ss;
6+
7+
import 'package:pub_dev/service/async_queue/async_queue.dart';
8+
9+
/// Sets the [ChangeEventAggregator] service in the scope.
10+
void registerChangeEventAggregator(ChangeEventAggregator service) =>
11+
ss.register(#_changeEventAggregator, service);
12+
13+
/// The active [ChangeEventAggregator] service.
14+
ChangeEventAggregator get changeEventAggregator =>
15+
ss.lookup(#_changeEventAggregator) as ChangeEventAggregator;
16+
17+
/// Creates events that need to be run after given [change].
18+
typedef ChangeProcessorFn = Iterable<TriggeredEvent> Function(
19+
CapturedChange change);
20+
21+
/// Processes entity changes that could trigger further events (e.g. cache invalidation
22+
/// or re-running export to CDN bucket).
23+
///
24+
/// The events are aggregated (duplicates are filtered) and run in batches, off-request.
25+
class ChangeEventAggregator {
26+
final List<ChangeProcessorFn> _changeProcessorFns;
27+
final _events = <String, TriggeredEvent>{};
28+
bool _scheduled = false;
29+
30+
ChangeEventAggregator(this._changeProcessorFns);
31+
32+
/// Digests the [change] and may trigger new event(s).
33+
///
34+
/// The events are scheduled via [AsyncQueue], off-request.
35+
void addChange(CapturedChange change) {
36+
for (final fn in _changeProcessorFns) {
37+
for (final event in fn(change)) {
38+
if (_events.containsKey(event.deduplicateKey)) {
39+
continue;
40+
}
41+
_events[event.deduplicateKey] = event;
42+
}
43+
}
44+
_scheduleProcessing();
45+
}
46+
47+
void _scheduleProcessing() {
48+
if (_scheduled) return;
49+
_scheduled = true;
50+
asyncQueue.addAsyncFn(() async {
51+
_scheduled = false;
52+
await executeEvents();
53+
if (_events.isNotEmpty) {
54+
_scheduleProcessing();
55+
}
56+
});
57+
}
58+
59+
/// Executes the scheduled events.
60+
Future<void> executeEvents() async {
61+
if (_events.isEmpty) {
62+
return;
63+
}
64+
// Clears event map to prevent concurrent updates while the events are processed.
65+
final fns = _events.values.map((e) => e.fn).toList();
66+
_events.clear();
67+
68+
// Processing the events without any batching.
69+
// TODO: consider concurrency for events that are cheap (e.g. cache invalidation).
70+
for (final fn in fns) {
71+
await fn();
72+
}
73+
}
74+
}
75+
76+
/// Describes the entity being changed with [keys] and other relevant [fields].
77+
class CapturedChange {
78+
final ChangeAction action;
79+
final Type entity;
80+
final List<Object> keys;
81+
final Map<String, Object?>? fields;
82+
83+
CapturedChange(this.action, this.entity, this.keys, [this.fields]);
84+
85+
TriggeredEvent toTriggeredEvent(Iterable<Object>? parameters, AsyncFn fn) =>
86+
TriggeredEvent([entity, action, ...keys, ...?parameters], fn);
87+
}
88+
89+
enum ChangeAction {
90+
create,
91+
update,
92+
delete,
93+
}
94+
95+
/// Describes the event that was triggered.
96+
class TriggeredEvent {
97+
final List<Object?> parameters;
98+
final AsyncFn fn;
99+
100+
TriggeredEvent(this.parameters, this.fn);
101+
102+
late final deduplicateKey =
103+
Uri(pathSegments: parameters.map((p) => p.toString())).toString();
104+
}

app/lib/service/services.dart

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@ import 'package:googleapis_auth/auth_io.dart' as auth;
1616
import 'package:logging/logging.dart';
1717
import 'package:pub_dev/package/api_export/api_exporter.dart';
1818
import 'package:pub_dev/search/handlers.dart';
19-
import 'package:pub_dev/service/async_queue/async_queue.dart';
20-
import 'package:pub_dev/service/download_counts/backend.dart';
21-
import 'package:pub_dev/service/security_advisories/backend.dart';
2219
import 'package:shelf/shelf.dart' as shelf;
2320
import 'package:shelf/shelf_io.dart';
2421

@@ -47,7 +44,11 @@ import '../search/backend.dart';
4744
import '../search/search_client.dart';
4845
import '../search/top_packages.dart';
4946
import '../search/updater.dart';
47+
import '../service/async_queue/async_queue.dart';
48+
import '../service/change_event/change_event.dart';
49+
import '../service/download_counts/backend.dart';
5050
import '../service/email/backend.dart';
51+
import '../service/security_advisories/backend.dart';
5152
import '../service/youtube/backend.dart';
5253
import '../shared/configuration.dart';
5354
import '../shared/datastore.dart';
@@ -250,6 +251,10 @@ Future<R> _withPubServices<R>(FutureOr<R> Function() fn) async {
250251
));
251252
registerAsyncQueue(AsyncQueue());
252253
registerAuditBackend(AuditBackend(dbService));
254+
registerChangeEventAggregator(ChangeEventAggregator([
255+
processPackageChange,
256+
processUserSessionChange,
257+
]));
253258
registerConsentBackend(ConsentBackend(dbService));
254259
registerEmailBackend(EmailBackend(dbService));
255260
registerLikeBackend(LikeBackend(dbService));

0 commit comments

Comments
 (0)