Skip to content

Commit 049ca41

Browse files
derekxu16Commit Queue
authored andcommitted
[DDS] Start caching events sent on the 'Timer' stream
TEST=pkg/dds/test/timer_event_history_test Change-Id: Id1b867e5f582aca9d57c6d52978880bbbd889b27 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/411160 Reviewed-by: Ben Konyi <[email protected]> Commit-Queue: Derek Xu <[email protected]>
1 parent 748a2a2 commit 049ca41

File tree

6 files changed

+109
-3
lines changed

6 files changed

+109
-3
lines changed

pkg/dds/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
- Widen the dependency on `package:shelf_web_socket`.
44
- Require Dart SDK v. 3.5.0 or higher.
5+
- Started caching events sent on the 'Timer' stream. The cached events can be retrieved using the `getStreamHistory` RPC.
56

67
# 5.0.0
78
- [DAP] The debug adapter no longer spawns its own in-process copy of DDS, instead relying on one started by the Dart VM (or `Flutter`). This means the `enableDds` and `enableAuthCodes` arguments to the `DartDebugAdapter` base class have been deprecated and have any effect. Suppressing DDS (or auth codes) should be done in launch configuration (for example using `vmAdditionalArgs` or `toolArgs` depending on the target tool).

pkg/dds/lib/src/stream_manager.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,7 @@ class StreamManager {
427427
static const kEchoStream = '_Echo';
428428
static const kDebugStream = 'Debug';
429429
static const kExtensionStream = 'Extension';
430+
static const kTimerStream = 'Timer';
430431
static const kHeapSnapshotStream = 'HeapSnapshot';
431432
static const kIsolateStream = 'Isolate';
432433
static const kGCStream = 'GC';
@@ -463,10 +464,11 @@ class StreamManager {
463464
kIsolateStream,
464465
};
465466

466-
// Never cancel the logging and extension event streams as `LoggingRepository`
467-
// requires them keep history.
467+
// Never cancel the logging, timer, and extension event streams as
468+
// `LoggingRepository` requires them keep history.
468469
static const loggingRepositoryStreams = <String>{
469470
kExtensionStream,
471+
kTimerStream,
470472
kLoggingStream,
471473
kStderrStream,
472474
kStdoutStream,

pkg/dds/test/extension_event_history_test.dart

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'dart:async';
66
import 'dart:developer';
77

8+
import 'package:dds_service_extensions/dds_service_extensions.dart';
89
import 'package:test/test.dart';
910
import 'package:vm_service/vm_service.dart';
1011

@@ -22,7 +23,6 @@ Future testMain() async {
2223

2324
final tests = <IsolateTest>[
2425
hasPausedAtStart,
25-
resumeIsolate,
2626
(VmService service, IsolateRef isolateRef) async {
2727
final completer = Completer<void>();
2828
int i = 1;
@@ -39,8 +39,31 @@ final tests = <IsolateTest>[
3939
}
4040
});
4141
await service.streamListen(EventStreams.kExtension);
42+
43+
resumeIsolate(service, isolateRef);
4244
await completer.future;
4345
},
46+
(VmService service, _) async {
47+
// Confirm that all events in the history buffer get sent on a stream
48+
// returned by [service.onExtensionEventWithHistory].
49+
final completer = Completer<void>();
50+
int i = 1;
51+
late final StreamSubscription subscription;
52+
subscription = service.onExtensionEventWithHistory.listen((event) async {
53+
expect(event.extensionKind, 'Test');
54+
expect(event.extensionData!.data['id'], i);
55+
i++;
56+
57+
if (i == 10) {
58+
await subscription.cancel();
59+
completer.complete();
60+
} else if (i > 10) {
61+
fail('Too many "Test" extension events');
62+
}
63+
});
64+
await service.streamListen(EventStreams.kExtension);
65+
await completer.future;
66+
}
4467
];
4568

4669
void main([args = const <String>[]]) => runIsolateTests(
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:io' show sleep;
7+
8+
import 'package:dds_service_extensions/dds_service_extensions.dart';
9+
import 'package:test/test.dart';
10+
import 'package:vm_service/vm_service.dart';
11+
12+
import 'common/service_test_common.dart';
13+
import 'common/test_helper.dart';
14+
15+
Future<void> testeeMain() async {
16+
final completer = Completer<void>();
17+
late final Timer t;
18+
t = Timer(
19+
const Duration(milliseconds: 100),
20+
() {
21+
t.cancel();
22+
completer.complete();
23+
},
24+
);
25+
26+
// Sleep for 201 ms to force [t] to fire at least 100 ms late. This allows us
27+
// to expect to receive at least one 'TimerSignificantlyOverdue' event in
28+
// [tests] below, because a 'TimerSignificantlyOverdue' event should be fired
29+
// whenever a timer is identified to be at least 100 ms overdue.
30+
sleep(const Duration(milliseconds: 201));
31+
await completer.future;
32+
}
33+
34+
final tests = <IsolateTest>[
35+
hasPausedAtStart,
36+
(VmService service, IsolateRef isolateRef) async {
37+
final completer = Completer<void>();
38+
service.onTimerEvent.listen((event) async {
39+
expect(event.kind, 'TimerSignificantlyOverdue');
40+
41+
await service.streamCancel(EventStreams.kTimer);
42+
completer.complete();
43+
});
44+
await service.streamListen(EventStreams.kTimer);
45+
46+
resumeIsolate(service, isolateRef);
47+
await completer.future;
48+
},
49+
(VmService service, _) async {
50+
// Confirm that all events in the history buffer get sent on a stream
51+
// returned by [service.onTimerEventWithHistory].
52+
final completer = Completer<void>();
53+
late final StreamSubscription subscription;
54+
subscription = service.onTimerEventWithHistory.listen((event) async {
55+
expect(event.kind, 'TimerSignificantlyOverdue');
56+
57+
await subscription.cancel();
58+
completer.complete();
59+
});
60+
await service.streamListen(EventStreams.kTimer);
61+
await completer.future;
62+
}
63+
];
64+
65+
void main([args = const <String>[]]) => runIsolateTests(
66+
args,
67+
tests,
68+
'timer_event_history_test.dart',
69+
testeeConcurrent: testeeMain,
70+
pauseOnStart: true,
71+
pauseOnExit: true,
72+
);

pkg/dds_service_extensions/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# 2.0.2-wip
22
- Require dart sdk v. 3.5.0 or higher.
3+
- Add `DdsExtension.onTimerEventWithHistory`.
34

45
# 2.0.1
56
- Update `vm_service` to `>=14.0.0 <16.0.0`.

pkg/dds_service_extensions/lib/dds_service_extensions.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ extension DdsExtension on VmService {
202202
Stream<Event> get onExtensionEventWithHistory =>
203203
onEventWithHistory('Extension');
204204

205+
/// Returns a new [Stream<Event>] of events sent on the `Timer` stream which
206+
/// outputs historical events before streaming real-time events.
207+
///
208+
/// Note: unlike [onTimerEvent], the returned stream is a single subscription
209+
/// stream and a new stream is created for each invocation of this getter.
210+
Stream<Event> get onTimerEventWithHistory => onEventWithHistory('Timer');
211+
205212
/// The [getClientName] RPC is used to retrieve the name associated with the
206213
/// currently connected VM service client.
207214
///

0 commit comments

Comments
 (0)