Skip to content

Commit a6a99dc

Browse files
mralephCommit Queue
authored andcommitted
[vm] Fix issues in SampleBlockProcessor
This is follow up to commit cb0c2bf. SampleBlockProcessor only enters isolate group and not a specific isolate so the code must not rely on thread->isolate(). This fixes two places where this was not the case: * ProfileBuilder::IsPCInDartHeap * UserTags::TagName pkg/dds/test/get_cached_cpu_samples_test was supposed to cover this but it has two problems: First I observed that SampleBlockProcessor never gets a chance to process a block if mutator thread always gets to it first (via a scheduled interrupt), so this code is not well exercised. I started by adding a variant of the test where interrupts are inhibited via a vm:unsafe:no-interrupts pragma - which revealed the crashes in the SampleBlockProcessor code. This revealed the second problem: get_cached_cpu_samples_test does not actually fail if testee crashes during the test, it just silently completes with success. This seems to happen because disposal of VmService connection is not forwarded into the future on which the test is awaiting - and the whole process just exits once VmService connection to the testee disappears (because all ports are closed, no pending activity is possible after that one). I have fixed this by adding a helper function which checks that connection to VmService only goes away when we dispose it. Note: there is another obvious issue here, which I am leaving unfixed for now. SampleBlockProcessor calls UserTags::TagName in a way that can race with isolate itself modifying the table. I think this race is extremely unlikely but it can cause crashes on ARMs with its weak memory model (e.g. we might end up reading garbage due to the reordering of stores). TEST=pkg/dds/test/get_cached_cpu_samples_test Change-Id: Iee15ec2b019928b798c312e63edc76696abf5527 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/426300 Reviewed-by: Martin Kustermann <[email protected]> Commit-Queue: Slava Egorov <[email protected]>
1 parent 2455ace commit a6a99dc

File tree

7 files changed

+239
-183
lines changed

7 files changed

+239
-183
lines changed

pkg/dds/test/common/test_helper.dart

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -583,3 +583,25 @@ Future<void> runVMTests(
583583
);
584584
}
585585
}
586+
587+
/// Runs the given [testBody] against the [VmService] at [serviceUri].
588+
///
589+
/// The test will fail if connection goes away prematurely.
590+
Future<void> withServiceConnection(
591+
Uri serviceUri, Future<void> Function(VmService) testBody) async {
592+
var disposed = false;
593+
final service = await vmServiceConnectUri(serviceUri.toString());
594+
await Future.wait([
595+
() async {
596+
try {
597+
await testBody(service);
598+
} finally {
599+
disposed = true;
600+
await service.dispose();
601+
}
602+
}(),
603+
service.onDone.then((_) {
604+
expect(disposed, isTrue);
605+
}),
606+
], eagerError: true);
607+
}

pkg/dds/test/cpu_sample_streaming_test.dart

Lines changed: 85 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
// Copyright (c) 2021, the Dart project authors. Please see the AUTHORS file
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
4+
// VMOptions=
5+
// VMOptions=-Ddisable.interrupts.to.test.sample.block.processor=true
46

57
import 'dart:async';
68
import 'dart:io';
79

810
import 'package:dds/dds.dart';
911
import 'package:test/test.dart';
1012
import 'package:vm_service/vm_service.dart';
11-
import 'package:vm_service/vm_service_io.dart';
1213
import 'common/test_helper.dart';
1314

1415
void main() {
@@ -41,90 +42,100 @@ void main() {
4142
} else {
4243
serviceUri = serviceUri.replace(scheme: 'ws', path: 'ws');
4344
}
44-
final service = await vmServiceConnectUri(serviceUri.toString());
45-
final otherService = await vmServiceConnectUri(serviceUri.toString());
46-
47-
IsolateRef isolate;
48-
while (true) {
49-
final vm = await service.getVM();
50-
if (vm.isolates!.isNotEmpty) {
51-
isolate = vm.isolates!.first;
52-
try {
53-
isolate = await service.getIsolate(isolate.id!);
54-
if ((isolate as Isolate).runnable!) {
55-
break;
45+
await withServiceConnection(serviceUri, (service) async {
46+
await withServiceConnection(serviceUri, (otherService) async {
47+
IsolateRef isolate;
48+
while (true) {
49+
final vm = await service.getVM();
50+
if (vm.isolates!.isNotEmpty) {
51+
isolate = vm.isolates!.first;
52+
try {
53+
isolate = await service.getIsolate(isolate.id!);
54+
if ((isolate as Isolate).runnable!) {
55+
break;
56+
}
57+
} on SentinelException {
58+
// ignore
59+
}
5660
}
57-
} on SentinelException {
58-
// ignore
61+
await Future.delayed(const Duration(seconds: 1));
5962
}
60-
}
61-
await Future.delayed(const Duration(seconds: 1));
62-
}
63-
expect(isolate, isNotNull);
63+
expect(isolate, isNotNull);
6464

65-
final expectedUserTags = <String>{};
65+
final expectedUserTags = <String>{};
6666

67-
Future<void> listenForSamples() {
68-
late StreamSubscription sub;
69-
final completer = Completer<void>();
70-
int i = 0;
71-
sub = service.onProfilerEvent.listen(
72-
(event) async {
73-
if (event.kind == EventKind.kCpuSamples &&
74-
event.isolate!.id! == isolate.id!) {
75-
expect(expectedUserTags.isNotEmpty, true);
76-
++i;
77-
if (i > 3) {
67+
Future<void> listenForSamples() {
68+
late StreamSubscription sub;
69+
final completer = Completer<void>();
70+
int i = 0;
71+
sub = service.onProfilerEvent.listen(
72+
(event) async {
73+
if (event.kind == EventKind.kCpuSamples &&
74+
event.isolate!.id! == isolate.id!) {
75+
expect(expectedUserTags.isNotEmpty, true);
76+
++i;
77+
if (i > 3) {
78+
if (!completer.isCompleted) {
79+
await sub.cancel();
80+
completer.complete();
81+
}
82+
return;
83+
}
84+
expect(event.cpuSamples, isNotNull);
85+
final sampleCount = event.cpuSamples!.samples!
86+
.where((e) => expectedUserTags.contains(e.userTag))
87+
.length;
88+
expect(sampleCount, event.cpuSamples!.samples!.length);
89+
}
90+
},
91+
onDone: () {
7892
if (!completer.isCompleted) {
79-
await sub.cancel();
80-
completer.complete();
93+
completer.completeError(
94+
StateError('unexpected end of the profiler stream'));
8195
}
82-
return;
83-
}
84-
expect(event.cpuSamples, isNotNull);
85-
final sampleCount = event.cpuSamples!.samples!
86-
.where((e) => expectedUserTags.contains(e.userTag))
87-
.length;
88-
expect(sampleCount, event.cpuSamples!.samples!.length);
96+
},
97+
onError: (e) {
98+
completer.completeError(e);
99+
},
100+
);
101+
if (expectedUserTags.isEmpty) {
102+
return Future.delayed(const Duration(seconds: 2)).then(
103+
(_) async => await sub.cancel(),
104+
);
89105
}
90-
},
91-
);
92-
if (expectedUserTags.isEmpty) {
93-
return Future.delayed(const Duration(seconds: 2)).then(
94-
(_) async => await sub.cancel(),
95-
);
96-
}
97-
return completer.future;
98-
}
99-
100-
await service.streamListen(EventStreams.kProfiler);
101-
Future<void> subscription = listenForSamples();
102-
await service.resume(isolate.id!);
103-
await subscription;
104-
await service.pause(isolate.id!);
106+
return completer.future;
107+
}
105108

106-
expectedUserTags.add('Testing');
107-
await service.streamCpuSamplesWithUserTag(expectedUserTags.toList());
108-
subscription = listenForSamples();
109-
await service.resume(isolate.id!);
110-
await subscription;
111-
await service.pause(isolate.id!);
109+
await service.streamListen(EventStreams.kProfiler);
110+
Future<void> subscription = listenForSamples();
111+
await service.resume(isolate.id!);
112+
await subscription;
113+
await service.pause(isolate.id!);
112114

113-
expectedUserTags.add('Baz');
114-
await service.streamCpuSamplesWithUserTag(expectedUserTags.toList());
115-
subscription = listenForSamples();
116-
await service.resume(isolate.id!);
117-
await subscription;
118-
await service.pause(isolate.id!);
115+
expectedUserTags.add('Testing');
116+
await service
117+
.streamCpuSamplesWithUserTag(expectedUserTags.toList());
118+
subscription = listenForSamples();
119+
await service.resume(isolate.id!);
120+
await subscription;
121+
await service.pause(isolate.id!);
119122

120-
expectedUserTags.clear();
121-
await service.streamCpuSamplesWithUserTag(expectedUserTags.toList());
122-
subscription = listenForSamples();
123-
await service.resume(isolate.id!);
124-
await subscription;
123+
expectedUserTags.add('Baz');
124+
await service
125+
.streamCpuSamplesWithUserTag(expectedUserTags.toList());
126+
subscription = listenForSamples();
127+
await service.resume(isolate.id!);
128+
await subscription;
129+
await service.pause(isolate.id!);
125130

126-
await service.dispose();
127-
await otherService.dispose();
131+
expectedUserTags.clear();
132+
await service
133+
.streamCpuSamplesWithUserTag(expectedUserTags.toList());
134+
subscription = listenForSamples();
135+
await service.resume(isolate.id!);
136+
await subscription;
137+
});
138+
});
128139
},
129140
timeout: Timeout.none,
130141
);

pkg/dds/test/get_cached_cpu_samples_script.dart

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,18 @@
44

55
import 'dart:developer';
66

7+
// VM processes collected samples using two different mechanisms: by
8+
// scheduling VM interrupts and via `SampleBlockProcessor` which periodically
9+
// wakes up and checks if any thread has unprocessed blocks. To test both
10+
// mechanisms we introduce a way to run this script and suppress interrupts
11+
// within `fib` function - this way completed blocks will be processed by
12+
// `SampleBlockProcessor`.
13+
const noInterrupts =
14+
bool.fromEnvironment('disable.interrupts.to.test.sample.block.processor')
15+
? pragma('vm:unsafe:no-interrupts')
16+
: Object();
17+
18+
@noInterrupts
719
fib(int n) {
820
if (n <= 1) {
921
return n;
@@ -14,10 +26,9 @@ fib(int n) {
1426
void main() {
1527
final tag = UserTag('Testing')..makeCurrent();
1628
final tag2 = UserTag('Baz');
17-
int i = 5;
29+
int i = 35;
1830
while (true) {
1931
tag.makeCurrent();
20-
++i;
2132
fib(i);
2233
tag2.makeCurrent();
2334
fib(i);

0 commit comments

Comments
 (0)