5050#include "dyn_util.h"
5151
5252static rstatus_t msg_quorum_rsp_handler (struct context * ctx , struct msg * req , struct msg * rsp );
53+ static rstatus_t msg_each_quorum_rsp_handler (struct context * ctx , struct msg * req ,
54+ struct msg * rsp );
5355static msg_response_handler_t msg_get_rsp_handler (struct context * ctx , struct msg * req );
5456
5557static rstatus_t rewrite_query_if_necessary (struct msg * * req ,
@@ -653,8 +655,10 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn,
653655 }
654656
655657 if (!(same_dc && same_rack ) || force_swallow ) {
656- // Swallow responses from remote racks or DCs.
657- rack_msg -> swallow = true;
658+ if (req -> consistency != DC_EACH_SAFE_QUORUM || force_swallow ) {
659+ // Swallow responses from remote racks or DCs.
660+ rack_msg -> swallow = true;
661+ }
658662 }
659663
660664 // Get a connection to the node.
@@ -701,13 +705,20 @@ rstatus_t req_forward_to_peer(struct context *ctx, struct conn *c_conn,
701705 return status ;
702706}
703707
704- void req_forward_all_local_racks (struct context * ctx , struct conn * c_conn ,
708+ void req_forward_all_racks_for_dc (struct context * ctx , struct conn * c_conn ,
705709 struct msg * req , struct mbuf * orig_mbuf ,
706710 uint8_t * key , uint32_t keylen ,
707711 struct datacenter * dc ) {
708712 uint8_t rack_cnt = (uint8_t )array_n (& dc -> racks );
709713 uint8_t rack_index ;
710- init_response_mgr (& req -> rspmgr , req , req -> is_read , rack_cnt , c_conn );
714+
715+ if (req -> rspmgrs_inited == false) {
716+ if (req -> consistency == DC_EACH_SAFE_QUORUM ) {
717+ init_response_mgr_all_dcs (ctx , req , c_conn , dc );
718+ } else {
719+ init_response_mgr (req , & req -> rspmgr , rack_cnt , c_conn );
720+ }
721+ }
711722 log_info ("%s %s same DC racks:%d expect replies %d" , print_obj (c_conn ),
712723 print_obj (req ), rack_cnt , req -> rspmgr .max_responses );
713724
@@ -732,6 +743,10 @@ static bool request_send_to_all_dcs(struct msg *req) {
732743 // There is a routing override
733744 if (req -> msg_routing != ROUTING_NORMAL ) return false;
734745
746+ // Under DC_EACH_SAFE_QUORUM, we need to send reads and writes to all
747+ // DCs.
748+ if (req -> consistency == DC_EACH_SAFE_QUORUM ) return true;
749+
735750 // Reads are not propagated
736751 if (req -> is_read ) return false;
737752
@@ -753,8 +768,11 @@ static bool request_send_to_all_local_racks(struct msg *req) {
753768 // A write should go to all racks
754769 if (!req -> is_read ) return true;
755770
756- if ((req -> consistency == DC_QUORUM ) || (req -> consistency == DC_SAFE_QUORUM ))
771+ if ((req -> consistency == DC_QUORUM )
772+ || (req -> consistency == DC_SAFE_QUORUM )
773+ || (req -> consistency == DC_EACH_SAFE_QUORUM )) {
757774 return true;
775+ }
758776 return false;
759777}
760778
@@ -794,6 +812,17 @@ static void req_forward_remote_dc(struct context *ctx, struct conn *c_conn,
794812 const uint32_t rack_cnt = array_n (& dc -> racks );
795813 if (rack_cnt == 0 ) return ;
796814
815+ if (req -> consistency == DC_EACH_SAFE_QUORUM ) {
816+ // Under 'DC_EACH_SAFE_QUORUM', we want to hear back from at least
817+ // quorum racks in each DC, so send it to all racks in remote DCs.
818+ req_forward_all_racks_for_dc (ctx , c_conn , req , orig_mbuf , key , keylen , dc );
819+ return ;
820+ }
821+
822+ // If we're not expecting a consistency level of 'DC_EACH_SAFE_QUORUM', then
823+ // we send it only to the preselected rack in the remote DC. If that rack is
824+ // not reachable, we fail over to another rack in the same remote DC.
825+
797826 // Pick the preferred pre-selected rack for this DC.
798827 struct rack * rack = dc -> preselected_rack_for_replication ;
799828 if (rack == NULL ) rack = array_get (& dc -> racks , 0 );
@@ -846,7 +875,7 @@ static void req_forward_local_dc(struct context *ctx, struct conn *c_conn,
846875 req -> rsp_handler = msg_get_rsp_handler (ctx , req );
847876 if (request_send_to_all_local_racks (req )) {
848877 // send request to all local racks
849- req_forward_all_local_racks (ctx , c_conn , req , orig_mbuf , key , keylen , dc );
878+ req_forward_all_racks_for_dc (ctx , c_conn , req , orig_mbuf , key , keylen , dc );
850879 } else {
851880 // send request to only local token owner
852881 struct rack * rack =
@@ -1002,7 +1031,7 @@ rstatus_t rewrite_query_if_necessary(struct msg **req, struct context *ctx) {
10021031 * the new query and free up the original msg.
10031032 *
10041033 */
1005- rstatus_t rewrite_query_with_timestamp_md (struct msg * * req , struct context * ctx ) {
1034+ static rstatus_t rewrite_query_with_timestamp_md (struct msg * * req , struct context * ctx ) {
10061035
10071036 if (is_read_repairs_enabled () == false) return DN_OK ;
10081037
@@ -1099,6 +1128,8 @@ static msg_response_handler_t msg_get_rsp_handler(struct context *ctx, struct ms
10991128 // Check if its quorum
11001129 if ((req -> consistency == DC_QUORUM ) || (req -> consistency == DC_SAFE_QUORUM )) {
11011130 return msg_quorum_rsp_handler ;
1131+ } else if (req -> consistency == DC_EACH_SAFE_QUORUM ) {
1132+ return msg_each_quorum_rsp_handler ;
11021133 }
11031134 }
11041135
@@ -1149,6 +1180,104 @@ static rstatus_t msg_quorum_rsp_handler(struct context *ctx, struct msg *req,
11491180 return DN_OK ;
11501181}
11511182
1183+ static int find_rspmgr_idx (struct context * ctx , struct response_mgr * * rspmgrs ,
1184+ struct string * target_dc_name ) {
1185+ int num_dcs = (int ) array_n (& ctx -> pool .datacenters );
1186+
1187+ int i = 0 ;
1188+ for (i = 0 ; i < num_dcs ; ++ i ) {
1189+ struct response_mgr * rspmgr = rspmgrs [i ];
1190+ if (string_compare (& rspmgr -> dc_name , target_dc_name ) == 0 ) {
1191+ return i ;
1192+ }
1193+ }
1194+ return -1 ;
1195+ }
1196+
1197+ static bool all_rspmgrs_done (struct context * ctx , struct response_mgr * * rspmgrs ) {
1198+ int num_dcs = (int ) array_n (& ctx -> pool .datacenters );
1199+ int i = 0 ;
1200+ for (i = 0 ; i < num_dcs ; ++ i ) {
1201+ struct response_mgr * rspmgr = rspmgrs [i ];
1202+ if (!rspmgr -> done ) return false;
1203+ }
1204+
1205+ return true;
1206+ }
1207+
1208+ static struct msg * all_rspmgrs_get_response (struct context * ctx , struct msg * req ) {
1209+ int num_dcs = (int ) array_n (& ctx -> pool .datacenters );
1210+ struct msg * rsp = NULL ;
1211+ int i ;
1212+ for (i = 0 ; i < num_dcs ; ++ i ) {
1213+ struct response_mgr * rspmgr = req -> additional_each_rspmgrs [i ];
1214+ struct msg * dc_rsp = NULL ;
1215+ if (!rsp ) {
1216+ rsp = rspmgr_get_response (ctx , rspmgr );
1217+ ASSERT (rsp );
1218+ } else if (rsp -> is_error ) {
1219+ // If any of the DCs errored out, we just clean up responses from the
1220+ // remaining DCs.
1221+ rspmgr_free_other_responses (rspmgr , NULL );
1222+ continue ;
1223+ } else {
1224+ ASSERT (rsp -> is_error == false);
1225+ // If the DCs we've processed so far have not seen errors, we need to
1226+ // make sure that the remaining DCs don't have errors too.
1227+ dc_rsp = rspmgr_get_response (ctx , rspmgr );
1228+ ASSERT (dc_rsp );
1229+ if (dc_rsp -> is_error ) {
1230+ rsp_put (rsp );
1231+ rsp = dc_rsp ;
1232+ } else {
1233+ // If it's not an error, clear all responses from this DC.
1234+ rspmgr_free_other_responses (rspmgr , NULL );
1235+ continue ;
1236+ }
1237+ }
1238+
1239+ rspmgr_free_other_responses (rspmgr , rsp );
1240+ rsp -> peer = req ;
1241+ req -> selected_rsp = rsp ;
1242+ req -> error_code = rsp -> error_code ;
1243+ req -> is_error = rsp -> is_error ;
1244+ req -> dyn_error_code = rsp -> dyn_error_code ;
1245+
1246+ }
1247+
1248+ return rsp ;
1249+ }
1250+
1251+ static rstatus_t msg_each_quorum_rsp_handler (struct context * ctx , struct msg * req ,
1252+ struct msg * rsp ) {
1253+
1254+ if (all_rspmgrs_done (ctx , req -> additional_each_rspmgrs )) return swallow_extra_rsp (req , rsp );
1255+
1256+ int rspmgr_idx = -1 ;
1257+ struct conn * rsp_conn = rsp -> owner ;
1258+ if (rsp_conn == NULL ) {
1259+ rspmgr_idx = 0 ;
1260+ } else if (rsp_conn -> type == CONN_DNODE_PEER_SERVER ) {
1261+ struct node * peer_instance = (struct node * ) rsp_conn -> owner ;
1262+ struct string * peer_dc_name = & peer_instance -> dc ;
1263+ rspmgr_idx = find_rspmgr_idx (ctx , req -> additional_each_rspmgrs , peer_dc_name );
1264+ if (rspmgr_idx == -1 ) {
1265+ log_error ("Could not find which DC response was from" );
1266+ }
1267+ } else if (rsp_conn -> type == CONN_SERVER ) {
1268+ // If this is a 'CONN_SERVER' connection, then it is from the same DC.
1269+ rspmgr_idx = 0 ;
1270+ }
1271+
1272+ struct response_mgr * rspmgr = req -> additional_each_rspmgrs [rspmgr_idx ];
1273+ rspmgr_submit_response (rspmgr , rsp );
1274+ if (!rspmgr_check_is_done (rspmgr )) return DN_EAGAIN ;
1275+ if (!all_rspmgrs_done (ctx , req -> additional_each_rspmgrs )) return DN_EAGAIN ;
1276+
1277+ rsp = all_rspmgrs_get_response (ctx , req );
1278+ return DN_OK ;
1279+ }
1280+
11521281static void req_client_enqueue_omsgq (struct context * ctx , struct conn * conn ,
11531282 struct msg * req ) {
11541283 ASSERT (req -> is_request );
0 commit comments