Skip to content

Commit 7875318

Browse files
kvfasilCopilot
andauthored
RPPL-3818 : Share_watch_history(), discovery.lauch() api added/updated(8.1). (#941)
* Backporting discovery.lauch() changes. * Share_watch_history api added. * Share_watch_history api added. * Share_watch_history Request enum created. * picked some childpolicy chnages #885. * code clean up. * Updated log info Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Updated session handling. Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Removed intent update. * Feat: added http POST support. --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent d10816a commit 7875318

File tree

5 files changed

+213
-134
lines changed

5 files changed

+213
-134
lines changed

core/main/src/broker/http_broker.rs

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,16 @@ use ripple_sdk::{
2424
tokio::{self, sync::mpsc},
2525
utils::error::RippleError,
2626
};
27+
use serde_json::Value;
2728

2829
use super::endpoint_broker::{
2930
BrokerCallback, BrokerCleaner, BrokerConnectRequest, BrokerOutputForwarder, BrokerRequest,
3031
BrokerSender, EndpointBroker, EndpointBrokerState, BROKER_CHANNEL_BUFFER_SIZE,
3132
};
32-
use crate::state::platform_state::PlatformState;
33+
use crate::{
34+
broker::rules_engine::{jq_compile, RuleTransformType},
35+
state::platform_state::PlatformState,
36+
};
3337
use tokio_tungstenite::tungstenite::http::uri::InvalidUri;
3438

3539
pub struct HttpBroker {
@@ -42,39 +46,59 @@ pub struct HttpBroker {
4246

4347
async fn send_http_request(
4448
client: &Client<HttpConnector>,
45-
method: Method,
4649
uri: &Uri,
47-
path: &str,
50+
broker_request: BrokerRequest,
4851
) -> Result<Response<Body>, RippleError> {
4952
/*
5053
TODO? we may need to support body for POST request in the future
5154
*/
52-
let http_request = Request::new(Body::empty());
53-
let (mut parts, _) = http_request.into_parts();
54-
//TODO, need to refactor to support other methods
55-
parts.method = method.clone();
56-
/*
57-
mix endpoint url with method
58-
*/
5955

60-
let uri: Uri = format!("{}{}", uri, path)
56+
let mut method = Method::GET;
57+
let mut body = Body::empty();
58+
59+
// A rule with a request transform defined indicates that the request is a POST, where
60+
// the request transform is the body of the request. Otherwise, it is a GET request.
61+
62+
if let Some(request_transform) = broker_request
63+
.rule
64+
.transform
65+
.get_transform_data(RuleTransformType::Request)
66+
{
67+
method = Method::POST;
68+
69+
let transform_params =
70+
match serde_json::from_str::<Vec<Value>>(&broker_request.rpc.params_json) {
71+
Ok(mut params) => params.pop().unwrap_or(Value::Null),
72+
Err(e) => {
73+
error!(
74+
"send_http_request: Error in http broker parsing request params: e={:?}",
75+
e
76+
);
77+
Value::Null
78+
}
79+
};
80+
81+
let body_val = jq_compile(
82+
transform_params,
83+
&request_transform,
84+
format!("{}_http_post", broker_request.rpc.ctx.method),
85+
)?;
86+
87+
body = Body::from(body_val.to_string());
88+
}
89+
90+
let uri: Uri = format!("{}{}", uri, broker_request.rule.alias)
6191
.parse()
6292
.map_err(|e: InvalidUri| RippleError::BrokerError(e.to_string()))?;
63-
let new_request = Request::builder()
64-
.uri(uri)
65-
.body(Body::empty())
66-
.map_err(|e| RippleError::BrokerError(e.to_string()))?;
67-
let (uri_parts, _) = new_request.into_parts();
6893

69-
parts.uri = uri_parts.uri;
94+
debug!("http_broker sending {} request={}", method, uri,);
7095

71-
let http_request = Request::from_parts(parts, Body::empty());
96+
let http_request = Request::builder()
97+
.uri(uri)
98+
.method(method)
99+
.body(body)
100+
.map_err(|e| RippleError::BrokerError(e.to_string()))?;
72101

73-
debug!(
74-
"http_broker sending {} request={}",
75-
method,
76-
http_request.uri(),
77-
);
78102
match client.request(http_request).await {
79103
Ok(v) => Ok(v),
80104
Err(e) => {
@@ -129,7 +153,7 @@ impl EndpointBroker for HttpBroker {
129153
while let Some(request) = tr.recv().await {
130154
LogSignal::new("http_broker".to_string(), format!("received request - start processing request={:?}", request), request.rpc.ctx.clone())
131155
.with_diagnostic_context_item("rule_alias", request.rule.alias.as_str()).emit_debug();
132-
match send_http_request(&client, Method::GET, &uri, &request.clone().rule.alias)
156+
match send_http_request(&client, &uri, request.clone())
133157
.await
134158
{
135159
Ok(response) => {

core/main/src/firebolt/handlers/discovery_rpc.rs

Lines changed: 87 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
use std::{collections::HashMap, time::Duration};
1919

2020
use crate::{
21-
firebolt::handlers::privacy_rpc::PrivacyImpl,
22-
firebolt::rpc::RippleRPCProvider,
21+
broker::broker_utils::BrokerUtils,
22+
firebolt::{handlers::privacy_rpc::PrivacyImpl, rpc::RippleRPCProvider},
2323
service::apps::{
2424
app_events::{AppEventDecorationError, AppEventDecorator, AppEvents},
2525
provider_broker::{self, ProviderBroker},
@@ -37,7 +37,7 @@ use jsonrpsee::{
3737
use ripple_sdk::{
3838
api::{
3939
account_link::AccountLinkRequest,
40-
apps::{AppError, AppManagerResponse, AppMethod, AppRequest, AppResponse},
40+
apps::{AppError, AppManagerResponse, AppMethod, AppRequest},
4141
config::Config,
4242
firebolt::{
4343
fb_capabilities::FireboltCap,
@@ -51,7 +51,7 @@ use ripple_sdk::{
5151
},
5252
},
5353
extn::extn_client_message::ExtnResponse,
54-
log::{error, info},
54+
log::{debug, error, info},
5555
tokio::{sync::oneshot, time::timeout},
5656
};
5757
use ripple_sdk::{
@@ -527,7 +527,7 @@ impl DiscoveryServer for DiscoveryImpl {
527527
DiscoveryImpl::get_content_policy(&ctx, &self.state, &ctx.app_id).await
528528
}
529529

530-
async fn launch(&self, ctx: CallContext, request: LaunchRequest) -> RpcResult<bool> {
530+
async fn launch(&self, _ctx: CallContext, request: LaunchRequest) -> RpcResult<bool> {
531531
let app_defaults_configuration = self.state.get_device_manifest().applications.defaults;
532532

533533
let intent_validation_config = self
@@ -537,7 +537,7 @@ impl DiscoveryServer for DiscoveryImpl {
537537
.intent_validation;
538538
validate_navigation_intent(intent_validation_config, request.intent.clone()).await?;
539539

540-
let req_updated_source = update_intent_source(ctx.app_id.clone(), request.clone());
540+
let t_state = &mut self.state.clone();
541541

542542
if let Some(reserved_app_id) =
543543
app_defaults_configuration.get_reserved_application_id(&request.app_id)
@@ -552,47 +552,26 @@ impl DiscoveryServer for DiscoveryImpl {
552552
));
553553
}
554554

555-
// Not validating the intent, pass-through to app as is.
556-
if !AppEvents::is_app_registered_for_event(
557-
&self.state,
558-
reserved_app_id.to_string(),
559-
DISCOVERY_EVENT_ON_NAVIGATE_TO,
560-
) {
561-
return Err(rpc_navigate_reserved_app_err(
562-
format!("Discovery.launch: reserved app id {} is not registered for discovery.onNavigateTo event",
563-
reserved_app_id).as_str(),
564-
));
565-
}
566-
// emit EVENT_ON_NAVIGATE_TO to the reserved app.
567-
AppEvents::emit_to_app(
568-
&self.state,
569-
reserved_app_id.to_string(),
570-
DISCOVERY_EVENT_ON_NAVIGATE_TO,
571-
&serde_json::to_value(req_updated_source.intent).unwrap(),
555+
match BrokerUtils::process_internal_main_request(
556+
t_state,
557+
"discovery.launch.internal",
558+
Some(serde_json::to_value(request).map_err(|e| {
559+
error!("Serialization error: {:?}", e);
560+
rpc_err("Failed to serialize LaunchIntent")
561+
})?),
572562
)
573-
.await;
574-
info!(
575-
"emit_to_app called for app {} event {}",
576-
reserved_app_id.to_string(),
577-
DISCOVERY_EVENT_ON_NAVIGATE_TO
578-
);
579-
return Ok(true);
580-
}
581-
let (app_resp_tx, app_resp_rx) = oneshot::channel::<AppResponse>();
582-
583-
let app_request =
584-
AppRequest::new(AppMethod::Launch(req_updated_source.clone()), app_resp_tx);
585-
586-
if self
587-
.state
588-
.get_client()
589-
.send_app_request(app_request)
590-
.is_ok()
591-
&& app_resp_rx.await.is_ok()
592-
{
593-
return Ok(true);
563+
.await
564+
{
565+
Ok(val) => {
566+
debug!("Internal subscription launch successful");
567+
return Ok(val.as_bool().unwrap_or(false));
568+
}
569+
Err(e) => {
570+
error!("Internal subscription launch failed: {:?}", e);
571+
return Err(rpc_err("Internal subscription launch failed"));
572+
}
573+
}
594574
}
595-
596575
Err(jsonrpsee::core::Error::Custom(String::from(
597576
"Discovery.launch: some failure",
598577
)))
@@ -788,70 +767,69 @@ impl DiscoveryServer for DiscoveryImpl {
788767
}
789768
}
790769
}
791-
fn update_intent_source(source_app_id: String, request: LaunchRequest) -> LaunchRequest {
792-
let source = format!("xrn:firebolt:application:{}", source_app_id);
793-
match request.intent.clone() {
794-
Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => {
795-
let updated_navigation_intent = match navigation_intent {
796-
NavigationIntentStrict::Home(mut home_intent) => {
797-
home_intent.context.source = source;
798-
NavigationIntentStrict::Home(home_intent)
799-
}
800-
NavigationIntentStrict::Launch(mut launch_intent) => {
801-
launch_intent.context.source = source;
802-
NavigationIntentStrict::Launch(launch_intent)
803-
}
804-
NavigationIntentStrict::Entity(mut entity_intent) => {
805-
entity_intent.context.source = source;
806-
NavigationIntentStrict::Entity(entity_intent)
807-
}
808-
NavigationIntentStrict::Playback(mut playback_intent) => {
809-
playback_intent.context.source = source;
810-
NavigationIntentStrict::Playback(playback_intent)
811-
}
812-
NavigationIntentStrict::Search(mut search_intent) => {
813-
search_intent.context.source = source;
814-
NavigationIntentStrict::Search(search_intent)
815-
}
816-
NavigationIntentStrict::Section(mut section_intent) => {
817-
section_intent.context.source = source;
818-
NavigationIntentStrict::Section(section_intent)
819-
}
820-
NavigationIntentStrict::Tune(mut tune_intent) => {
821-
tune_intent.context.source = source;
822-
NavigationIntentStrict::Tune(tune_intent)
823-
}
824-
NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => {
825-
provider_request_intent.context.source = source;
826-
NavigationIntentStrict::ProviderRequest(provider_request_intent)
827-
}
828-
NavigationIntentStrict::PlayEntity(mut p) => {
829-
p.context.source = source;
830-
NavigationIntentStrict::PlayEntity(p)
831-
}
832-
NavigationIntentStrict::PlayQuery(mut p) => {
833-
p.context.source = source;
834-
NavigationIntentStrict::PlayQuery(p)
835-
}
836-
};
837-
838-
LaunchRequest {
839-
app_id: request.app_id,
840-
intent: Some(NavigationIntent::NavigationIntentStrict(
841-
updated_navigation_intent,
842-
)),
843-
}
844-
}
845-
Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => {
846-
loose_intent.context.source = source;
847-
LaunchRequest {
848-
app_id: request.app_id,
849-
intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)),
850-
}
851-
}
852-
_ => request,
853-
}
854-
}
770+
// fn update_intent_source(source: String, request: LaunchRequest) -> LaunchRequest {
771+
// match request.intent.clone() {
772+
// Some(NavigationIntent::NavigationIntentStrict(navigation_intent)) => {
773+
// let updated_navigation_intent = match navigation_intent {
774+
// NavigationIntentStrict::Home(mut home_intent) => {
775+
// home_intent.context.source = source;
776+
// NavigationIntentStrict::Home(home_intent)
777+
// }
778+
// NavigationIntentStrict::Launch(mut launch_intent) => {
779+
// launch_intent.context.source = source;
780+
// NavigationIntentStrict::Launch(launch_intent)
781+
// }
782+
// NavigationIntentStrict::Entity(mut entity_intent) => {
783+
// entity_intent.context.source = source;
784+
// NavigationIntentStrict::Entity(entity_intent)
785+
// }
786+
// NavigationIntentStrict::Playback(mut playback_intent) => {
787+
// playback_intent.context.source = source;
788+
// NavigationIntentStrict::Playback(playback_intent)
789+
// }
790+
// NavigationIntentStrict::Search(mut search_intent) => {
791+
// search_intent.context.source = source;
792+
// NavigationIntentStrict::Search(search_intent)
793+
// }
794+
// NavigationIntentStrict::Section(mut section_intent) => {
795+
// section_intent.context.source = source;
796+
// NavigationIntentStrict::Section(section_intent)
797+
// }
798+
// NavigationIntentStrict::Tune(mut tune_intent) => {
799+
// tune_intent.context.source = source;
800+
// NavigationIntentStrict::Tune(tune_intent)
801+
// }
802+
// NavigationIntentStrict::ProviderRequest(mut provider_request_intent) => {
803+
// provider_request_intent.context.source = source;
804+
// NavigationIntentStrict::ProviderRequest(provider_request_intent)
805+
// }
806+
// NavigationIntentStrict::PlayEntity(mut p) => {
807+
// p.context.source = source;
808+
// NavigationIntentStrict::PlayEntity(p)
809+
// }
810+
// NavigationIntentStrict::PlayQuery(mut p) => {
811+
// p.context.source = source;
812+
// NavigationIntentStrict::PlayQuery(p)
813+
// }
814+
// };
815+
816+
// LaunchRequest {
817+
// app_id: request.app_id,
818+
// intent: Some(NavigationIntent::NavigationIntentStrict(
819+
// updated_navigation_intent,
820+
// )),
821+
// }
822+
// }
823+
// Some(NavigationIntent::NavigationIntentLoose(mut loose_intent)) => {
824+
// loose_intent.context.source = source;
825+
// LaunchRequest {
826+
// app_id: request.app_id,
827+
// intent: Some(NavigationIntent::NavigationIntentLoose(loose_intent)),
828+
// }
829+
// }
830+
// _ => request,
831+
// }
832+
// }
855833

856834
pub async fn validate_navigation_intent(
857835
intent_validation_config: IntentValidation,

0 commit comments

Comments
 (0)