diff --git a/crates/router/src/core/payments.rs b/crates/router/src/core/payments.rs index 3592208b857..32114a46b00 100644 --- a/crates/router/src/core/payments.rs +++ b/crates/router/src/core/payments.rs @@ -76,6 +76,7 @@ use redis_interface::errors::RedisError; use router_env::{instrument, tracing}; #[cfg(feature = "olap")] use router_types::transformers::ForeignFrom; +use routing::RoutingStage; use rustc_hash::FxHashMap; use scheduler::utils as pt_utils; #[cfg(feature = "v2")] @@ -100,7 +101,7 @@ use self::{ use super::{ errors::StorageErrorExt, payment_methods::surcharge_decision_configs, - routing::TransactionData, + routing::{transaction_type_from_payments_dsl, TransactionData}, unified_connector_service::{ extract_gateway_system_from_payment_intent, should_call_unified_connector_service, }, @@ -10376,83 +10377,113 @@ where .change_context(errors::ApiErrorResponse::InternalServerError) .attach_printable("Could not decode merchant routing algorithm ref")? .unwrap_or_default(); + algorithm_ref.algorithm_id }; - let (connectors, routing_approach) = routing::perform_static_routing_v1( - state, - platform.get_processor().get_account().get_id(), - routing_algorithm_id.as_ref(), - business_profile, - &TransactionData::Payment(transaction_data.clone()), - ) - .await - .change_context(errors::ApiErrorResponse::InternalServerError)?; - - payment_data.set_routing_approach_in_attempt(routing_approach); - - #[cfg(all(feature = "v1", feature = "dynamic_routing"))] - let payment_attempt = transaction_data.payment_attempt.clone(); - - let connectors = routing::perform_eligibility_analysis_with_fallback( - &state.clone(), - platform.get_processor().get_key_store(), - connectors, - &TransactionData::Payment(transaction_data), - eligible_connectors, - business_profile, + let fallback_config = routing_helpers::get_merchant_default_config( + &*state.clone().store, + business_profile.get_id().get_string_repr(), + &transaction_type_from_payments_dsl(&transaction_data), ) .await .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("failed eligibility analysis and fallback")?; + .attach_printable("euclid: failed to fetch fallback config")?; - // dynamic success based connector selection - #[cfg(all(feature = "v1", feature = "dynamic_routing"))] - let connectors = if let Some(algo) = business_profile.dynamic_routing_algorithm.clone() { - let dynamic_routing_config: api_models::routing::DynamicRoutingAlgorithmRef = algo - .parse_value("DynamicRoutingAlgorithmRef") - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("unable to deserialize DynamicRoutingAlgorithmRef from JSON")?; - let dynamic_split = api_models::routing::RoutingVolumeSplit { - routing_type: api_models::routing::RoutingType::Dynamic, - split: dynamic_routing_config - .dynamic_routing_volume_split - .unwrap_or_default(), - }; - let static_split: api_models::routing::RoutingVolumeSplit = - api_models::routing::RoutingVolumeSplit { - routing_type: api_models::routing::RoutingType::Static, - split: consts::DYNAMIC_ROUTING_MAX_VOLUME - - dynamic_routing_config - .dynamic_routing_volume_split - .unwrap_or_default(), - }; - let volume_split_vec = vec![dynamic_split, static_split]; - let routing_choice = routing::perform_dynamic_routing_volume_split(volume_split_vec, None) - .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("failed to perform volume split on routing type")?; - - if routing_choice.routing_type.is_dynamic_routing() - && state.conf.open_router.dynamic_routing_enabled + let cached_algorithm = if let Some(routing_algorithm_id) = routing_algorithm_id { + match routing::ensure_algorithm_cached_v1( + state, + &business_profile.merchant_id, + &routing_algorithm_id, + business_profile.get_id(), + &transaction_type_from_payments_dsl(&transaction_data), + ) + .await { - routing::perform_dynamic_routing_with_open_router( - state, - connectors.clone(), - business_profile, - payment_attempt, - payment_data, - ) - .await - .map_err(|e| logger::error!(open_routing_error=?e)) - .unwrap_or(connectors) - } else { - connectors + Ok(algo) => Some(algo), + Err(err) => { + logger::error!( + error=?err, + "euclid_routing: ensure_algorithm_cached failed, falling back" + ); + None + } } } else { - connectors + None }; - let connector_data = connectors + let routable_connectors: Vec = + if let Some(cached_algorithm) = cached_algorithm { + let routing_attempt = async { + let static_stage = routing::StaticRoutingStage { + ctx: routing::RoutingContext { + routing_algorithm: cached_algorithm, + }, + }; + + let static_input = routing::StaticRoutingInput { + platform, + business_profile, + eligible_connectors: eligible_connectors.as_ref(), + transaction_data: &transaction_data, + }; + + let mut routing_outcome = static_stage.route(static_input).await?; + + let eligibility = routing::perform_eligibility_analysis_with_fallback( + state, + platform.get_processor().get_key_store(), + routing_outcome.connectors.clone(), + &TransactionData::Payment(transaction_data.clone()), + eligible_connectors, + business_profile, + ) + .await?; + + routing_outcome.connectors = eligibility; + + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] + { + if business_profile.dynamic_routing_algorithm.is_some() { + let dynamic_input = routing::DynamicRoutingInput { + state, + business_profile, + transaction_data: &transaction_data, + static_connectors: &routing_outcome.connectors, + }; + + let dynamic_stage = routing::DynamicRoutingStage; + + if let Ok(dynamic_outcome) = dynamic_stage.route(dynamic_input).await { + routing_outcome.connectors = dynamic_outcome.connectors; + } + } + } + + Ok::<_, error_stack::Report>(routing_outcome.connectors) + } + .await; + + match routing_attempt { + Ok(connectors) if !connectors.is_empty() => connectors, + Ok(_) => { + logger::warn!("euclid: empty routing result, falling back"); + fallback_config.clone() + } + Err(err) => { + logger::error!( + error=?err, + "euclid: routing failed, falling back to merchant default" + ); + fallback_config.clone() + } + } + } else { + fallback_config.clone() + }; + + let connector_data = routable_connectors .into_iter() .map(|conn| { api::ConnectorData::get_connector_by_name( @@ -10465,7 +10496,7 @@ where }) .collect::, _>>() .change_context(errors::ApiErrorResponse::InternalServerError) - .attach_printable("Invalid connector name received")?; + .attach_printable("euclid: Invalid connector name received")?; decide_multiplex_connector_for_normal_or_recurring_payment( state, diff --git a/crates/router/src/core/payments/routing.rs b/crates/router/src/core/payments/routing.rs index 27fa56a315b..28049029050 100644 --- a/crates/router/src/core/payments/routing.rs +++ b/crates/router/src/core/payments/routing.rs @@ -4,7 +4,7 @@ pub mod utils; use std::collections::hash_map; #[cfg(all(feature = "v1", feature = "dynamic_routing"))] use std::hash::{Hash, Hasher}; -use std::{collections::HashMap, str::FromStr, sync::Arc}; +use std::{collections::HashMap, future::Future, pin::Pin, str::FromStr, sync::Arc}; #[cfg(feature = "v1")] use api_models::open_router::{self as or_types, DecidedGateway, OpenRouterDecideGatewayRequest}; @@ -496,6 +496,183 @@ pub fn make_dsl_input( }) } +type BoxFuture<'a, T> = Pin + Send + 'a>>; +pub trait RoutingStage: Send + Sync { + type Input<'a> + where + Self: 'a; + + type Fut<'a>: Future> + Send + where + Self: 'a; + + fn route<'a>(&'a self, input: Self::Input<'a>) -> Self::Fut<'a>; + + fn routing_approach(&self) -> common_enums::RoutingApproach; +} + +pub struct RoutingContext { + pub routing_algorithm: Arc, +} + +pub struct ConnectorOutcome { + pub connectors: Vec, +} + +pub struct StaticRoutingInput<'a> { + pub platform: &'a domain::Platform, + pub business_profile: &'a domain::Profile, + pub eligible_connectors: Option<&'a Vec>, + pub transaction_data: &'a routing::PaymentsDslInput<'a>, +} + +pub struct StaticRoutingStage { + pub ctx: RoutingContext, +} + +impl RoutingStage for StaticRoutingStage { + type Input<'a> = StaticRoutingInput<'a>; + type Fut<'a> = BoxFuture<'a, RoutingResult>; + + fn route<'a>(&'a self, input: Self::Input<'a>) -> Self::Fut<'a> { + Box::pin(async move { + let connectors = static_routing_v1( + &self.ctx.routing_algorithm, + &routing::TransactionData::Payment(input.transaction_data.clone()), + ) + .await + .change_context(errors::RoutingError::DslExecutionError) + .attach_printable("euclid: unable to perform static routing")?; + + Ok(ConnectorOutcome { connectors }) + }) + } + + fn routing_approach(&self) -> common_enums::RoutingApproach { + common_enums::RoutingApproach::RuleBasedRouting + } +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +pub struct DynamicRoutingStage; + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl DynamicRoutingStage { + async fn resolve_dynamic_connectors( + &self, + input: &DynamicRoutingInput<'_>, + routing_type: &api_models::routing::RoutingType, + ) -> RoutingResult> { + if !routing_type.is_dynamic_routing() + || !input.state.conf.open_router.dynamic_routing_enabled + { + Ok(input.static_connectors.to_vec()) + } else { + let payment_attempt = input.transaction_data.payment_attempt.clone(); + let dynamic_result = perform_dynamic_routing_with_open_router( + input.state, + input.static_connectors.to_vec(), + input.business_profile, + payment_attempt, + ) + .await + .unwrap_or_else(|e| { + logger::error!(open_router_error=?e); + DynamicRoutingResult { + connectors: input.static_connectors.to_vec(), + routing_approach: None, + } + }); + + Ok(dynamic_result.connectors) + } + } +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +pub struct DynamicRoutingInput<'a> { + pub state: &'a SessionState, + pub business_profile: &'a domain::Profile, + pub transaction_data: &'a routing::PaymentsDslInput<'a>, + pub static_connectors: &'a [routing_types::RoutableConnectorChoice], +} + +#[cfg(all(feature = "v1", feature = "dynamic_routing"))] +impl RoutingStage for DynamicRoutingStage { + type Input<'a> = DynamicRoutingInput<'a>; + type Fut<'a> = BoxFuture<'a, RoutingResult>; + + fn route<'a>(&'a self, input: Self::Input<'a>) -> Self::Fut<'a> { + Box::pin(async move { + let Some(algo) = input.business_profile.dynamic_routing_algorithm.clone() else { + return Ok(ConnectorOutcome { + connectors: input.static_connectors.to_vec(), + }); + }; + + let dynamic_routing_config: api_models::routing::DynamicRoutingAlgorithmRef = algo + .parse_value("DynamicRoutingAlgorithmRef") + .change_context(errors::RoutingError::DeserializationError { + from: "json".into(), + to: "DynamicAlgorithmRef".into(), + })?; + + let dynamic_split = api_models::routing::RoutingVolumeSplit { + routing_type: api_models::routing::RoutingType::Dynamic, + split: dynamic_routing_config + .dynamic_routing_volume_split + .unwrap_or_default(), + }; + + let static_split = api_models::routing::RoutingVolumeSplit { + routing_type: api_models::routing::RoutingType::Static, + split: crate::consts::DYNAMIC_ROUTING_MAX_VOLUME + - dynamic_routing_config + .dynamic_routing_volume_split + .unwrap_or_default(), + }; + + let routing_choice = + perform_dynamic_routing_volume_split(vec![dynamic_split, static_split], None) + .change_context(errors::RoutingError::VolumeSplitFailed) + .attach_printable("failed to perform volume split on routing type")?; + + let connectors = self + .resolve_dynamic_connectors(&input, &routing_choice.routing_type) + .await?; + + Ok(ConnectorOutcome { connectors }) + }) + } + + fn routing_approach(&self) -> common_enums::RoutingApproach { + common_enums::RoutingApproach::SuccessRateExploration + } +} + +pub async fn static_routing_v1( + routing_algorithm: &CachedAlgorithm, + transaction_data: &routing::TransactionData<'_>, +) -> RoutingResult> { + logger::debug!("euclid_routing: performing routing for connector selection"); + let backend_input = match transaction_data { + routing::TransactionData::Payment(payment_data) => make_dsl_input(payment_data)?, + #[cfg(feature = "payouts")] + routing::TransactionData::Payout(payout_data) => make_dsl_input_for_payouts(payout_data)?, + }; + + let routable_connectors = match routing_algorithm { + CachedAlgorithm::Single(conn) => vec![(**conn).clone()], + CachedAlgorithm::Priority(plist) => plist.clone(), + CachedAlgorithm::VolumeSplit(splits) => perform_volume_split(splits.to_vec()) + .change_context(errors::RoutingError::ConnectorSelectionFailed)?, + CachedAlgorithm::Advanced(interpreter) => { + execute_dsl_and_get_connector_v1(backend_input, interpreter)? + } + }; + Ok(routable_connectors) +} + pub async fn perform_static_routing_v1( state: &SessionState, merchant_id: &common_utils::id_type::MerchantId, @@ -633,7 +810,7 @@ pub async fn perform_static_routing_v1( )) } -async fn ensure_algorithm_cached_v1( +pub async fn ensure_algorithm_cached_v1( state: &SessionState, merchant_id: &common_utils::id_type::MerchantId, algorithm_id: &common_utils::id_type::RoutingId, @@ -1734,18 +1911,18 @@ pub fn make_dsl_input_for_surcharge( Ok(backend_input) } +pub struct DynamicRoutingResult { + pub connectors: Vec, + pub routing_approach: Option, +} + #[cfg(all(feature = "v1", feature = "dynamic_routing"))] -pub async fn perform_dynamic_routing_with_open_router( +pub async fn perform_dynamic_routing_with_open_router( state: &SessionState, routable_connectors: Vec, profile: &domain::Profile, payment_data: oss_storage::PaymentAttempt, - old_payment_data: &mut D, -) -> RoutingResult> -where - F: Send + Clone, - D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, -{ +) -> RoutingResult { let dynamic_routing_algo_ref: api_routing::DynamicRoutingAlgorithmRef = profile .dynamic_routing_algorithm .clone() @@ -1777,7 +1954,6 @@ where profile.get_id(), &payment_data, is_elimination_enabled, - old_payment_data, ) .await?; @@ -1785,7 +1961,7 @@ where // This will initiate the elimination process for the connector. // Penalize the elimination score of the connector before making a payment. // Once the payment is made, we will update the score based on the payment status - if let Some(connector) = connectors.first() { + if let Some(connector) = connectors.connectors.first() { logger::debug!( "penalizing the elimination score of the gateway with id {} in open_router for profile {}", connector, profile.get_id().get_string_repr() @@ -1803,7 +1979,10 @@ where } connectors } else { - routable_connectors + DynamicRoutingResult { + connectors: routable_connectors, + routing_approach: Some(utils::RoutingApproach::Default.into()), + } }; Ok(connectors) @@ -2007,18 +2186,13 @@ where #[cfg(all(feature = "v1", feature = "dynamic_routing"))] #[instrument(skip_all)] -pub async fn perform_decide_gateway_call_with_open_router( +pub async fn perform_decide_gateway_call_with_open_router( state: &SessionState, mut routable_connectors: Vec, profile_id: &common_utils::id_type::ProfileId, payment_attempt: &oss_storage::PaymentAttempt, is_elimination_enabled: bool, - old_payment_data: &mut D, -) -> RoutingResult> -where - F: Send + Clone, - D: OperationSessionGetters + OperationSessionSetters + Send + Sync + Clone, -{ +) -> RoutingResult { logger::debug!( "performing decide_gateway call with open_router for profile {}", profile_id.get_string_repr() @@ -2075,11 +2249,11 @@ where .to_string(), ); - old_payment_data.set_routing_approach_in_attempt(Some( + let routing_approach = Some( common_enums::RoutingApproach::from_decision_engine_approach( &decided_gateway.routing_approach, ), - )); + ); if let Some(gateway_priority_map) = decided_gateway.gateway_priority_map { logger::debug!(gateway_priority_map=?gateway_priority_map, routing_approach=decided_gateway.routing_approach, "open_router decide_gateway call response"); @@ -2101,7 +2275,10 @@ where routing_event.set_routable_connectors(routable_connectors.clone()); state.event_handler().log_event(&routing_event); - Ok(routable_connectors) + Ok(DynamicRoutingResult { + connectors: routable_connectors, + routing_approach, + }) } Err(err) => { logger::error!("open_router_error_response: {:?}", err); @@ -2112,7 +2289,10 @@ where } }?; - Ok(sr_sorted_connectors) + Ok(DynamicRoutingResult { + connectors: sr_sorted_connectors.connectors, + routing_approach: sr_sorted_connectors.routing_approach, + }) } #[cfg(all(feature = "v1", feature = "dynamic_routing"))] diff --git a/crates/router/src/core/routing.rs b/crates/router/src/core/routing.rs index 41eee6443a4..e1626ef8898 100644 --- a/crates/router/src/core/routing.rs +++ b/crates/router/src/core/routing.rs @@ -2721,3 +2721,8 @@ pub async fn update_gateway_score_open_router( response, )) } + +pub fn transaction_type_from_payments_dsl(input: &PaymentsDslInput<'_>) -> enums::TransactionType { + let txn_data = TransactionData::Payment(input.clone()); + enums::TransactionType::from(&txn_data) +}