@@ -12,15 +12,21 @@ use diesel::{
1212 dsl:: max, BoolExpressionMethods , ExpressionMethods , QueryDsl , RunQueryDsl ,
1313 SelectableHelper ,
1414} ;
15- use fred:: interfaces:: KeysInterface ;
1615use itertools:: Itertools ;
1716use serde_json:: { json, Map , Value } ;
1817#[ cfg( feature = "jsonlogic" ) ]
1918use service_utils:: helpers:: extract_dimensions;
20- use service_utils:: service:: types:: {
21- AppHeader , AppState , DbConnection , SchemaName , WorkspaceContext ,
19+ use service_utils:: {
20+ redis:: {
21+ fetch_from_redis_else_writeback, AUDIT_ID_KEY_SUFFIX , CONFIG_KEY_SUFFIX ,
22+ CONFIG_VERSION_KEY_SUFFIX , LAST_MODIFIED_KEY_SUFFIX ,
23+ } ,
24+ service:: {
25+ get_db_connection,
26+ types:: { AppHeader , AppState , DbConnection , SchemaName , WorkspaceContext } ,
27+ } ,
2228} ;
23- use superposition_macros:: { bad_argument, db_error, unexpected_error} ;
29+ use superposition_macros:: { bad_argument, db_error, not_found , unexpected_error} ;
2430use superposition_types:: {
2531 api:: {
2632 config:: { ConfigQuery , ContextPayload , MergeStrategy , ResolveConfigQuery } ,
@@ -43,17 +49,11 @@ use superposition_types::{
4349} ;
4450use uuid:: Uuid ;
4551
46- use crate :: helpers:: {
47- calculate_context_weight, evaluate_remote_cohorts, generate_cac, AUDIT_ID_KEY_SUFFIX ,
48- CONFIG_VERSION_KEY_SUFFIX , LAST_MODIFIED_KEY_SUFFIX ,
49- } ;
50- use crate :: {
51- api:: {
52- context:: { self , helpers:: query_description} ,
53- dimension:: fetch_dimensions_info_map,
54- } ,
55- helpers:: get_config_from_redis,
52+ use crate :: api:: {
53+ context:: { self , helpers:: query_description} ,
54+ dimension:: fetch_dimensions_info_map,
5655} ;
56+ use crate :: helpers:: { calculate_context_weight, evaluate_remote_cohorts, generate_cac} ;
5757
5858use super :: helpers:: apply_prefix_filter_to_config;
5959
@@ -118,22 +118,18 @@ fn get_config_version(
118118 )
119119}
120120
121- pub fn add_audit_id_to_header (
121+ pub fn fetch_audit_id (
122122 conn : & mut DBConnection ,
123- resp_builder : & mut HttpResponseBuilder ,
124123 schema_name : & SchemaName ,
125- ) {
126- if let Ok ( uuid ) = event_log:: event_log
124+ ) -> Option < String > {
125+ event_log:: event_log
127126 . select ( event_log:: id)
128127 . filter ( event_log:: table_name. eq ( "contexts" ) )
129128 . order_by ( event_log:: timestamp. desc ( ) )
130129 . schema_name ( schema_name)
131130 . first :: < Uuid > ( conn)
132- {
133- resp_builder. insert_header ( ( AppHeader :: XAuditId . to_string ( ) , uuid. to_string ( ) ) ) ;
134- } else {
135- log:: error!( "Failed to fetch contexts from event_log" ) ;
136- }
131+ . map ( |uuid| uuid. to_string ( ) )
132+ . ok ( )
137133}
138134
139135fn add_last_modified_to_header (
@@ -645,72 +641,29 @@ async fn reduce_config(
645641async fn get_config (
646642 req : HttpRequest ,
647643 body : Option < Json < ContextPayload > > ,
648- db_conn : DbConnection ,
649644 dimension_params : DimensionQuery < QueryMap > ,
650645 query_filters : superposition_query:: Query < ConfigQuery > ,
651646 workspace_context : WorkspaceContext ,
652647 state : Data < AppState > ,
653648) -> superposition:: Result < HttpResponse > {
654- let DbConnection ( mut conn) = db_conn;
655-
656649 let mut response = HttpResponse :: Ok ( ) ;
657650 let is_smithy = req. method ( ) == actix_web:: http:: Method :: GET ;
658-
659- if let Some ( ref redis_pool) = state. redis {
660- let schema_name = workspace_context. schema_name ;
661- let client = redis_pool. next_connected ( ) ;
662- let last_modified_at_key = format ! ( "{}{LAST_MODIFIED_KEY_SUFFIX}" , * schema_name) ;
663- let audit_id_key = format ! ( "{}{AUDIT_ID_KEY_SUFFIX}" , * schema_name) ;
664- let config_version_key = format ! ( "{}{CONFIG_VERSION_KEY_SUFFIX}" , * schema_name) ;
665- let audit_id: String = client. get ( & audit_id_key) . await . map_err ( |e| {
666- log:: error!(
667- "failed to fetch audit id from redis for schema {}: {}" ,
668- * schema_name,
669- e
670- ) ;
671- unexpected_error ! ( "failed to fetch audit id from redis" )
672- } ) ?;
673- let last_modified_at = client
674- . get :: < String , String > ( last_modified_at_key)
675- . await
676- . map ( |time| {
677- DateTime :: parse_from_rfc2822 ( & time)
678- . map_err ( |err| {
679- log:: error!( "Error occurred while parsing last_modified: {}" , err)
680- } )
681- . ok ( )
682- . map ( |dt| dt. with_timezone ( & Utc ) )
651+ let schema_name = workspace_context. schema_name . clone ( ) ;
652+ let max_created_at = fetch_from_redis_else_writeback :: < DateTime < Utc > > (
653+ format ! ( "{}{LAST_MODIFIED_KEY_SUFFIX}" , schema_name. 0 ) ,
654+ & schema_name,
655+ state. redis . clone ( ) ,
656+ state. db_pool . clone ( ) ,
657+ |db_pool| {
658+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
659+ get_max_created_at ( & mut conn, & schema_name) . map_err ( |e| {
660+ log:: error!( "failed to fetch max timestamp from event_log: {e}" ) ;
661+ db_error ! ( e)
683662 } )
684- . map_err ( |e| {
685- log:: error!(
686- "failed to fetch last modified at from redis for schema {}: {}" ,
687- * schema_name,
688- e
689- ) ;
690- unexpected_error ! ( "failed to fetch last modified at from redis" )
691- } ) ?;
692- let version = client. get ( & config_version_key) . await . map_err ( |e| {
693- log:: error!(
694- "failed to fetch config version from redis for schema {}: {}" ,
695- * schema_name,
696- e
697- ) ;
698- unexpected_error ! ( "failed to fetch config version from redis" )
699- } ) ?;
700- let config =
701- get_config_from_redis ( & schema_name, redis_pool, Some ( client) ) . await ?;
702-
703- add_last_modified_to_header ( last_modified_at, is_smithy, & mut response) ;
704- response. insert_header ( ( AppHeader :: XAuditId . to_string ( ) , audit_id) ) ;
705- add_config_version_to_header ( & version, & mut response) ;
706- return Ok ( response. json ( config) ) ;
707- }
708-
709- // if fast mode isn't enabled, read from DB
710-
711- let max_created_at = get_max_created_at ( & mut conn, & workspace_context. schema_name )
712- . map_err ( |e| log:: error!( "failed to fetch max timestamp from event_log: {e}" ) )
713- . ok ( ) ;
663+ } ,
664+ )
665+ . await
666+ . ok ( ) ;
714667
715668 log:: info!( "Max created at: {max_created_at:?}" ) ;
716669
@@ -721,15 +674,35 @@ async fn get_config(
721674 }
722675
723676 let query_filters = query_filters. into_inner ( ) ;
724- let mut version =
725- get_config_version ( & query_filters. version , & workspace_context, & mut conn) ?;
726-
727- let mut config = generate_config_from_version (
728- & mut version,
729- & mut conn,
730- & workspace_context. schema_name ,
731- ) ?;
732-
677+ let mut version = fetch_from_redis_else_writeback :: < Option < i64 > > (
678+ format ! ( "{}{CONFIG_VERSION_KEY_SUFFIX}" , schema_name. 0 ) ,
679+ & schema_name,
680+ state. redis . clone ( ) ,
681+ state. db_pool . clone ( ) ,
682+ |db_pool| {
683+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
684+ get_config_version ( & query_filters. version , & workspace_context, & mut conn)
685+ } ,
686+ )
687+ . await
688+ . map_err ( |e| unexpected_error ! ( "Config version not found due to: {}" , e) ) ?;
689+
690+ let mut config = fetch_from_redis_else_writeback :: < Config > (
691+ format ! ( "{}{CONFIG_KEY_SUFFIX}" , schema_name. 0 ) ,
692+ & schema_name,
693+ state. redis . clone ( ) ,
694+ state. db_pool . clone ( ) ,
695+ |db_pool| {
696+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
697+ generate_config_from_version (
698+ & mut version,
699+ & mut conn,
700+ & workspace_context. schema_name ,
701+ )
702+ } ,
703+ )
704+ . await
705+ . map_err ( |e| unexpected_error ! ( "failed to generate config: {}" , e) ) ?;
733706 config = apply_prefix_filter_to_config ( & query_filters. prefix , config) ?;
734707 let context = if req. method ( ) == actix_web:: http:: Method :: GET {
735708 dimension_params. into_inner ( )
@@ -739,9 +712,23 @@ async fn get_config(
739712 if !context. is_empty ( ) {
740713 config = config. filter_by_dimensions ( & context) ;
741714 }
742-
743715 add_last_modified_to_header ( max_created_at, is_smithy, & mut response) ;
744- add_audit_id_to_header ( & mut conn, & mut response, & workspace_context. schema_name ) ;
716+ if let Some ( audit_id) = fetch_from_redis_else_writeback :: < String > (
717+ format ! ( "{}{AUDIT_ID_KEY_SUFFIX}" , schema_name. 0 ) ,
718+ & schema_name,
719+ state. redis . clone ( ) ,
720+ state. db_pool . clone ( ) ,
721+ |db_pool| {
722+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
723+ fetch_audit_id ( & mut conn, & workspace_context. schema_name )
724+ . ok_or ( not_found ! ( "Audit ID not found" ) )
725+ } ,
726+ )
727+ . await
728+ . ok ( )
729+ {
730+ response. insert_header ( ( AppHeader :: XAuditId . to_string ( ) , audit_id) ) ;
731+ }
745732 add_config_version_to_header ( & version, & mut response) ;
746733 Ok ( response. json ( config) )
747734}
@@ -753,31 +740,65 @@ async fn get_resolved_config(
753740 req : HttpRequest ,
754741 body : Option < Json < ContextPayload > > ,
755742 merge_strategy : Header < MergeStrategy > ,
756- db_conn : DbConnection ,
757743 dimension_params : DimensionQuery < QueryMap > ,
758744 query_filters : superposition_query:: Query < ResolveConfigQuery > ,
759745 workspace_context : WorkspaceContext ,
746+ state : Data < AppState > ,
760747) -> superposition:: Result < HttpResponse > {
761- let DbConnection ( mut conn) = db_conn;
762748 let query_filters = query_filters. into_inner ( ) ;
763-
764- let max_created_at = get_max_created_at ( & mut conn, & workspace_context. schema_name )
765- . map_err ( |e| log:: error!( "failed to fetch max timestamp from event_log : {e}" ) )
766- . ok ( ) ;
749+ let schema_name = workspace_context. schema_name . clone ( ) ;
750+
751+ let max_created_at = fetch_from_redis_else_writeback :: < DateTime < Utc > > (
752+ format ! ( "{}{LAST_MODIFIED_KEY_SUFFIX}" , schema_name. 0 ) ,
753+ & schema_name,
754+ state. redis . clone ( ) ,
755+ state. db_pool . clone ( ) ,
756+ |db_pool| {
757+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
758+ get_max_created_at ( & mut conn, & schema_name) . map_err ( |e| {
759+ log:: error!( "failed to fetch max timestamp from event_log: {e}" ) ;
760+ db_error ! ( e)
761+ } )
762+ } ,
763+ )
764+ . await
765+ . ok ( ) ;
767766
768767 let is_not_modified = is_not_modified ( max_created_at, & req) ;
769768
770769 if is_not_modified {
771770 return Ok ( HttpResponse :: NotModified ( ) . finish ( ) ) ;
772771 }
773772
774- let mut config_version =
775- get_config_version ( & query_filters. version , & workspace_context, & mut conn) ?;
776- let mut config = generate_config_from_version (
777- & mut config_version,
778- & mut conn,
779- & workspace_context. schema_name ,
780- ) ?;
773+ let mut config_version = fetch_from_redis_else_writeback :: < Option < i64 > > (
774+ format ! ( "{}{CONFIG_VERSION_KEY_SUFFIX}" , schema_name. 0 ) ,
775+ & schema_name,
776+ state. redis . clone ( ) ,
777+ state. db_pool . clone ( ) ,
778+ |db_pool| {
779+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
780+ get_config_version ( & query_filters. version , & workspace_context, & mut conn)
781+ } ,
782+ )
783+ . await
784+ . map_err ( |e| unexpected_error ! ( "Config version not found due to: {}" , e) ) ?;
785+
786+ let mut config = fetch_from_redis_else_writeback :: < Config > (
787+ format ! ( "{}{CONFIG_KEY_SUFFIX}" , schema_name. 0 ) ,
788+ & schema_name,
789+ state. redis . clone ( ) ,
790+ state. db_pool . clone ( ) ,
791+ |db_pool| {
792+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
793+ generate_config_from_version (
794+ & mut config_version,
795+ & mut conn,
796+ & workspace_context. schema_name ,
797+ )
798+ } ,
799+ )
800+ . await
801+ . map_err ( |e| unexpected_error ! ( "failed to generate config: {}" , e) ) ?;
781802
782803 config = apply_prefix_filter_to_config ( & query_filters. prefix , config) ?;
783804
@@ -813,6 +834,11 @@ async fn get_resolved_config(
813834 } ;
814835
815836 if query_filters. resolve_remote . unwrap_or_default ( ) {
837+ let DbConnection ( mut conn) =
838+ get_db_connection ( state. db_pool . clone ( ) ) . map_err ( |e| {
839+ log:: error!( "failed to get db connection: {}" , e) ;
840+ unexpected_error ! ( "failed to get db connection" )
841+ } ) ?;
816842 query_data = QueryMap :: from ( evaluate_remote_cohorts (
817843 & config. dimensions ,
818844 & query_data,
@@ -834,7 +860,22 @@ async fn get_resolved_config(
834860 } ;
835861 let mut resp = HttpResponse :: Ok ( ) ;
836862 add_last_modified_to_header ( max_created_at, is_smithy, & mut resp) ;
837- add_audit_id_to_header ( & mut conn, & mut resp, & workspace_context. schema_name ) ;
863+ if let Some ( audit_id) = fetch_from_redis_else_writeback :: < String > (
864+ format ! ( "{}{AUDIT_ID_KEY_SUFFIX}" , schema_name. 0 ) ,
865+ & schema_name,
866+ state. redis . clone ( ) ,
867+ state. db_pool . clone ( ) ,
868+ |db_pool| {
869+ let DbConnection ( mut conn) = get_db_connection ( db_pool) ?;
870+ fetch_audit_id ( & mut conn, & workspace_context. schema_name )
871+ . ok_or ( not_found ! ( "Audit ID not found" ) )
872+ } ,
873+ )
874+ . await
875+ . ok ( )
876+ {
877+ resp. insert_header ( ( AppHeader :: XAuditId . to_string ( ) , audit_id) ) ;
878+ }
838879 add_config_version_to_header ( & config_version, & mut resp) ;
839880
840881 Ok ( resp. json ( response) )
0 commit comments