Skip to content

Commit d8b9958

Browse files
fix(cat-voices): pool concurrent documents sync (#2334)
* fix: pool concurrent documents sync * chore: update docs --------- Co-authored-by: Dominik Toton <[email protected]>
1 parent 110a005 commit d8b9958

File tree

5 files changed

+46
-23
lines changed

5 files changed

+46
-23
lines changed

catalyst_voices/melos.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ command:
148148
convert: ^3.1.1
149149
path: ^1.9.1
150150
pinenacl: ^0.6.0
151+
pool: ^1.5.1
151152
uuid_plus: ^0.1.0
152153
sentry_flutter: ^8.8.0
153154
scrollable_positioned_list: ^0.3.8

catalyst_voices/packages/internal/catalyst_voices_repositories/lib/src/document/source/document_data_remote_source.dart

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ import 'package:catalyst_voices_repositories/src/common/response_mapper.dart';
55
import 'package:catalyst_voices_repositories/src/document/document_data_factory.dart';
66
import 'package:catalyst_voices_repositories/src/dto/api/document_index_list_dto.dart';
77
import 'package:catalyst_voices_repositories/src/dto/api/document_index_query_filters_dto.dart';
8+
import 'package:flutter/foundation.dart';
89

910
final class CatGatewayDocumentDataSource implements DocumentDataRemoteSource {
11+
@visibleForTesting
12+
static const indexPageSize = 200;
13+
1014
final ApiServices _api;
1115
final SignedDocumentManager _signedDocumentManager;
1216

@@ -61,7 +65,7 @@ final class CatGatewayDocumentDataSource implements DocumentDataRemoteSource {
6165
final allRefs = <SignedDocumentRef>{};
6266

6367
var page = 0;
64-
const maxPerPage = 100;
68+
const maxPerPage = indexPageSize;
6569
var remaining = 0;
6670

6771
do {

catalyst_voices/packages/internal/catalyst_voices_repositories/test/src/document/source/document_data_remote_source_test.dart

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ void main() {
1818
late final ApiServices apiServices;
1919
late final CatGatewayDocumentDataSource source;
2020

21+
const maxPageSize = CatGatewayDocumentDataSource.indexPageSize;
22+
2123
setUpAll(() {
2224
apiServices = ApiServices.internal(
2325
gateway: gateway,
@@ -43,17 +45,17 @@ void main() {
4345
// Given
4446
final pageZero = DocumentIndexList(
4547
docs: List.generate(
46-
100,
48+
maxPageSize,
4749
(_) => _buildDocumentIndexList().toJson(),
4850
),
49-
page: const CurrentPage(page: 0, limit: 100, remaining: 5),
51+
page: const CurrentPage(page: 0, limit: maxPageSize, remaining: 5),
5052
);
5153
final pageOne = DocumentIndexList(
5254
docs: List.generate(
5355
5,
5456
(_) => _buildDocumentIndexList().toJson(),
5557
),
56-
page: const CurrentPage(page: 1, limit: 100, remaining: 0),
58+
page: const CurrentPage(page: 1, limit: maxPageSize, remaining: 0),
5759
);
5860

5961
final pageZeroResponse = Response(http.Response('', 200), pageZero);
@@ -63,14 +65,14 @@ void main() {
6365
when(
6466
() => gateway.apiV1DocumentIndexPost(
6567
body: const DocumentIndexQueryFilter(),
66-
limit: 100,
68+
limit: maxPageSize,
6769
page: 0,
6870
),
6971
).thenAnswer((_) => Future.value(pageZeroResponse));
7072
when(
7173
() => gateway.apiV1DocumentIndexPost(
7274
body: const DocumentIndexQueryFilter(),
73-
limit: 100,
75+
limit: maxPageSize,
7476
page: 1,
7577
),
7678
).thenAnswer((_) => Future.value(pageOneResponse));
@@ -111,7 +113,7 @@ void main() {
111113
}).toList(),
112114
).toJson(),
113115
],
114-
page: const CurrentPage(page: 0, limit: 100, remaining: 0),
116+
page: const CurrentPage(page: 0, limit: maxPageSize, remaining: 0),
115117
);
116118
final response = Response(http.Response('', 200), page);
117119

@@ -124,7 +126,7 @@ void main() {
124126
when(
125127
() => gateway.apiV1DocumentIndexPost(
126128
body: const DocumentIndexQueryFilter(),
127-
limit: 100,
129+
limit: maxPageSize,
128130
page: 0,
129131
),
130132
).thenAnswer((_) => Future.value(response));

catalyst_voices/packages/internal/catalyst_voices_services/lib/src/documents/documents_service.dart

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import 'package:catalyst_voices_models/catalyst_voices_models.dart';
44
import 'package:catalyst_voices_repositories/catalyst_voices_repositories.dart';
55
import 'package:catalyst_voices_shared/catalyst_voices_shared.dart';
66
import 'package:flutter/foundation.dart';
7+
import 'package:pool/pool.dart';
78
import 'package:result_type/result_type.dart';
89

910
final _logger = Logger('DocumentsService');
@@ -25,6 +26,7 @@ abstract interface class DocumentsService {
2526
/// Returns list of added refs.
2627
Future<List<SignedDocumentRef>> sync({
2728
ValueChanged<double>? onProgress,
29+
int maxConcurrent,
2830
});
2931
}
3032

@@ -38,6 +40,7 @@ final class DocumentsServiceImpl implements DocumentsService {
3840
@override
3941
Future<List<SignedDocumentRef>> sync({
4042
ValueChanged<double>? onProgress,
43+
int maxConcurrent = 100,
4144
}) async {
4245
onProgress?.call(0.1);
4346

@@ -60,31 +63,43 @@ final class DocumentsServiceImpl implements DocumentsService {
6063

6164
var completed = 0;
6265
final total = missingRefs.length;
63-
64-
// Note. Handling or errors as Outcome because we have to
65-
// give a change to all refs to finish and keep all info about what
66-
// failed.
67-
final futures = <Future<Result<SignedDocumentRef, Exception>>>[
68-
for (final ref in missingRefs)
69-
_documentRepository
70-
.cacheDocument(ref: ref)
71-
.then<Result<SignedDocumentRef, Exception>>((_) => Success(ref))
72-
.catchError((Object error, StackTrace stackTrace) {
66+
final pool = Pool(maxConcurrent);
67+
68+
final futures = <Future<void>>[];
69+
final outcomes = <Result<SignedDocumentRef, Exception>>[];
70+
71+
/// Handling or errors as Outcome because we have to
72+
/// give a change to all refs to finish and keep all info about what
73+
/// failed.
74+
for (final ref in missingRefs) {
75+
/// Its possible that missingRefs can be very large
76+
/// and executing too many requests at once throws
77+
/// net::ERR_INSUFFICIENT_RESOURCES in chrome.
78+
/// That's reason for adding pool and limiting max requests.
79+
final future = pool.withResource<void>(() async {
80+
try {
81+
await _documentRepository.cacheDocument(ref: ref);
82+
outcomes.add(_RefSuccess(ref));
83+
} catch (error, stackTrace) {
7384
final exception = RefSyncException(
7485
ref,
7586
error: error,
7687
stack: stackTrace,
7788
);
78-
return _RefFailure(exception);
79-
}).whenComplete(() {
89+
outcomes.add(_RefFailure(exception));
90+
} finally {
8091
completed += 1;
8192
final progress = completed / total;
8293
final totalProgress = 0.2 + (progress * 0.8);
8394
onProgress?.call(totalProgress);
84-
}),
85-
];
95+
}
96+
});
97+
98+
futures.add(future);
99+
}
86100

87-
final outcomes = await futures.wait;
101+
// Wait for all operations managed by the pool to complete
102+
await Future.wait(futures);
88103
final failures = outcomes.whereType<_RefFailure>();
89104

90105
if (failures.isNotEmpty) {

catalyst_voices/packages/internal/catalyst_voices_services/pubspec.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ dependencies:
2929
sdk: flutter
3030
logging: ^1.3.0
3131
path: ^1.9.1
32+
pool: ^1.5.1
3233
result_type: ^0.2.0
3334
rxdart: ^0.27.7
3435
shared_preferences: ^2.3.3

0 commit comments

Comments
 (0)