Skip to content

Commit 51ea425

Browse files
iinozemtsevCommit Queue
authored andcommitted
Add support for flushing microtasks in dart_engine.h
Also add a sample for calling Dart functions which return/accept futures. Change-Id: I932d0577709f4e068ccd3212797751ecb347c4f9 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/408281 Commit-Queue: Ivan Inozemtsev <[email protected]> Reviewed-by: Martin Kustermann <[email protected]>
1 parent a6f06e8 commit 51ea425

File tree

17 files changed

+478
-38
lines changed

17 files changed

+478
-38
lines changed

runtime/engine/dart_engine_impl.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,5 +72,9 @@ DART_EXPORT void DartEngine_HandleMessage(Dart_Isolate isolate) {
7272
Engine::instance()->HandleMessage(isolate);
7373
}
7474

75+
DART_EXPORT Dart_Handle DartEngine_DrainMicrotasksQueue() {
76+
return Engine::instance()->DrainMicrotasksQueue();
77+
}
78+
7579
} // namespace engine
7680
} // namespace dart

runtime/engine/engine.cc

Lines changed: 41 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
#include "engine/engine.h"
66
#include <memory>
7-
#include <utility>
87
#include "bin/dartutils.h"
98
#include "include/dart_api.h"
109
#include "include/dart_embedder_api.h"
@@ -16,6 +15,8 @@
1615
namespace dart {
1716
namespace engine {
1817

18+
constexpr char kRunPendingImmediateCallback[] = "_runPendingImmediateCallback";
19+
1920
using platform::MutexLocker;
2021

2122
Engine* Engine::instance() {
@@ -195,8 +196,8 @@ Dart_Isolate Engine::StartIsolate(DartEngine_SnapshotData snapshot,
195196
Dart_Handle core_libs_result =
196197
bin::DartUtils::PrepareForScriptLoading(false, false);
197198
if (Dart_IsError(core_libs_result)) {
198-
Dart_ShutdownIsolate();
199199
*error = Utils::StrDup(Dart_GetError(core_libs_result));
200+
Dart_ShutdownIsolate();
200201
return nullptr;
201202
}
202203

@@ -210,12 +211,27 @@ Dart_Isolate Engine::StartIsolate(DartEngine_SnapshotData snapshot,
210211
snapshot.kernel_buffer, snapshot.kernel_buffer_size);
211212

212213
if (Dart_IsError(library)) {
213-
Dart_ShutdownIsolate();
214214
*error = Utils::StrDup(Dart_GetError(library));
215+
Dart_ShutdownIsolate();
215216
return nullptr;
216217
}
217218
}
218219

220+
Dart_Handle isolate_library = Dart_LookupLibrary(
221+
Dart_NewStringFromCString(bin::DartUtils::kIsolateLibURL));
222+
if (Dart_IsError(isolate_library)) {
223+
*error = Utils::StrDup(Dart_GetError(isolate_library));
224+
Dart_ShutdownIsolate();
225+
return nullptr;
226+
}
227+
228+
std::shared_ptr<Engine::IsolateData> isolate_data = DataForIsolate(isolate);
229+
isolate_data->isolate_library = Dart_NewPersistentHandle(isolate_library);
230+
isolate_data->drain_microtasks_function_name = Dart_NewPersistentHandle(
231+
Dart_NewStringFromCString(kRunPendingImmediateCallback));
232+
isolate_data->scheduler.context = nullptr;
233+
isolate_data->scheduler.schedule_callback = nullptr;
234+
219235
Dart_ExitScope();
220236
Dart_ExitIsolate();
221237
is_running_ = true;
@@ -232,9 +248,12 @@ void Engine::Shutdown() {
232248

233249
is_running_ = false;
234250
for (auto isolate : isolates_) {
251+
std::shared_ptr<Engine::IsolateData> isolate_data = DataForIsolate(isolate);
235252
LockIsolate(isolate);
236253
Dart_EnterIsolate(isolate);
237254
Dart_SetMessageNotifyCallback(nullptr);
255+
Dart_DeletePersistentHandle(isolate_data->isolate_library);
256+
Dart_DeletePersistentHandle(isolate_data->drain_microtasks_function_name);
238257
Dart_ShutdownIsolate();
239258
UnlockIsolate(isolate);
240259
}
@@ -277,17 +296,30 @@ void Engine::HandleMessage(Dart_Isolate isolate) {
277296
UnlockIsolate(isolate);
278297
}
279298

280-
Mutex& Engine::MutexForIsolate(Dart_Isolate isolate) {
299+
Dart_Handle Engine::DrainMicrotasksQueue() {
300+
std::shared_ptr<Engine::IsolateData> isolate_data =
301+
DataForIsolate(Dart_CurrentIsolate());
302+
return Dart_Invoke(isolate_data->isolate_library,
303+
isolate_data->drain_microtasks_function_name, 0, nullptr);
304+
}
305+
306+
std::shared_ptr<Engine::IsolateData> Engine::DataForIsolate(
307+
Dart_Isolate isolate) {
281308
MutexLocker ml(&engine_state_);
282-
return mutexes_[isolate];
309+
auto it = isolate_data_.find(isolate);
310+
if (it == isolate_data_.end()) {
311+
it = isolate_data_.emplace(isolate, std::make_shared<Engine::IsolateData>())
312+
.first;
313+
}
314+
return it->second;
283315
}
284316

285317
void Engine::LockIsolate(Dart_Isolate isolate) {
286-
MutexForIsolate(isolate).Lock();
318+
DataForIsolate(isolate)->mutex.Lock();
287319
}
288320

289321
void Engine::UnlockIsolate(Dart_Isolate isolate) {
290-
MutexForIsolate(isolate).Unlock();
322+
DataForIsolate(isolate)->mutex.Unlock();
291323
}
292324

293325
void Engine::NotifyMessage(Dart_Isolate isolate) {
@@ -313,11 +345,7 @@ void Engine::NotifyMessage(Dart_Isolate isolate) {
313345
return;
314346
}
315347

316-
DartEngine_MessageScheduler scheduler;
317-
{
318-
MutexLocker ml(&engine_state_);
319-
scheduler = schedulers_[isolate];
320-
}
348+
DartEngine_MessageScheduler scheduler = DataForIsolate(isolate)->scheduler;
321349

322350
if (scheduler.schedule_callback == nullptr) {
323351
scheduler = default_scheduler_;
@@ -341,8 +369,7 @@ void Engine::SetDefaultMessageScheduler(DartEngine_MessageScheduler scheduler) {
341369

342370
void Engine::SetMessageScheduler(DartEngine_MessageScheduler scheduler,
343371
Dart_Isolate isolate) {
344-
MutexLocker ml(&engine_state_);
345-
schedulers_[isolate] = scheduler;
372+
DataForIsolate(isolate)->scheduler = scheduler;
346373
}
347374

348375
} // namespace engine

runtime/engine/engine.h

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
#define RUNTIME_ENGINE_ENGINE_H_
77

88
#include <memory>
9-
#include <queue>
109
#include <unordered_map>
1110
#include <vector>
1211
#include "include/dart_engine.h"
@@ -54,6 +53,9 @@ class Engine {
5453
// Calls Dart_HandleMessage, managing an isolate lock and Dart scope.
5554
void HandleMessage(Dart_Isolate isolate);
5655

56+
// Drains the microtasks queue, requires an active isolate.
57+
Dart_Handle DrainMicrotasksQueue();
58+
5759
// Sets a callback to be called when Dart_HandleMessage returns an error.
5860
void SetHandleMessageErrorCallback(
5961
DartEngine_HandleMessageErrorCallback callback);
@@ -85,6 +87,14 @@ class Engine {
8587
static void HandleMessageCallback(Dart_Isolate isolate);
8688

8789
private:
90+
// Engine's internal data for isolate.
91+
struct IsolateData {
92+
DartEngine_MessageScheduler scheduler;
93+
Mutex mutex;
94+
Dart_PersistentHandle isolate_library;
95+
Dart_PersistentHandle drain_microtasks_function_name;
96+
};
97+
8898
// Set to false once shutdown starts.
8999
bool is_running_ = false;
90100

@@ -114,19 +124,17 @@ class Engine {
114124
// All isolates, started via Engine::StartIsolate.
115125
std::vector<Dart_Isolate> isolates_;
116126

117-
// Stores per-isolate mutexes, used by Engine::LockIsolate/UnlockIsolate.
118-
std::unordered_map<Dart_Isolate, Mutex> mutexes_;
127+
// Stores per-isolate engine state.
128+
std::unordered_map<Dart_Isolate, std::shared_ptr<IsolateData>> isolate_data_;
119129

120130
// Default scheduler.
121131
DartEngine_MessageScheduler default_scheduler_;
122-
// Per-isolate message schedulers.
123-
std::unordered_map<Dart_Isolate, DartEngine_MessageScheduler> schedulers_;
124132

125133
// Callback to notify about Dart_HandleMessage errors.
126134
DartEngine_HandleMessageErrorCallback handle_message_error_callback_;
127135

128-
// Helper function to get an element from mutexes_.
129-
Mutex& MutexForIsolate(Dart_Isolate isolate);
136+
// Helper function to get an element from isolate_data_.
137+
std::shared_ptr<IsolateData> DataForIsolate(Dart_Isolate isolate);
130138
};
131139

132140
} // namespace engine

runtime/engine/include/dart_engine.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,17 @@ typedef void (*DartEngine_HandleMessageErrorCallback)(
109109
DART_EXPORT void DartEngine_SetHandleMessageErrorCallback(
110110
DartEngine_HandleMessageErrorCallback handle_message_error_callback);
111111

112+
/**
113+
* Drains the microtasks queue. Requires to be an active isolate.
114+
*
115+
* Normally the microtasks queue is drained after handling each
116+
* isolate message, but when the engine calls into Dart, it might be
117+
* required to manually drain the microtasks queue.
118+
*
119+
* \return Dart_Handle invocation result.
120+
*/
121+
DART_EXPORT Dart_Handle DartEngine_DrainMicrotasksQueue();
122+
112123
/**
113124
* Handles a single message for an isolate.
114125
*/
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
3+
<plist version="1.0">
4+
<dict>
5+
<key>com.apple.security.cs.allow-jit</key>
6+
<true/>
7+
<key>com.apple.security.cs.disable-library-validation</key>
8+
<true/>
9+
</dict>
10+
</plist>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!DOCTYPE plist PUBLIC "-//Apple Computer//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
3+
<plist version="1.0">
4+
<dict>
5+
<key>com.apple.security.cs.allow-jit</key>
6+
<true/>
7+
<key>com.apple.security.cs.disable-library-validation</key>
8+
<true/>
9+
</dict>
10+
</plist>

samples/embedder/BUILD.gn

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ group("all") {
1515

1616
group("aot") {
1717
deps = [
18+
":run_futures_aot",
1819
":run_main_aot",
1920
":run_timer_aot",
2021
":run_timer_async_aot",
@@ -24,6 +25,7 @@ group("aot") {
2425

2526
group("kernel") {
2627
deps = [
28+
":run_futures_kernel",
2729
":run_main_kernel",
2830
":run_timer_async_kernel",
2931
":run_timer_kernel",
@@ -150,3 +152,12 @@ sample("run_timer_async") {
150152
snapshots("timer") {
151153
main_dart = "timer.dart"
152154
}
155+
156+
snapshots("futures") {
157+
main_dart = "futures.dart"
158+
}
159+
160+
sample("run_futures") {
161+
sources = [ "run_futures.cc" ]
162+
snapshots = [ ":futures" ]
163+
}

samples/embedder/futures.dart

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
import 'dart:ffi';
7+
8+
void main() async {
9+
print(
10+
'returnRegularFuture returns: '
11+
'${await returnRegularFuture(5)}',
12+
);
13+
print(
14+
'sumIntStream(useAsyncStar = false) returns: '
15+
'${await sumIntStream(5, 1, false)}',
16+
);
17+
print(
18+
'sumIntStream(useAsyncStar = true) returns: '
19+
'${await sumIntStream(5, 1, true)}',
20+
);
21+
}
22+
23+
Future<int> returnRegularFuture(int delayMs) async {
24+
await Future.delayed(Duration(milliseconds: 5));
25+
return 256;
26+
}
27+
28+
Future<int> returnMicrotaskFuture(int delayMs) => Future.microtask(() async {
29+
await Future.delayed(Duration(milliseconds: delayMs));
30+
return 256;
31+
});
32+
33+
Stream<int> produceIntStreamWithAsyncStar(int count, int delayMs) async* {
34+
final delay = Duration(milliseconds: delayMs);
35+
36+
for (var i = 0; i < count; i++) {
37+
await Future.delayed(delay);
38+
yield i;
39+
}
40+
}
41+
42+
Stream<int> produceIntStreamWithController(int count, int delayMs) {
43+
final delay = Duration(milliseconds: delayMs);
44+
45+
final sc = StreamController<int>();
46+
(() async {
47+
for (var i = 0; i < count; i++) {
48+
await Future.delayed(delay);
49+
sc.add(i);
50+
}
51+
sc.close();
52+
})();
53+
return sc.stream;
54+
}
55+
56+
Future<int> sumIntStream(int count, int delayMs, bool useAsyncStar) async {
57+
final stream =
58+
useAsyncStar
59+
? produceIntStreamWithAsyncStar(count, delayMs)
60+
: produceIntStreamWithController(count, delayMs);
61+
var sum = 0;
62+
await for (var value in stream) {
63+
sum += value;
64+
}
65+
return sum;
66+
}
67+
68+
Future<int> awaitAndMultiply(Future<int> a, Future<int> b) async =>
69+
(await a) * (await b);
70+
71+
class AwaitAndMultiplyCall {
72+
final _a = Completer<int>();
73+
final _b = Completer<int>();
74+
75+
Future<int> getA() => _a.future;
76+
Future<int> getB() => _b.future;
77+
78+
@pragma('vm:entry-point', 'call')
79+
void setA(int value) => _a.complete(value);
80+
@pragma('vm:entry-point', 'call')
81+
void setB(int value) => _b.complete(value);
82+
}
83+
84+
@pragma('vm:entry-point', 'call')
85+
AwaitAndMultiplyCall awaitAndMultiplyC(int callbackPtr, int contextPtr) {
86+
final call = AwaitAndMultiplyCall();
87+
() async {
88+
Pointer<NativeFunction<Void Function(Pointer<Opaque>, Int64)>>.fromAddress(
89+
callbackPtr,
90+
).asFunction<void Function(Pointer<Opaque>, int)>()(
91+
Pointer<Opaque>.fromAddress(contextPtr),
92+
await awaitAndMultiply(call.getA(), call.getB()),
93+
);
94+
}();
95+
return call;
96+
}
97+
98+
@pragma('vm:entry-point', 'call')
99+
// C-friendly wrapper over [returnRegularFuture].
100+
void returnRegularFutureC(
101+
int delayMs,
102+
bool useMicrotask,
103+
int callbackPtr,
104+
int contextPtr,
105+
) async =>
106+
Pointer<NativeFunction<Void Function(Pointer<Opaque>, Int64)>>.fromAddress(
107+
callbackPtr,
108+
).asFunction<void Function(Pointer<Opaque>, int)>()(
109+
Pointer<Opaque>.fromAddress(contextPtr),
110+
await (useMicrotask
111+
? returnMicrotaskFuture(delayMs)
112+
: returnRegularFuture(delayMs)),
113+
);
114+
115+
@pragma('vm:entry-point', 'call')
116+
// C-friendly wrapper over [sumIntStream].
117+
void sumIntStreamC(
118+
int count,
119+
int delayMs,
120+
bool useAsyncStar,
121+
int callbackPtr,
122+
int contextPtr,
123+
) async =>
124+
Pointer<NativeFunction<Void Function(Pointer<Opaque>, Int64)>>.fromAddress(
125+
callbackPtr,
126+
).asFunction<void Function(Pointer<Opaque>, int)>()(
127+
Pointer<Opaque>.fromAddress(contextPtr),
128+
await sumIntStream(count, delayMs, useAsyncStar),
129+
);

0 commit comments

Comments
 (0)