@@ -11,6 +11,7 @@ import 'package:logging/logging.dart';
1111import 'package:pub_dev/search/result_combiner.dart' ;
1212import 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart' ;
1313import 'package:pub_dev/service/entrypoint/search_index.dart' ;
14+ import 'package:shelf/shelf.dart' ;
1415
1516import '../../search/backend.dart' ;
1617import '../../search/handlers.dart' ;
@@ -38,47 +39,77 @@ class SearchCommand extends Command {
3839
3940 envConfig.checkServiceEnvironment (name);
4041 await withServices (() async {
41- final packageIsolate = await startSearchIsolate (logger: _logger);
42- registerScopeExitCallback (packageIsolate.close);
43-
44- final sdkIsolate = await startQueryIsolate (
45- logger: _logger,
46- kind: 'sdk' ,
47- spawnUri: Uri .parse (
48- 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart' ,
49- ),
50- );
51- registerScopeExitCallback (sdkIsolate.close);
52-
53- registerSearchIndex (
54- SearchResultCombiner (
55- primaryIndex: LatencyAwareSearchIndex (
56- IsolateSearchIndex (packageIsolate),
57- ),
58- sdkIndex: SdkIsolateIndex (sdkIsolate),
59- ),
42+ await runSearchInstanceController (
43+ port: 8080 ,
44+ renewPackageIndex: _createRenewStream (delayDrift: delayDrift),
6045 );
61-
62- void scheduleRenew () {
63- scheduleMicrotask (() async {
64- // 12 - 17 minutes delay
65- final delay = Duration (
66- minutes: 12 ,
67- seconds: delayDrift + _random.nextInt (240 ),
68- );
69- await Future .delayed (delay);
70-
71- // create a new index and handover with a 2-minute maximum wait
72- await packageIsolate.renew (count: 1 , wait: Duration (minutes: 2 ));
73-
74- // schedule the renewal again
75- scheduleRenew ();
76- });
77- }
78-
79- scheduleRenew ();
80-
81- await runHandler (_logger, searchServiceHandler);
8246 });
8347 }
8448}
49+
50+ /// Creates a stream with events separated by 12 - 17 minutes
51+ Stream <Completer > _createRenewStream ({required int delayDrift}) {
52+ return Stream .periodic (Duration (minutes: 12 ), (_) => Completer ()).asyncMap (
53+ (c) => Future .delayed (
54+ Duration (seconds: delayDrift + _random.nextInt (240 )),
55+ () => c,
56+ ),
57+ );
58+ }
59+
60+ /// Runs the search instance main controller, which creates separate isolates
61+ /// for the package and the SDK indexes.
62+ ///
63+ /// When the [renewPackageIndex] has a new event, it will trigger the renewal of the
64+ /// package index isolate, updating the search index.
65+ Future <void > runSearchInstanceController ({
66+ required int port,
67+ required Stream <Completer > renewPackageIndex,
68+ Duration renewWait = const Duration (minutes: 2 ),
69+ String ? snapshot,
70+ Handler ? handler,
71+ Future <void > Function ()? processTerminationSignal,
72+ }) async {
73+ final packageIsolate = await startSearchIsolate (
74+ logger: _logger,
75+ snapshot: snapshot,
76+ );
77+ registerScopeExitCallback (packageIsolate.close);
78+
79+ final sdkIsolate = await startQueryIsolate (
80+ logger: _logger,
81+ kind: 'sdk' ,
82+ spawnUri: Uri .parse (
83+ 'package:pub_dev/service/entrypoint/sdk_isolate_index.dart' ,
84+ ),
85+ );
86+ registerScopeExitCallback (sdkIsolate.close);
87+
88+ registerSearchIndex (
89+ SearchResultCombiner (
90+ primaryIndex: LatencyAwareSearchIndex (IsolateSearchIndex (packageIsolate)),
91+ sdkIndex: SdkIsolateIndex (sdkIsolate),
92+ ),
93+ );
94+
95+ final updateStream = renewPackageIndex.asyncMap ((c) async {
96+ try {
97+ // create a new index and handover with a 2-minute maximum wait
98+ await packageIsolate.renew (count: 1 , wait: renewWait);
99+ c.complete (null );
100+ } catch (e, st) {
101+ c.completeError (e, st);
102+ }
103+ });
104+ final updateListener = updateStream.listen ((_) {
105+ _logger.info ('Package SDK isolate renewed.' );
106+ });
107+
108+ await runHandler (
109+ _logger,
110+ handler ?? searchServiceHandler,
111+ port: port,
112+ processTerminationSignal: processTerminationSignal,
113+ );
114+ await updateListener.cancel ();
115+ }
0 commit comments