@@ -181,6 +181,9 @@ static inline char *clusterLinkGetHumanNodeName(clusterLink *link) {
181181
182182#define isSlotUnclaimed (slot ) \
183183 (server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
184+ /* Treating slot bitmaps as 8-byte words to speedup iteration */
185+ #define CLUSTER_SLOT_WORDS (CLUSTER_SLOTS / 64)
186+ #define SLOT_WORD_OFFSET (w ) ((w) << 3)
184187
185188#define RCVBUF_INIT_LEN 1024
186189#define RCVBUF_MIN_READ_LEN 14
@@ -3619,6 +3622,16 @@ int clusterIsValidPacket(clusterLink *link) {
36193622 return 1 ;
36203623}
36213624
3625+ /* When iterating through the slot bitmap, group every 64 bits as
3626+ * a word to speedup. */
3627+ static inline int clusterExtractSlotFromWord (uint64_t * slot_word , size_t slot_word_index ) {
3628+ /* Get the index of the least-significant set bit, in this 64-bit word */
3629+ const unsigned bit = (unsigned )__builtin_ctzll (* slot_word );
3630+ const int slot = (int )((slot_word_index << 6 ) | bit );
3631+ * slot_word &= * slot_word - 1 ; /* clear that bit */
3632+ return slot ;
3633+ }
3634+
36223635/* When this function is called, there is a packet to process starting
36233636 * at link->rcvbuf. Releasing the buffer is up to the caller, so this
36243637 * function should just handle the higher level stuff of processing the
@@ -4139,20 +4152,27 @@ int clusterProcessPacket(clusterLink *link) {
41394152 * new configuration, so other nodes that have an updated table must
41404153 * do it. In this way A will stop to act as a primary (or can try to
41414154 * failover if there are the conditions to win the election). */
4142- for (int j = 0 ; j < CLUSTER_SLOTS ; j ++ ) {
4143- if (bitmapTestBit (hdr -> myslots , j )) {
4144- if (server .cluster -> slots [j ] == sender || isSlotUnclaimed (j )) continue ;
4145- if (server .cluster -> slots [j ]-> configEpoch > sender_claimed_config_epoch ) {
4155+ bool found_new_owner = false;
4156+ for (size_t w = 0 ; w < CLUSTER_SLOT_WORDS && !found_new_owner ; w ++ ) {
4157+ uint64_t word ;
4158+ memcpy (& word , hdr -> myslots + SLOT_WORD_OFFSET (w ), sizeof (word ));
4159+ while (word ) {
4160+ const int slot = clusterExtractSlotFromWord (& word , w );
4161+
4162+ clusterNode * slot_owner = server .cluster -> slots [slot ];
4163+ if (slot_owner == sender || isSlotUnclaimed (slot )) continue ;
4164+ if (slot_owner -> configEpoch > sender_claimed_config_epoch ) {
41464165 serverLog (LL_VERBOSE ,
41474166 "Node %.40s (%s) has old slots configuration, sending "
41484167 "an UPDATE message about %.40s (%s)" ,
41494168 sender -> name , sender -> human_nodename ,
4150- server . cluster -> slots [ j ] -> name , server . cluster -> slots [ j ] -> human_nodename );
4151- clusterSendUpdate (sender -> link , server . cluster -> slots [ j ] );
4169+ slot_owner -> name , slot_owner -> human_nodename );
4170+ clusterSendUpdate (sender -> link , slot_owner );
41524171
41534172 /* TODO: instead of exiting the loop send every other
41544173 * UPDATE packet for other nodes that are the new owner
41554174 * of sender's slots. */
4175+ found_new_owner = true;
41564176 break ;
41574177 }
41584178 }
@@ -5050,7 +5070,8 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
50505070 uint64_t requestConfigEpoch = ntohu64 (request -> configEpoch );
50515071 unsigned char * claimed_slots = request -> myslots ;
50525072 int force_ack = request -> mflags [0 ] & CLUSTERMSG_FLAG0_FORCEACK ;
5053- int j ;
5073+ int slot ;
5074+ clusterNode * slot_owner ;
50545075
50555076 /* IF we are not a primary serving at least 1 slot, we don't have the
50565077 * right to vote, as the cluster size is the number
@@ -5096,30 +5117,35 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
50965117 /* The replica requesting the vote must have a configEpoch for the claimed
50975118 * slots that is >= the one of the primaries currently serving the same
50985119 * slots in the current configuration. */
5099- for (j = 0 ; j < CLUSTER_SLOTS ; j ++ ) {
5100- if (bitmapTestBit (claimed_slots , j ) == 0 ) continue ;
5101- if (isSlotUnclaimed (j ) || server .cluster -> slots [j ]-> configEpoch <= requestConfigEpoch ) {
5102- continue ;
5120+ for (size_t w = 0 ; w < CLUSTER_SLOT_WORDS ; w ++ ) {
5121+ uint64_t word ;
5122+ memcpy (& word , claimed_slots + SLOT_WORD_OFFSET (w ), sizeof (word ));
5123+ while (word ) {
5124+ slot = clusterExtractSlotFromWord (& word , w );
5125+
5126+ if (isSlotUnclaimed (slot ) || server .cluster -> slots [slot ]-> configEpoch <= requestConfigEpoch ) {
5127+ continue ;
5128+ }
5129+ slot_owner = server .cluster -> slots [slot ];
5130+ /* If we reached this point we found a slot that in our current slots
5131+ * is served by a primary with a greater configEpoch than the one claimed
5132+ * by the replica requesting our vote. Refuse to vote for this replica. */
5133+ serverLog (LL_WARNING ,
5134+ "Failover auth denied to %.40s (%s): "
5135+ "slot %d epoch (%llu) > reqConfigEpoch (%llu)" ,
5136+ node -> name , node -> human_nodename , slot , (unsigned long long )slot_owner -> configEpoch ,
5137+ (unsigned long long )requestConfigEpoch );
5138+
5139+ /* Send an UPDATE message to the replica. After receiving the UPDATE message,
5140+ * the replica will update the slots config so that it can initiate a failover
5141+ * again later. Otherwise the replica will never get votes if the primary is down. */
5142+ serverLog (LL_VERBOSE ,
5143+ "Node %.40s (%s) has old slots configuration, sending "
5144+ "an UPDATE message about %.40s (%s)" ,
5145+ node -> name , node -> human_nodename , slot_owner -> name , slot_owner -> human_nodename );
5146+ clusterSendUpdate (node -> link , slot_owner );
5147+ return ;
51035148 }
5104- /* If we reached this point we found a slot that in our current slots
5105- * is served by a primary with a greater configEpoch than the one claimed
5106- * by the replica requesting our vote. Refuse to vote for this replica. */
5107- serverLog (LL_WARNING ,
5108- "Failover auth denied to %.40s (%s): "
5109- "slot %d epoch (%llu) > reqConfigEpoch (%llu)" ,
5110- node -> name , node -> human_nodename , j , (unsigned long long )server .cluster -> slots [j ]-> configEpoch ,
5111- (unsigned long long )requestConfigEpoch );
5112-
5113- /* Send an UPDATE message to the replica. After receiving the UPDATE message,
5114- * the replica will update the slots config so that it can initiate a failover
5115- * again later. Otherwise the replica will never get votes if the primary is down. */
5116- serverLog (LL_VERBOSE ,
5117- "Node %.40s (%s) has old slots configuration, sending "
5118- "an UPDATE message about %.40s (%s)" ,
5119- node -> name , node -> human_nodename ,
5120- server .cluster -> slots [j ]-> name , server .cluster -> slots [j ]-> human_nodename );
5121- clusterSendUpdate (node -> link , server .cluster -> slots [j ]);
5122- return ;
51235149 }
51245150
51255151 /* We can vote for this replica. */
0 commit comments