Skip to content

Commit 84a020b

Browse files
committed
Add caching for own capabilities
1 parent 708cdd9 commit 84a020b

File tree

7 files changed

+214
-11
lines changed

7 files changed

+214
-11
lines changed

packages/stream_feeds/lib/src/generated/api/model/own_capabilities_batch_response.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ class OwnCapabilitiesBatchResponse with _$OwnCapabilitiesBatchResponse {
2323
});
2424

2525
@override
26-
final Map<String, List<String>> capabilities;
26+
final Map<String, List<FeedOwnCapability>> capabilities;
2727

2828
@override
2929
final String duration;

packages/stream_feeds/lib/src/generated/api/model/own_capabilities_batch_response.freezed.dart

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/stream_feeds/lib/src/generated/api/model/own_capabilities_batch_response.g.dart

Lines changed: 40 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import 'dart:async';
2+
3+
import '../../stream_feeds.dart' as api;
4+
import '../../stream_feeds.dart';
5+
import '../utils/batcher.dart';
6+
7+
class CapabilitiesRepository {
8+
CapabilitiesRepository({required api.DefaultApi api}) : _api = api;
9+
10+
final api.DefaultApi _api;
11+
final Map<String, List<FeedOwnCapability>> _capabilities = {};
12+
13+
late final Batcher<String, Result<Map<String, List<FeedOwnCapability>>>>
14+
_fetchBatcher = Batcher(
15+
action: (feeds) => fetchCapabilities(feeds: feeds),
16+
);
17+
18+
Future<Result<Map<String, List<FeedOwnCapability>>>> fetchCapabilities({
19+
required List<String> feeds,
20+
}) async {
21+
final result = await _api.ownCapabilitiesBatch(
22+
ownCapabilitiesBatchRequest: api.OwnCapabilitiesBatchRequest(
23+
feeds: feeds,
24+
),
25+
);
26+
27+
return result.map((response) {
28+
_mergeWithCache(response.capabilities);
29+
return response.capabilities;
30+
});
31+
}
32+
33+
void addCapabilities(String feed, List<FeedOwnCapability> capabilities) {
34+
_capabilities[feed] = capabilities;
35+
}
36+
37+
Future<List<FeedOwnCapability>?> getCapabilities(String feed) async {
38+
return _capabilities[feed] ??
39+
(await _fetchBatchedFeedCapabilities(feed)).getOrNull();
40+
}
41+
42+
void dispose() {
43+
_fetchBatcher.dispose();
44+
}
45+
46+
Future<Result<List<FeedOwnCapability>>> _fetchBatchedFeedCapabilities(
47+
String feed,
48+
) async {
49+
final capabilities = await _fetchBatcher.add(feed);
50+
return capabilities.map((capabilities) => capabilities[feed] ?? []);
51+
}
52+
53+
void _mergeWithCache(Map<String, List<FeedOwnCapability>> capabilities) {
54+
_capabilities.addAll(capabilities);
55+
}
56+
}

packages/stream_feeds/lib/src/state/event/feed_event_handler.dart

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import '../../models/feed_id.dart';
1111
import '../../models/feeds_reaction_data.dart';
1212
import '../../models/follow_data.dart';
1313
import '../../models/mark_activity_data.dart';
14+
import '../../repository/capabilities_repository.dart';
1415
import '../feed_state.dart';
1516

1617
import 'state_event_handler.dart';
@@ -19,16 +20,28 @@ class FeedEventHandler implements StateEventHandler {
1920
const FeedEventHandler({
2021
required this.fid,
2122
required this.state,
23+
required this.capabilitiesRepository,
2224
});
2325

2426
final FeedId fid;
2527
final FeedStateNotifier state;
28+
final CapabilitiesRepository capabilitiesRepository;
2629

2730
@override
28-
void handleEvent(WsEvent event) {
31+
Future<void> handleEvent(WsEvent event) async {
2932
if (event is api.ActivityAddedEvent) {
3033
if (event.fid != fid.rawValue) return;
31-
return state.onActivityAdded(event.activity.toModel());
34+
var activity = event.activity.toModel();
35+
state.onActivityAdded(activity);
36+
37+
final ownCapabilities =
38+
await capabilitiesRepository.getCapabilities(fid.rawValue);
39+
activity = activity.copyWith(
40+
currentFeed: activity.currentFeed?.copyWith(
41+
ownCapabilities: ownCapabilities ?? [],
42+
),
43+
);
44+
state.onActivityUpdated(activity);
3245
}
3346

3447
if (event is api.ActivityDeletedEvent) {

packages/stream_feeds/lib/src/state/feed.dart

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import '../models/request/activity_add_comment_request.dart';
1919
import '../models/request/feed_add_activity_request.dart';
2020
import '../repository/activities_repository.dart';
2121
import '../repository/bookmarks_repository.dart';
22+
import '../repository/capabilities_repository.dart';
2223
import '../repository/comments_repository.dart';
2324
import '../repository/feeds_repository.dart';
2425
import '../repository/polls_repository.dart';
@@ -48,6 +49,7 @@ class Feed with Disposable {
4849
required this.commentsRepository,
4950
required this.feedsRepository,
5051
required this.pollsRepository,
52+
required this.capabilitiesRepository,
5153
required this.eventsEmitter,
5254
required Stream<void> onReconnectEmitter,
5355
}) {
@@ -66,7 +68,11 @@ class Feed with Disposable {
6668
);
6769

6870
// Attach event handlers for the feed events
69-
final handler = FeedEventHandler(fid: fid, state: _stateNotifier);
71+
final handler = FeedEventHandler(
72+
fid: fid,
73+
state: _stateNotifier,
74+
capabilitiesRepository: capabilitiesRepository,
75+
);
7076
_feedSubscriptions.add(eventsEmitter.listen(handler.handleEvent));
7177

7278
// Automatically refetch data on reconnection
@@ -84,6 +90,7 @@ class Feed with Disposable {
8490
final CommentsRepository commentsRepository;
8591
final FeedsRepository feedsRepository;
8692
final PollsRepository pollsRepository;
93+
final CapabilitiesRepository capabilitiesRepository;
8794

8895
late final MemberList _memberList;
8996

@@ -108,7 +115,23 @@ class Feed with Disposable {
108115
/// Returns a [Result] containing the [FeedData] or an error.
109116
Future<Result<FeedData>> getOrCreate() async {
110117
final result = await feedsRepository.getOrCreateFeed(query);
111-
result.onSuccess(_stateNotifier.onQueryFeed);
118+
result.onSuccess((feedData) {
119+
_stateNotifier.onQueryFeed(feedData);
120+
121+
// TODO move and also do with fetchMore
122+
capabilitiesRepository.addCapabilities(
123+
feedData.feed.id,
124+
feedData.feed.ownCapabilities,
125+
);
126+
for (final activity in feedData.activities.items) {
127+
if (activity.currentFeed case final feed?) {
128+
capabilitiesRepository.addCapabilities(
129+
feed.id,
130+
feed.ownCapabilities,
131+
);
132+
}
133+
}
134+
});
112135

113136
return result.map((feedData) => feedData.feed);
114137
}
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import 'dart:async';
2+
3+
class Batcher<T, R> {
4+
Batcher({
5+
required this.action,
6+
this.interval = const Duration(seconds: 2),
7+
});
8+
9+
final Duration interval;
10+
final Action<T, R> action;
11+
Timer? _timer;
12+
Set<T> _itemsToProcess = {};
13+
Set<T> _itemsBeingProcessed = {};
14+
15+
late DateTime _lastRun = DateTime.fromMillisecondsSinceEpoch(0);
16+
Completer<R>? _nextActionCompleter;
17+
Completer<R>? _ongoingActionCompleter;
18+
19+
/// Adds an item to the batch and returns the result of the whole batch.
20+
Future<R> add(T item) {
21+
if (_itemsBeingProcessed.contains(item)) {
22+
return _ongoingActionCompleter!.future;
23+
}
24+
25+
_itemsToProcess.add(item);
26+
_nextActionCompleter ??= _planBatchFetch();
27+
return _nextActionCompleter!.future;
28+
}
29+
30+
void dispose() {
31+
_timer?.cancel();
32+
}
33+
34+
Completer<R> _planBatchFetch() {
35+
final timeSinceLastRun = DateTime.now().difference(_lastRun);
36+
37+
final newActionCompleter = Completer<R>();
38+
_nextActionCompleter = newActionCompleter;
39+
_lastRun = DateTime.now();
40+
41+
if (timeSinceLastRun >= interval) {
42+
_runBatch();
43+
} else {
44+
_timer = Timer(interval - timeSinceLastRun, _runBatch);
45+
}
46+
47+
return newActionCompleter;
48+
}
49+
50+
void _runBatch() {
51+
if (_nextActionCompleter == null) {
52+
return;
53+
}
54+
final completer = _nextActionCompleter!;
55+
_ongoingActionCompleter = completer;
56+
_itemsBeingProcessed = _itemsToProcess;
57+
58+
final currentAction = action(_itemsToProcess.toList());
59+
completer.complete(currentAction);
60+
currentAction.whenComplete(
61+
() {
62+
_ongoingActionCompleter = null;
63+
_itemsBeingProcessed = {};
64+
},
65+
);
66+
67+
_nextActionCompleter = null;
68+
_itemsToProcess = {};
69+
_timer = null;
70+
}
71+
}
72+
73+
typedef Action<T, R> = Future<R> Function(List<T> items);

0 commit comments

Comments
 (0)