Skip to content

Commit cd1019c

Browse files
authored
feat(native): Jinja - async render (#7309)
1 parent 89d841a commit cd1019c

File tree

11 files changed

+237
-35
lines changed

11 files changed

+237
-35
lines changed

packages/cubejs-backend-native/Cargo.lock

Lines changed: 52 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/cubejs-backend-native/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ crate-type = ["cdylib"]
1515
[dependencies]
1616
cubesql = { path = "../../rust/cubesql/cubesql" }
1717
cubeclient = { path = "../../rust/cubesql/cubeclient" }
18-
tokio = { version = "1.0", features = ["full", "rt"] }
18+
tokio = { version = "1", features = ["full", "rt"] }
19+
async-channel = { version = "2" }
1920
async-trait = "0.1.36"
2021
serde_derive = "1.0.115"
2122
serde = "1.0.115"

packages/cubejs-backend-native/js/index.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -376,9 +376,15 @@ export type PythonCtx = {
376376
variables: Record<string, any>
377377
};
378378

379+
export type JinjaEngineOptions = {
380+
debugInfo?: boolean,
381+
filters: Record<string, Function>,
382+
workers: number
383+
};
384+
379385
export interface JinjaEngine {
380386
loadTemplate(templateName: string, templateContent: string): void;
381-
renderTemplate(templateName: string, context: unknown, pythonContext: Record<string, any> | null): string;
387+
renderTemplate(templateName: string, context: unknown, pythonContext: Record<string, any> | null): Promise<string>;
382388
}
383389

384390
export class NativeInstance {
@@ -394,7 +400,7 @@ export class NativeInstance {
394400
return this.native;
395401
}
396402

397-
public newJinjaEngine(options: { debugInfo?: boolean, filters: Record<string, Function> }): JinjaEngine {
403+
public newJinjaEngine(options: JinjaEngineOptions): JinjaEngine {
398404
return this.getNative().newJinjaEngine(options);
399405
}
400406

packages/cubejs-backend-native/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#![feature(async_closure)]
2+
#![feature(thread_id_value)]
23

34
extern crate findshlibs;
45

packages/cubejs-backend-native/src/template/entry.rs

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
use crate::cross::*;
22
use crate::template::mj_value::*;
3-
use crate::template::neon::NeonMiniJinjaContext;
3+
use crate::template::neon::*;
44
use crate::utils::bind_method;
55

66
use log::trace;
77
use minijinja as mj;
88
use neon::prelude::*;
99
use std::cell::RefCell;
1010

11+
use crate::template::workers::{JinjaEngineWorkerJob, JinjaEngineWorkerPool};
1112
#[cfg(feature = "python")]
1213
use pyo3::{exceptions::PyNotImplementedError, prelude::*, types::PyTuple, AsPyPointer};
1314

1415
struct JinjaEngine {
1516
inner: mj::Environment<'static>,
17+
workers_count: usize,
18+
workers: Option<JinjaEngineWorkerPool>,
1619
}
1720

1821
impl Finalize for JinjaEngine {}
@@ -123,14 +126,50 @@ impl JinjaEngine {
123126
}
124127
}
125128

126-
Ok(Self { inner: engine })
129+
let workers_count = {
130+
let workers_count_float = options
131+
.get_value(cx, "workers")?
132+
.downcast_or_throw::<JsNumber, _>(cx)?
133+
.value(cx);
134+
135+
if workers_count_float < 1_f64 {
136+
return cx.throw_error("Option workers must be a positive integer");
137+
}
138+
139+
match workers_count_float.to_string().parse::<usize>() {
140+
Ok(v) => v,
141+
Err(err) => {
142+
return cx.throw_error(format!("Option workers must be a positive: {}", err))
143+
}
144+
}
145+
};
146+
147+
Ok(Self {
148+
inner: engine,
149+
workers_count,
150+
workers: None,
151+
})
127152
}
128153
}
129154

130155
type BoxedJinjaEngine = JsBox<RefCell<JinjaEngine>>;
131156

132157
impl JinjaEngine {
133-
fn render_template(mut cx: FunctionContext) -> JsResult<JsString> {
158+
fn build_if_needed(&mut self, cx: &mut FunctionContext) -> &JinjaEngineWorkerPool {
159+
if let Some(ref workers) = self.workers {
160+
return workers;
161+
}
162+
163+
self.workers = Some(JinjaEngineWorkerPool::new(
164+
self.workers_count,
165+
cx.channel(),
166+
self.inner.clone(),
167+
));
168+
169+
self.workers.as_ref().unwrap()
170+
}
171+
172+
fn render_template(mut cx: FunctionContext) -> JsResult<JsPromise> {
134173
#[cfg(build = "debug")]
135174
trace!("JinjaEngine.render_template");
136175

@@ -142,16 +181,6 @@ impl JinjaEngine {
142181
let template_compile_context = CLRepr::from_js_ref(cx.argument::<JsValue>(1)?, &mut cx)?;
143182
let template_python_context = CLRepr::from_js_ref(cx.argument::<JsValue>(2)?, &mut cx)?;
144183

145-
let engine = &this.borrow().inner;
146-
let template = match engine.get_template(&template_name.value(&mut cx)) {
147-
Ok(t) => t,
148-
Err(err) => {
149-
trace!("jinja get template error: {:?}", err);
150-
151-
return cx.throw_from_mj_error(err);
152-
}
153-
};
154-
155184
let mut to_jinja_ctx = CLReprObject::new();
156185
to_jinja_ctx.insert("COMPILE_CONTEXT".to_string(), template_compile_context);
157186

@@ -163,15 +192,20 @@ impl JinjaEngine {
163192
}
164193
}
165194

166-
let compile_context = to_minijinja_value(CLRepr::Object(to_jinja_ctx));
167-
match template.render(compile_context) {
168-
Ok(r) => Ok(cx.string(r)),
169-
Err(err) => {
170-
trace!("jinja render template error: {:?}", err);
195+
let (deferred, promise) = cx.promise();
171196

172-
cx.throw_from_mj_error(err)
173-
}
174-
}
197+
let mut this = this.borrow_mut();
198+
let pool = this.build_if_needed(&mut cx);
199+
200+
if let Err(err) = pool.render(JinjaEngineWorkerJob {
201+
template_name: template_name.value(&mut cx),
202+
ctx: to_minijinja_value(CLRepr::Object(to_jinja_ctx)),
203+
deferred,
204+
}) {
205+
return cx.throw_error(format!("Unable to render jinja template: {}", err));
206+
};
207+
208+
Ok(promise)
175209
}
176210

177211
fn load_template(mut cx: FunctionContext) -> JsResult<JsUndefined> {
@@ -185,13 +219,18 @@ impl JinjaEngine {
185219
let template_name = cx.argument::<JsString>(0)?;
186220
let template_content = cx.argument::<JsString>(1)?;
187221

188-
if let Err(err) = this.borrow_mut().inner.add_template_owned(
222+
let mut borrowed = this.borrow_mut();
223+
if let Err(err) = borrowed.inner.add_template_owned(
189224
template_name.value(&mut cx),
190225
template_content.value(&mut cx),
191226
) {
192227
trace!("jinja load error: {:?}", err);
193-
194228
return cx.throw_from_mj_error(err);
229+
};
230+
231+
if borrowed.workers.is_some() {
232+
trace!("Restart jinja workers");
233+
borrowed.workers = None;
195234
}
196235

197236
Ok(cx.undefined())
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod entry;
22
mod mj_value;
33
mod neon;
4+
mod workers;
45

56
pub use entry::template_register_module;
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
use crate::template::neon::*;
2+
use cubesql::CubeError;
3+
4+
use log::trace;
5+
use minijinja as mj;
6+
use neon::prelude::*;
7+
use neon::types::Deferred;
8+
9+
pub struct JinjaEngineWorkerJob {
10+
pub(crate) template_name: String,
11+
pub(crate) ctx: minijinja::value::Value,
12+
pub(crate) deferred: Deferred,
13+
}
14+
15+
struct JinjaEngineWorker {
16+
_thread: std::thread::JoinHandle<()>,
17+
}
18+
19+
impl JinjaEngineWorker {
20+
fn new(
21+
id: usize,
22+
env: mj::Environment<'static>,
23+
js_channel: neon::event::Channel,
24+
receiver: async_channel::Receiver<JinjaEngineWorkerJob>,
25+
) -> Self {
26+
let thread = std::thread::spawn(move || loop {
27+
if let Ok(job) = receiver.recv_blocking() {
28+
let template = match env.get_template(&job.template_name) {
29+
Ok(t) => t,
30+
Err(err) => {
31+
job.deferred.settle_with(
32+
&js_channel,
33+
move |mut cx| -> NeonResult<Handle<JsString>> {
34+
cx.throw_from_mj_error(err)
35+
},
36+
);
37+
38+
continue;
39+
}
40+
};
41+
42+
let result = template.render(job.ctx);
43+
job.deferred.settle_with(
44+
&js_channel,
45+
move |mut cx| -> NeonResult<Handle<JsString>> {
46+
match result {
47+
Ok(r) => Ok(cx.string(r)),
48+
Err(err) => cx.throw_from_mj_error(err),
49+
}
50+
},
51+
);
52+
} else {
53+
trace!(
54+
"Closing jinja thread, id: {}, threadId: {}",
55+
id,
56+
std::thread::current().id().as_u64()
57+
);
58+
59+
return;
60+
}
61+
});
62+
63+
Self { _thread: thread }
64+
}
65+
}
66+
67+
pub struct JinjaEngineWorkerPool {
68+
workers_rx: async_channel::Sender<JinjaEngineWorkerJob>,
69+
_workers: Vec<JinjaEngineWorker>,
70+
}
71+
72+
impl JinjaEngineWorkerPool {
73+
pub fn new(
74+
workers_count: usize,
75+
js_channel: Channel,
76+
jinja_engine: minijinja::Environment<'static>,
77+
) -> Self {
78+
let (workers_rx, receiver) = async_channel::bounded::<JinjaEngineWorkerJob>(1_000);
79+
80+
let mut workers = vec![];
81+
82+
for id in 0..workers_count {
83+
workers.push(JinjaEngineWorker::new(
84+
id,
85+
jinja_engine.clone(),
86+
js_channel.clone(),
87+
receiver.clone(),
88+
));
89+
}
90+
91+
Self {
92+
_workers: workers,
93+
workers_rx,
94+
}
95+
}
96+
97+
pub fn render(&self, job: JinjaEngineWorkerJob) -> Result<(), CubeError> {
98+
self.workers_rx
99+
.send_blocking(job)
100+
.map_err(|err| CubeError::internal(format!("Unable to schedule rendering: {}", err)))
101+
}
102+
}

0 commit comments

Comments
 (0)