Skip to content

Commit d10816a

Browse files
authored
Backporting Memory Optimizations to 1.21.rc (#883)
* fix: Moving Arc<openrpc_state> Arc<exclusory> , Arc<ProviderRegistrations>. * fix: Moving Arc<Manifests> Arc<rule_engine>. * fix: Improving rule parse and error. * fix: Avoid cloning Methods in rpc_router. * feat: Adding memory logging feature.
1 parent 838e5cc commit d10816a

File tree

14 files changed

+184
-74
lines changed

14 files changed

+184
-74
lines changed

core/main/src/bootstrap/boot.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use ripple_sdk::{
2121
RippleResponse,
2222
},
2323
log::error,
24+
utils::test_utils::log_memory_usage,
2425
};
2526

2627
use crate::state::bootstrap_state::BootstrapState;
@@ -61,18 +62,31 @@ use super::{
6162
6263
///
6364
pub async fn boot(state: BootstrapState) -> RippleResponse {
65+
log_memory_usage("boot-Begining");
6466
let bootstrap = Bootstrap::new(state);
67+
log_memory_usage("After-LoggingBootstrapStep");
6568
execute_step(StartCommunicationBroker, &bootstrap).await?;
69+
log_memory_usage("After-StartCommunicationBroker");
6670
execute_step(SetupExtnClientStep, &bootstrap).await?;
71+
log_memory_usage("After-SetupExtnClientStep");
6772
execute_step(LoadExtensionMetadataStep, &bootstrap).await?;
73+
log_memory_usage("After-LoadExtensionMetadataStep");
6874
execute_step(LoadExtensionsStep, &bootstrap).await?;
75+
log_memory_usage("After-LoadExtensionsStep");
6976
execute_step(StartExtnChannelsStep, &bootstrap).await?;
77+
log_memory_usage("After-StartExtnChannelsStep");
7078
execute_step(StartAppManagerStep, &bootstrap).await?;
79+
log_memory_usage("After-StartAppManagerStep");
7180
execute_step(StartOtherBrokers, &bootstrap).await?;
81+
log_memory_usage("After-StartOtherBrokers");
7282
execute_step(LoadDistributorValuesStep, &bootstrap).await?;
83+
log_memory_usage("After-LoadDistributorValuesStep");
7384
execute_step(CheckLauncherStep, &bootstrap).await?;
85+
log_memory_usage("After-CheckLauncherStep");
7486
execute_step(StartWsStep, &bootstrap).await?;
87+
log_memory_usage("After-StartWsStep");
7588
execute_step(FireboltGatewayStep, &bootstrap).await?;
89+
log_memory_usage("After-FireboltGatewayStep");
7690
Ok(())
7791
}
7892

core/main/src/bootstrap/manifest/apps.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type AppLibraryLoader = Vec<fn() -> Result<(String, Vec<AppLibraryEntry>), Rippl
4040

4141
fn try_app_library_files() -> Result<Vec<AppLibraryEntry>, RippleError> {
4242
let al_arr: AppLibraryLoader = if cfg!(feature = "local_dev") {
43-
vec![load_from_env, load_from_home]
43+
vec![load_from_env, load_from_home, load_from_etc]
4444
} else if cfg!(test) {
4545
vec![load_from_env]
4646
} else {

core/main/src/bootstrap/manifest/device.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type DeviceManifestLoader = Vec<fn() -> Result<(String, DeviceManifest), RippleE
3636

3737
fn try_manifest_files() -> Result<DeviceManifest, RippleError> {
3838
let dm_arr: DeviceManifestLoader = if cfg!(feature = "local_dev") {
39-
vec![load_from_env, load_from_home]
39+
vec![load_from_env, load_from_home, load_from_etc]
4040
} else if cfg!(test) {
4141
vec![load_from_env]
4242
} else {

core/main/src/bootstrap/manifest/extn.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type ExtnManifestLoader = Vec<fn() -> Result<(String, ExtnManifest), RippleError
3131

3232
fn try_manifest_files() -> Result<ExtnManifest, RippleError> {
3333
let dm_arr: ExtnManifestLoader = if cfg!(feature = "local_dev") {
34-
vec![load_from_env, load_from_home]
34+
vec![load_from_env, load_from_home, load_from_etc]
3535
} else if cfg!(test) {
3636
vec![load_from_env]
3737
} else {

core/main/src/broker/endpoint_broker.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,7 @@ pub struct EndpointBrokerState {
371371
callback: BrokerCallback,
372372
request_map: Arc<RwLock<HashMap<u64, BrokerRequest>>>,
373373
extension_request_map: Arc<RwLock<HashMap<u64, ExtnMessage>>>,
374-
rule_engine: RuleEngine,
374+
rule_engine: Arc<RwLock<RuleEngine>>,
375375
cleaner_list: Arc<RwLock<Vec<BrokerCleaner>>>,
376376
reconnect_tx: Sender<BrokerConnectRequest>,
377377
provider_broker_state: ProvideBrokerState,
@@ -384,7 +384,7 @@ impl Default for EndpointBrokerState {
384384
callback: BrokerCallback::default(),
385385
request_map: Arc::new(RwLock::new(HashMap::new())),
386386
extension_request_map: Arc::new(RwLock::new(HashMap::new())),
387-
rule_engine: RuleEngine::default(),
387+
rule_engine: Arc::new(RwLock::new(RuleEngine::default())),
388388
cleaner_list: Arc::new(RwLock::new(Vec::new())),
389389
reconnect_tx: mpsc::channel(2).0,
390390
provider_broker_state: ProvideBrokerState::default(),
@@ -406,7 +406,7 @@ impl EndpointBrokerState {
406406
callback: BrokerCallback { sender: tx },
407407
request_map: Arc::new(RwLock::new(HashMap::new())),
408408
extension_request_map: Arc::new(RwLock::new(HashMap::new())),
409-
rule_engine,
409+
rule_engine: Arc::new(RwLock::new(rule_engine)),
410410
cleaner_list: Arc::new(RwLock::new(Vec::new())),
411411
reconnect_tx,
412412
provider_broker_state: ProvideBrokerState::default(),
@@ -416,7 +416,7 @@ impl EndpointBrokerState {
416416
state
417417
}
418418
pub fn with_rules_engine(mut self, rule_engine: RuleEngine) -> Self {
419-
self.rule_engine = rule_engine;
419+
self.rule_engine = Arc::new(RwLock::new(rule_engine));
420420
self
421421
}
422422

@@ -523,7 +523,16 @@ impl EndpointBrokerState {
523523
)
524524
}
525525
pub fn build_thunder_endpoint(&mut self) {
526-
if let Some(endpoint) = self.rule_engine.rules.endpoints.get("thunder").cloned() {
526+
let endpoint = {
527+
self.rule_engine
528+
.write()
529+
.unwrap()
530+
.rules
531+
.endpoints
532+
.get("thunder")
533+
.cloned()
534+
};
535+
if let Some(endpoint) = endpoint {
527536
let request = BrokerConnectRequest::new(
528537
"thunder".to_owned(),
529538
endpoint.clone(),
@@ -534,7 +543,8 @@ impl EndpointBrokerState {
534543
}
535544

536545
pub fn build_other_endpoints(&mut self, ps: PlatformState, session: Option<AccountSession>) {
537-
for (key, endpoint) in self.rule_engine.rules.endpoints.clone() {
546+
let endpoints = self.rule_engine.read().unwrap().rules.endpoints.clone();
547+
for (key, endpoint) in endpoints {
538548
// skip thunder endpoint as it is already built using build_thunder_endpoint
539549
if let RuleEndpointProtocol::Thunder = endpoint.protocol {
540550
continue;
@@ -719,7 +729,7 @@ impl EndpointBrokerState {
719729
rpc_request.ctx.clone(),
720730
)
721731
.emit_debug();
722-
if let Some(rule) = self.rule_engine.get_rule(&rpc_request) {
732+
if let Some(rule) = self.rule_engine.read().unwrap().get_rule(&rpc_request) {
723733
found_rule = Some(rule.clone());
724734

725735
if let Some(endpoint) = rule.endpoint {
@@ -847,7 +857,7 @@ impl EndpointBrokerState {
847857
}
848858

849859
pub fn get_rule(&self, rpc_request: &RpcRequest) -> Option<Rule> {
850-
self.rule_engine.get_rule(rpc_request)
860+
self.rule_engine.read().unwrap().get_rule(rpc_request)
851861
}
852862

853863
// Method to cleanup all subscription on App termination

core/main/src/broker/rules_engine.rs

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,12 @@ use ripple_sdk::{
2727
};
2828
use serde::{Deserialize, Serialize};
2929
use std::collections::HashMap;
30+
use std::sync::{Mutex, MutexGuard, Once};
3031
use std::{fs, path::Path};
3132

33+
static BASE_PARSE_CTX_INIT: Once = Once::new();
34+
static mut BASE_PARSE_CTX_PTR: Option<Mutex<ParseCtx>> = None;
35+
3236
#[derive(Debug, Deserialize, Default, Clone)]
3337
pub struct RuleSet {
3438
pub endpoints: HashMap<String, RuleEndpoint>,
@@ -314,6 +318,24 @@ impl RuleEngine {
314318
/// let result = jq_compile(input, filter, String::new());
315319
/// assert_eq!(result.unwrap(), json!("SCXI11BEI_VBN_24Q2_sprint_20240620140024sdy_FG_GRT"));
316320
/// ```
321+
// Initializes the base ParseCtx with core and std filters, only once.
322+
fn get_parse_ctx() -> MutexGuard<'static, ParseCtx> {
323+
BASE_PARSE_CTX_INIT.call_once(|| {
324+
let mut ctx = ParseCtx::new(Vec::new());
325+
ctx.insert_natives(jaq_core::core());
326+
ctx.insert_defs(jaq_std::std());
327+
unsafe {
328+
BASE_PARSE_CTX_PTR = Some(Mutex::new(ctx));
329+
}
330+
});
331+
unsafe {
332+
BASE_PARSE_CTX_PTR
333+
.as_ref()
334+
.expect("BASE_PARSE_CTX_PTR not initialized")
335+
.lock()
336+
.expect("Failed to lock BASE_PARSE_CTX_PTR")
337+
}
338+
}
317339
pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value, RippleError> {
318340
info!(
319341
"Jq rule {} input {:?}, reference {}",
@@ -324,22 +346,22 @@ pub fn jq_compile(input: Value, filter: &str, reference: String) -> Result<Value
324346
// which do not include filters in the standard library
325347
// such as `map`, `select` etc.
326348

327-
let mut defs = ParseCtx::new(Vec::new());
328-
defs.insert_natives(jaq_core::core());
329-
defs.insert_defs(jaq_std::std());
330349
// parse the filter
331350
let (f, errs) = jaq_parse::parse(filter, jaq_parse::main());
332351
if !errs.is_empty() {
333-
error!("Error in rule {:?}", errs);
352+
error!("Error in rule {:?}: {:?}", reference, errs);
334353
return Err(RippleError::RuleError);
335354
}
355+
// Lock and use the shared ParseCtx
356+
let mut defs = get_parse_ctx();
336357
// compile the filter in the context of the given definitions
337358
let f = defs.compile(f.unwrap());
338359
if !defs.errs.is_empty() {
339360
error!("Error in rule {}", reference);
340-
for (err, _) in defs.errs {
361+
for (err, _) in &defs.errs {
341362
error!("reference={} {}", reference, err);
342363
}
364+
defs.errs.clear(); // Clear errors before returning
343365
return Err(RippleError::RuleError);
344366
}
345367

core/main/src/firebolt/handlers/device_rpc.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,9 @@ impl DeviceServer for DeviceImpl {
202202

203203
async fn version(&self, ctx: CallContext) -> RpcResult<DeviceVersionResponse> {
204204
let firmware_info = self.firmware_info(ctx).await?;
205-
let open_rpc_state = self.state.clone().open_rpc_state;
206-
let api = open_rpc_state.get_open_rpc().info;
205+
let open_rpc_state = &self.state.clone().open_rpc_state;
206+
let api = &*open_rpc_state.get_open_rpc().clone();
207+
let api = api.info.clone();
207208

208209
// os is deprecated, for now senidng firmware ver in os as well
209210
let os_ver = firmware_info.clone().version;

core/main/src/firebolt/rpc_router.rs

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818
use futures::StreamExt;
1919
use jsonrpsee::{
20-
core::{
21-
server::{
22-
helpers::MethodSink,
23-
resource_limiting::Resources,
24-
rpc_module::{MethodKind, Methods},
25-
},
26-
TEN_MB_SIZE_BYTES,
20+
core::server::{
21+
helpers::MethodSink,
22+
resource_limiting::Resources,
23+
rpc_module::{MethodCallback, MethodKind, Methods},
2724
},
2825
types::{error::ErrorCode, Id, Params},
2926
};
@@ -71,8 +68,12 @@ impl RouterState {
7168
let _ = methods_state.merge(methods.initialize_resources(&self.resources).unwrap());
7269
}
7370

74-
fn get_methods(&self) -> Methods {
75-
self.methods.read().unwrap().clone()
71+
pub fn get_method_entry(&self, method_name: &str) -> Option<(String, MethodCallback)> {
72+
// Acquire a read lock without cloning the entire Methods registry
73+
let methods_guard = self.methods.read().ok()?;
74+
methods_guard
75+
.method_with_name(method_name)
76+
.map(|(name, method)| (name.to_owned(), method.clone()))
7677
}
7778
}
7879

@@ -84,7 +85,7 @@ impl Default for RouterState {
8485

8586
async fn resolve_route(
8687
platform_state: &mut PlatformState,
87-
methods: Methods,
88+
method_entry: Option<(String, MethodCallback)>,
8889
resources: Resources,
8990
req: RpcRequest,
9091
) -> Result<ApiMessage, RippleError> {
@@ -93,26 +94,29 @@ async fn resolve_route(
9394
let request_c = req.clone();
9495
let sink_size = 1024 * 1024;
9596
let (sink_tx, mut sink_rx) = futures_channel::mpsc::unbounded::<String>();
96-
let sink = MethodSink::new_with_limit(sink_tx, TEN_MB_SIZE_BYTES, 512 * 1024);
97-
let method = request_c.method.clone();
97+
let sink = MethodSink::new_with_limit(sink_tx, 1024 * 1024, 100 * 1024);
98+
let method_name = request_c.method.clone();
9899

99100
tokio::spawn(async move {
100101
let params_json = request_c.params_json.as_ref();
101102
let params = Params::new(Some(params_json));
102103

103-
match methods.method_with_name(&method) {
104+
match method_entry {
104105
None => {
105106
LogSignal::new(
106107
"rpc_router".to_string(),
107108
"resolve_route".into(),
108109
request_c.clone(),
109110
)
110-
.with_diagnostic_context_item("error", &format!("Method not found: {}", method))
111+
.with_diagnostic_context_item(
112+
"error",
113+
&format!("Method not found: {}", method_name),
114+
)
111115
.emit_error();
112116
sink.send_error(id, ErrorCode::MethodNotFound.into());
113117
}
114118
Some((name, method)) => match &method.inner() {
115-
MethodKind::Sync(callback) => match method.claim(name, &resources) {
119+
MethodKind::Sync(callback) => match method.claim(&name, &resources) {
116120
Ok(_guard) => {
117121
if let Err(e) =
118122
sink.send_raw((callback)(id.clone(), params, 512 * 1024).result)
@@ -134,7 +138,7 @@ async fn resolve_route(
134138
sink.send_error(id, ErrorCode::MethodNotFound.into());
135139
}
136140
},
137-
MethodKind::Async(callback) => match method.claim(name, &resources) {
141+
MethodKind::Async(callback) => match method.claim(&name, &resources) {
138142
Ok(guard) => {
139143
let id = id.into_owned();
140144
let params = params.into_owned();
@@ -209,7 +213,7 @@ impl RpcRouter {
209213
session: Session,
210214
timer: Option<Timer>,
211215
) {
212-
let methods = state.router_state.get_methods();
216+
let method_entry = state.router_state.get_method_entry(&req.method);
213217
let resources = state.router_state.resources.clone();
214218

215219
if let Some(overridden_method) = state.get_manifest().has_rpc_override_method(&req.method) {
@@ -218,7 +222,7 @@ impl RpcRouter {
218222
LogSignal::new("rpc_router".to_string(), "routing".into(), req.clone());
219223
tokio::spawn(async move {
220224
let start = Utc::now().timestamp_millis();
221-
let resp = resolve_route(&mut state, methods, resources, req.clone()).await;
225+
let resp = resolve_route(&mut state, method_entry, resources, req.clone()).await;
222226

223227
let status = match resp.clone() {
224228
Ok(msg) => {
@@ -247,7 +251,7 @@ impl RpcRouter {
247251
req: RpcRequest,
248252
extn_msg: ExtnMessage,
249253
) {
250-
let methods = state.router_state.get_methods();
254+
let method_entry = state.router_state.get_method_entry(&req.method);
251255
let resources = state.router_state.resources.clone();
252256

253257
let mut platform_state = state.clone();
@@ -258,7 +262,8 @@ impl RpcRouter {
258262
)
259263
.emit_debug();
260264
tokio::spawn(async move {
261-
if let Ok(msg) = resolve_route(&mut platform_state, methods, resources, req).await {
265+
if let Ok(msg) = resolve_route(&mut platform_state, method_entry, resources, req).await
266+
{
262267
return_extn_response(msg, extn_msg);
263268
}
264269
});

core/main/src/processor/config_processor.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ impl ExtnRequestProcessor for ConfigRequestProcessor {
135135
.unwrap_or_default(),
136136
),
137137
Config::Firebolt => ExtnResponse::Value(
138-
serde_json::to_value(state.open_rpc_state.get_open_rpc()).unwrap_or_default(),
138+
serde_json::to_value(&*state.open_rpc_state.get_open_rpc().clone())
139+
.unwrap_or_default(),
139140
),
140141
Config::RFC(flag) => {
141142
let mut resp =

0 commit comments

Comments
 (0)