Skip to content

Commit 2acf367

Browse files
committed
feat: remove mutex and use semaphore solution to solve the indexing issue, add document service unit tests
1 parent 03fc4fe commit 2acf367

File tree

12 files changed

+1264
-66
lines changed

12 files changed

+1264
-66
lines changed

lib/l10n/arb/app_localizations_en.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// ignore: unused_import
2+
import 'package:intl/intl.dart' as intl;
13
import 'app_localizations.dart';
24

35
// ignore_for_file: type=lint

lib/l10n/arb/app_localizations_es.dart

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// ignore: unused_import
2+
import 'package:intl/intl.dart' as intl;
13
import 'app_localizations.dart';
24

35
// ignore_for_file: type=lint

melos.yaml

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,17 @@ scripts:
99
description: Run `flutter analyze` for all packages
1010

1111
test:
12-
run: melos exec -- flutter test
13-
description: Run `flutter test` for all packages
12+
run: melos exec -- flutter test --platform chrome
13+
description: Run `flutter test` for all packages in web environment
14+
packageFilters:
15+
dirExists: test
16+
17+
test_document_service:
18+
run: melos exec -- flutter test test/services/document_service_test.dart --platform chrome
19+
description: Run `flutter test` for document service in web environment
20+
packageFilters:
21+
scope:
22+
- document
1423

1524
build_dev:
1625
run: flutter build web --release --no-tree-shake-icons --target lib/main_development.dart --dart-define-from-file=.env.dev --no-source-maps --pwa-strategy=none

packages/chat/lib/src/services/chat_api_service.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ class ChatApiService {
275275

276276
List<int> _gzipEncoderFunction(String request, RequestOptions options) {
277277
options.headers.putIfAbsent('Content-Encoding', () => 'gzip');
278-
return _gzipEncoder.encode(utf8.encode(request))!;
278+
return _gzipEncoder.encode(utf8.encode(request));
279279
}
280280

281281
Future<Map<String, dynamic>?> embed(

packages/document/lib/src/app/app.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,5 +71,5 @@ class App {
7171
mixpanelAnalyticsClient,
7272
if (!foundation.kReleaseMode) LoggerAnalyticsClient(),
7373
]);
74-
}
74+
}
7575
}

packages/document/lib/src/services/document_api_service.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class DocumentApiService {
172172

173173
List<int> gzipRequestEncoder(String request, RequestOptions options) {
174174
options.headers.putIfAbsent('Content-Encoding', () => 'gzip');
175-
return _gzipEncoder.encode(utf8.encode(request))!;
175+
return _gzipEncoder.encode(utf8.encode(request));
176176
}
177177
}
178178

packages/document/lib/src/services/document_service.dart

Lines changed: 84 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import 'dart:typed_data';
55

66
import 'package:analytics/analytics.dart';
77
import 'package:archive/archive.dart';
8+
import 'package:async/async.dart';
9+
import 'package:async_locks/async_locks.dart';
810
import 'package:dio/dio.dart';
911
import 'package:document/src/app/app.locator.dart';
1012
import 'package:document/src/app/app.logger.dart';
@@ -19,7 +21,6 @@ import 'package:document/src/services/document_repository.dart';
1921
import 'package:document/src/services/embedding.dart';
2022
import 'package:document/src/services/embedding_repository.dart';
2123
import 'package:document/src/services/split_config.dart';
22-
import 'package:mutex/mutex.dart';
2324
import 'package:settings/settings.dart';
2425
import 'package:stacked/stacked.dart';
2526
import 'package:surrealdb_js/surrealdb_js.dart';
@@ -40,7 +41,8 @@ class DocumentService with ListenableServiceMixin {
4041
final _gzipEncoder = locator<GZipEncoder>();
4142
final _gzipDecoder = locator<GZipDecoder>();
4243
final _analyticsFacade = locator<AnalyticsFacade>();
43-
final _mutex = Mutex();
44+
final AsyncMemoizer<void> _semaphoreInitMemoizer = AsyncMemoizer<void>();
45+
Semaphore? _indexingSemaphore;
4446

4547
int _total = -1;
4648
final _items = <DocumentItem>[];
@@ -49,6 +51,24 @@ class DocumentService with ListenableServiceMixin {
4951

5052
final _log = getLogger('DocumentService');
5153

54+
// Helper to initialize the semaphore, potentially reading from settings
55+
Future<void> _initializeIndexingSemaphore() async {
56+
await _semaphoreInitMemoizer.runOnce(() async {
57+
var maxConcurrency = 5; // Default value
58+
try {
59+
// Example: Load from settings if available
60+
final settingValue = _settingService.get(maxIndexingConcurrencyKey);
61+
maxConcurrency = int.tryParse(settingValue.value) ?? maxConcurrency;
62+
} catch (e) {
63+
_log.w('''
64+
Could not read maxIndexingConcurrency setting, using default: $maxConcurrency.
65+
Error: $e''');
66+
}
67+
_indexingSemaphore = Semaphore(maxConcurrency);
68+
_log.i('Indexing Semaphore initialized with limit: $maxConcurrency');
69+
});
70+
}
71+
5272
Future<bool> isSchemaCreated(String tablePrefix) async {
5373
final results = await _db.query('INFO FOR DB');
5474
final result = Map<String, dynamic>.from(results! as Map);
@@ -79,6 +99,9 @@ class DocumentService with ListenableServiceMixin {
7999
}
80100

81101
Future<void> initialise(String tablePrefix, String dimensions) async {
102+
// Initialize semaphore during service initialization
103+
await _initializeIndexingSemaphore();
104+
82105
if (!await isSchemaCreated(tablePrefix)) {
83106
await createSchema(tablePrefix, dimensions);
84107
}
@@ -135,8 +158,26 @@ class DocumentService with ListenableServiceMixin {
135158
);
136159
_items.insert(0, documentItem);
137160
notifyListeners();
138-
_log.d('0. documentItem.hashCode ${documentItem.hashCode}');
139-
await _split(documentItem);
161+
_log.d('Queueing split/index for document: ${documentItem.item.id}');
162+
// No await here, let it run in the background
163+
unawaited(
164+
_split(documentItem).catchError(
165+
(Object e, StackTrace s) {
166+
_log.e(
167+
'Error during background split/index process for ${documentItem.item.id}',
168+
error: e,
169+
stackTrace: s,
170+
);
171+
// Ensure status is updated even if error occurs outside
172+
//_onError scope
173+
updateDocumentDoneStatus(
174+
documentItem,
175+
DocumentStatus.failed,
176+
'Unhandled error during processing: $e',
177+
);
178+
},
179+
),
180+
);
140181
}
141182
}
142183
}
@@ -332,7 +373,7 @@ class DocumentService with ListenableServiceMixin {
332373
if (_isGzFile(bytes)) {
333374
return Uint8List.fromList(bytes);
334375
} else {
335-
return Uint8List.fromList(_gzipEncoder.encode(bytes)!);
376+
return Uint8List.fromList(_gzipEncoder.encode(bytes));
336377
}
337378
}
338379

@@ -427,11 +468,45 @@ class DocumentService with ListenableServiceMixin {
427468
final embeddings = await _splitted(documentItem, responseData).timeout(
428469
Duration(seconds: max((responseData?['items'] as List).length, 600)),
429470
);
430-
await _mutex.protect(() async {
471+
472+
// Acquire semaphore before starting indexing
473+
// Check if semaphore is null before acquiring,
474+
// though it should be initialized.
475+
if (_indexingSemaphore == null) {
476+
_log.e(
477+
'Indexing Semaphore is null, cannot index ${documentItem.item.id}',
478+
);
479+
await _handleError(
480+
documentItem,
481+
'Internal error: Concurrency control not initialized.',
482+
);
483+
return;
484+
}
485+
486+
await _indexingSemaphore!.acquire();
487+
_log.d('Semaphore acquired for document: ${documentItem.item.id}');
488+
try {
489+
// Execute the indexing logic
431490
await _indexing(documentItem, embeddings).timeout(
432491
Duration(seconds: max(embeddings.length, 900)),
433492
);
434-
});
493+
_log.i(
494+
'Indexing completed successfully for document: ${documentItem.item.id}',
495+
);
496+
} catch (e, s) {
497+
_log.e(
498+
'Error during _indexing for ${documentItem.item.id}',
499+
error: e,
500+
stackTrace: s,
501+
);
502+
// Handle error specifically during indexing phase
503+
await _handleError(documentItem, 'Error during indexing: $e');
504+
} finally {
505+
// IMPORTANT: Release semaphore in the finally block
506+
// to ensure it's always released, even if _indexing throws an error.
507+
_indexingSemaphore!.release();
508+
_log.d('Semaphore released for document: ${documentItem.item.id}');
509+
}
435510
}
436511

437512
Future<List<Embedding>> _splitted(
@@ -582,7 +657,7 @@ class DocumentService with ListenableServiceMixin {
582657
DocumentStatus status, [
583658
String? errorMessage,
584659
]) async {
585-
switch(status) {
660+
switch (status) {
586661
case DocumentStatus.completed:
587662
unawaited(_analyticsFacade.trackDocumentUploadCompleted());
588663
case DocumentStatus.failed:
@@ -591,11 +666,10 @@ class DocumentService with ListenableServiceMixin {
591666
unawaited(_analyticsFacade.trackDocumentUploadCancelled());
592667
// ignore: no_default_cases
593668
default:
594-
// do nothing
669+
// do nothing
595670
}
596671
}
597672

598-
599673
Future<void> updateDocumentIndexingStatus(DocumentItem documentItem) async {
600674
final now = DateTime.now();
601675
documentItem.item = (await _documentRepository.updateDocument(

packages/document/pubspec.yaml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ dependencies:
1111
analytics:
1212
path: ../analytics
1313
archive: ^4.0.1
14+
async: ^2.12.0
15+
async_locks: ^4.0.2
1416
database:
1517
path: ../database
1618
dio: ^5.6.0
@@ -25,7 +27,6 @@ dependencies:
2527
logger:
2628
mime: ^2.0.0
2729
mime_type: ^1.0.0
28-
mutex: ^3.1.0
2930
responsive_builder: ^0.7.0
3031
settings:
3132
path: ../settings
@@ -41,6 +42,7 @@ dependencies:
4142

4243
dev_dependencies:
4344
build_runner: ^2.4.11
45+
fake_async: ^1.3.1
4446
flutter_test:
4547
sdk: flutter
4648
freezed: ^2.5.3

0 commit comments

Comments
 (0)