Skip to content

Commit 95c927d

Browse files
committed
Timeouts and VM exit for index building process.
1 parent c04e483 commit 95c927d

File tree

2 files changed

+73
-32
lines changed

2 files changed

+73
-32
lines changed

app/lib/search/backend.dart

Lines changed: 42 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ class SearchBackend {
149149
// update or remove the document
150150
await retry(() async {
151151
try {
152-
final doc = await loadDocument(package);
152+
final doc = await loadDocument(package).timeout(Duration(minutes: 2));
153153
snapshot.add(doc);
154154
} on RemovedPackageException catch (_) {
155155
snapshot.remove(package);
@@ -158,29 +158,34 @@ class SearchBackend {
158158
}
159159

160160
// initial scan of packages
161-
final pool = Pool(concurrency);
162-
final futures = <Future>[];
163-
await for (final package in dbService.query<Package>().run()) {
164-
if (package.isNotVisible) {
165-
continue;
161+
await withVmTerminationTimeout(timeout: Duration(hours: 12), () async {
162+
final pool = Pool(concurrency);
163+
final futures = <Future>[];
164+
await for (final package in dbService.query<Package>().run()) {
165+
if (package.isNotVisible) {
166+
continue;
167+
}
168+
if (!claim.valid) {
169+
break;
170+
}
171+
// This is the first scan, there isn't any existing document that we
172+
// can compare to, ignoring the updated field.
173+
final f = pool.withResource(() => updatePackage(package.name!, null));
174+
futures.add(f);
166175
}
176+
await Future.wait(futures);
177+
await pool.close();
167178
if (!claim.valid) {
168-
break;
179+
return;
169180
}
170-
// This is the first scan, there isn't any existing document that we
171-
// can compare to, ignoring the updated field.
172-
final f = pool.withResource(() => updatePackage(package.name!, null));
173-
futures.add(f);
174-
}
175-
await Future.wait(futures);
176-
futures.clear();
177-
if (!claim.valid) {
178-
return;
179-
}
180-
snapshot.updateAllScores();
181+
snapshot.updateAllScores();
182+
183+
// first complete snapshot, uploading it
184+
await _snapshotStorage
185+
.uploadDataAsJsonMap(snapshot.toJson())
186+
.timeout(Duration(minutes: 10));
187+
});
181188

182-
// first complete snapshot, uploading it
183-
await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson());
184189
var lastUploadedSnapshotTimestamp = snapshot.updated!;
185190

186191
// start monitoring
@@ -193,29 +198,34 @@ class SearchBackend {
193198

194199
lastQueryStarted = now;
195200

196-
// query updates
197-
final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted);
198-
for (final e in recentlyUpdated.entries) {
199-
if (!claim.valid) {
200-
break;
201+
await withVmTerminationTimeout(timeout: Duration(hours: 1), () async {
202+
final pool = Pool(concurrency);
203+
final futures = <Future>[];
204+
// query updates
205+
final recentlyUpdated = await _queryRecentlyUpdated(lastQueryStarted);
206+
for (final e in recentlyUpdated.entries) {
207+
if (!claim.valid) {
208+
break;
209+
}
210+
final f = pool.withResource(() => updatePackage(e.key, e.value));
211+
futures.add(f);
201212
}
202-
final f = pool.withResource(() => updatePackage(e.key, e.value));
203-
futures.add(f);
204-
}
205-
await Future.wait(futures);
206-
futures.clear();
213+
await Future.wait(futures);
214+
await pool.close();
215+
});
207216

208217
if (claim.valid && lastUploadedSnapshotTimestamp != snapshot.updated) {
209218
// Updates the normalized scores across all the packages.
210219
snapshot.updateAllScores();
211220

212-
await _snapshotStorage.uploadDataAsJsonMap(snapshot.toJson());
221+
await _snapshotStorage
222+
.uploadDataAsJsonMap(snapshot.toJson())
223+
.timeout(Duration(minutes: 10));
213224
lastUploadedSnapshotTimestamp = snapshot.updated!;
214225
}
215226

216227
await Future.delayed(sleepDuration);
217228
}
218-
await pool.close();
219229
}
220230

221231
Future<Map<String, DateTime>> _queryRecentlyUpdated(

app/lib/shared/utils.dart

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ import 'dart:typed_data';
1111

1212
import 'package:appengine/appengine.dart';
1313
import 'package:intl/intl.dart';
14+
import 'package:logging/logging.dart';
1415
// ignore: implementation_imports
1516
import 'package:mime/src/default_extension_map.dart' as mime;
1617
import 'package:path/path.dart' as p;
18+
import 'package:pub_dev/shared/monitoring.dart';
1719
import 'package:pub_semver/pub_semver.dart' as semver;
1820

1921
export 'package:pana/pana.dart' show exampleFileCandidates;
@@ -28,6 +30,7 @@ final Duration twoYears = const Duration(days: 2 * 365);
2830
const _cloudTraceContextHeader = 'X-Cloud-Trace-Context';
2931

3032
final _random = Random.secure();
33+
final _logger = Logger('pub.utils');
3134

3235
final DateFormat shortDateFormat = DateFormat.yMMMd();
3336

@@ -305,3 +308,31 @@ extension ByteFolderExt on Stream<List<int>> {
305308
return buffer.toBytes();
306309
}
307310
}
311+
312+
/// Executes [fn] returning its results, but terminating the Dart VM if that
313+
/// execution takes longer than [timeout].
314+
Future<T> withVmTerminationTimeout<T>(
315+
Future<T> Function() fn, {
316+
required Duration timeout,
317+
}) async {
318+
final trace = StackTrace.current;
319+
final timer = Timer(timeout, () {
320+
// Give the logging a short time to be stored outside of the machine.
321+
Timer(Duration(seconds: 10), () async {
322+
exit(-1);
323+
});
324+
325+
stderr.writeln('Timeout triggering VM termination\n$trace');
326+
_logger.pubNoticeShout(
327+
'vm-termination',
328+
'Timeout triggering VM termination',
329+
Exception(),
330+
trace,
331+
);
332+
});
333+
try {
334+
return await fn();
335+
} finally {
336+
timer.cancel();
337+
}
338+
}

0 commit comments

Comments
 (0)