Skip to content

Commit a18a2b9

Browse files
authored
feat(hermes): Add thread pool for wasm modules parallel execution (#488)
1 parent 8e466c1 commit a18a2b9

File tree

29 files changed

+816
-94
lines changed

29 files changed

+816
-94
lines changed

.config/dictionaries/project.dic

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
aarch
22
aido
33
asat
4+
asyncio
5+
auditability
6+
autocheckpoint
47
backpressure
58
bkioshn
69
blockfetch
@@ -74,6 +77,11 @@ mkdelay
7477
mkdirat
7578
nanos
7679
nextest
80+
nolfs
81+
nomutex
82+
nostack
83+
notadb
84+
nsec
7785
oneshot
7886
openat
7987
outlen
@@ -122,6 +130,7 @@ testdocs
122130
testunit
123131
thiserror
124132
thollander
133+
threadsafe
125134
timelike
126135
tinygo
127136
toolsets

hermes/Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hermes/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22
resolver = "2"
33
members = [
44
"bin",
5-
"bin/tests/integration/components/http_request_rte_01"
5+
"bin/tests/integration/components/http_request_rte_01",
6+
"bin/tests/integration/components/sleep_component",
67
]
78
default-members = [
89
"bin",

hermes/bin/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ chrono = { version = "0.4.41", features = ["now"] }
6666
chrono-tz = "0.10.4"
6767
saffron = "0.1.0"
6868
tokio = { version = "1.47.1", features = ["macros", "sync", "rt-multi-thread", "rt", "net"] }
69+
rayon = "1.11.0"
6970
libsqlite3-sys = {version="0.35.0", features = ["bundled"] }
7071
stringzilla = "3.12.6"
7172
temp-dir = "0.1.16"

hermes/bin/src/app.rs

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use std::{collections::HashMap, sync::Arc};
44

55
use crate::{
66
event::HermesEventPayload,
7+
pool,
78
runtime_context::HermesRuntimeContext,
89
runtime_extensions::{
910
init::trait_event::{RteEvent, RteInitEvent},
@@ -41,7 +42,7 @@ pub(crate) struct Application {
4142
name: ApplicationName,
4243

4344
/// WASM modules
44-
indexed_modules: HashMap<ModuleId, Module>,
45+
indexed_modules: HashMap<ModuleId, Arc<Module>>,
4546

4647
/// Application's `Vfs` instance
4748
vfs: Arc<Vfs>,
@@ -57,7 +58,7 @@ impl Application {
5758
) -> Self {
5859
let indexed_modules = modules
5960
.into_iter()
60-
.map(|module| (module.id().clone(), module))
61+
.map(|module| (module.id().clone(), Arc::new(module)))
6162
.collect();
6263
Self {
6364
name: app_name,
@@ -79,66 +80,78 @@ impl Application {
7980
/// Dispatch event for all available modules.
8081
pub(crate) fn dispatch_event(
8182
&self,
82-
event: &dyn HermesEventPayload,
83-
) -> anyhow::Result<()> {
83+
event: &Arc<dyn HermesEventPayload>,
84+
) {
8485
for module in self.indexed_modules.values() {
8586
module_dispatch_event(
86-
module,
87+
module.clone(),
8788
self.name.clone(),
8889
module.id().clone(),
8990
self.vfs.clone(),
90-
event,
91-
)?;
91+
event.clone(),
92+
);
9293
}
93-
Ok(())
9494
}
9595

9696
/// Dispatch event for the target module by the `module_id`.
9797
pub(crate) fn dispatch_event_for_target_module(
9898
&self,
9999
module_id: ModuleId,
100-
event: &dyn HermesEventPayload,
100+
event: Arc<dyn HermesEventPayload>,
101101
) -> anyhow::Result<()> {
102102
let module = self
103103
.indexed_modules
104104
.get(&module_id)
105105
.ok_or(anyhow::anyhow!("Module {module_id} not found"))?;
106106
module_dispatch_event(
107-
module,
107+
module.clone(),
108108
self.name.clone(),
109109
module_id,
110110
self.vfs.clone(),
111111
event,
112-
)
112+
);
113+
Ok(())
113114
}
114115
}
115116

116117
/// Dispatch event
117118
pub(crate) fn module_dispatch_event(
118-
module: &Module,
119+
module: Arc<Module>,
119120
app_name: ApplicationName,
120121
module_id: ModuleId,
121122
vfs: Arc<Vfs>,
122-
event: &dyn HermesEventPayload,
123-
) -> anyhow::Result<()> {
124-
let runtime_ctx = HermesRuntimeContext::new(
125-
app_name,
126-
module_id,
127-
event.event_name().to_string(),
128-
module.exec_counter(),
129-
vfs,
130-
);
131-
132-
// Advise Runtime Extensions of a new context
133-
// TODO: Better handle errors.
134-
RteEvent::new().init(&runtime_ctx)?;
135-
// TODO: (SJ) Remove when all RTE's are migrated.
136-
new_context(&runtime_ctx);
137-
138-
module.execute_event(event, runtime_ctx.clone())?;
139-
140-
// Advise Runtime Extensions that context can be cleaned up.
141-
RteEvent::new().fini(&runtime_ctx)?;
142-
143-
Ok(())
123+
event: Arc<dyn HermesEventPayload>,
124+
) {
125+
// TODO(@aido-mth): fix how init is processed. https://github.com/input-output-hk/hermes/issues/490
126+
pool::execute(move || {
127+
let runtime_ctx = HermesRuntimeContext::new(
128+
app_name,
129+
module_id,
130+
event.event_name().to_string(),
131+
module.exec_counter(),
132+
vfs,
133+
);
134+
135+
// Advise Runtime Extensions of a new context
136+
// TODO: Better handle errors.
137+
if let Err(err) = RteEvent::new().init(&runtime_ctx) {
138+
tracing::error!("module event initialization failed: {err}");
139+
return;
140+
}
141+
142+
// TODO: (SJ) Remove when all RTE's are migrated.
143+
new_context(&runtime_ctx);
144+
145+
if let Err(err) = module.execute_event(event.as_ref(), runtime_ctx.clone()) {
146+
tracing::error!("module event execution failed: {err}");
147+
return;
148+
}
149+
150+
// Advise Runtime Extensions that context can be cleaned up.
151+
drop(
152+
RteEvent::new()
153+
.fini(&runtime_ctx)
154+
.inspect_err(|err| tracing::error!("module event finalization failed: {err}")),
155+
);
156+
});
144157
}

hermes/bin/src/cli/run.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use crate::{
1313
app::{build_app, ApplicationPackage},
1414
sign::certificate::{self, Certificate},
1515
},
16-
reactor,
16+
pool, reactor,
1717
};
1818

1919
/// Run cli command
@@ -56,6 +56,7 @@ impl Run {
5656
let app = build_app(&package, hermes_home_dir)?;
5757

5858
let exit_lock = reactor::init()?;
59+
pool::init()?;
5960
println!(
6061
"{} Loading application {}...",
6162
Emoji::new("🛠️", ""),
@@ -69,6 +70,8 @@ impl Run {
6970
exit_lock.wait()
7071
};
7172

73+
// Wait for scheduled tasks to be finished.
74+
pool::terminate();
7275
Ok(exit)
7376
}
7477
}

hermes/bin/src/event/mod.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
33
pub mod queue;
44

5+
use std::sync::Arc;
6+
57
use crate::{
68
app::ApplicationName,
79
wasm::module::{ModuleId, ModuleInstance},
@@ -47,7 +49,7 @@ pub(crate) enum TargetModule {
4749
/// Hermes event
4850
pub(crate) struct HermesEvent {
4951
/// The payload carried by the `HermesEvent`.
50-
payload: Box<dyn HermesEventPayload>,
52+
payload: Arc<dyn HermesEventPayload>,
5153

5254
/// Target app
5355
target_app: TargetApp,
@@ -64,15 +66,15 @@ impl HermesEvent {
6466
target_module: TargetModule,
6567
) -> Self {
6668
Self {
67-
payload: Box::new(payload),
69+
payload: Arc::new(payload),
6870
target_app,
6971
target_module,
7072
}
7173
}
7274

7375
/// Get event's payload
74-
pub(crate) fn payload(&self) -> &dyn HermesEventPayload {
75-
self.payload.as_ref()
76+
pub(crate) fn payload(&self) -> &Arc<dyn HermesEventPayload> {
77+
&self.payload
7678
}
7779

7880
/// Get event's target app

hermes/bin/src/event/queue.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -96,15 +96,14 @@ fn targeted_module_event_execution(
9696

9797
match event.target_module() {
9898
TargetModule::All => {
99-
if let Err(err) = app.dispatch_event(event.payload()) {
100-
tracing::error!("{err}");
101-
}
99+
app.dispatch_event(event.payload());
102100
},
103101
TargetModule::List(target_modules) => {
104102
for target_module_id in target_modules {
105-
if let Err(err) =
106-
app.dispatch_event_for_target_module(target_module_id.clone(), event.payload())
107-
{
103+
if let Err(err) = app.dispatch_event_for_target_module(
104+
target_module_id.clone(),
105+
event.payload().clone(),
106+
) {
108107
tracing::error!("{err}");
109108
}
110109
}

hermes/bin/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub mod hdf5;
1010
pub mod ipfs;
1111
pub mod logger;
1212
pub mod packaging;
13+
pub mod pool;
1314
pub mod reactor;
1415
pub mod runtime_context;
1516
pub mod runtime_extensions;

hermes/bin/src/main.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod hdf5;
88
mod ipfs;
99
mod logger;
1010
mod packaging;
11+
mod pool;
1112
mod reactor;
1213
mod runtime_context;
1314
mod runtime_extensions;

0 commit comments

Comments
 (0)