@@ -563,6 +563,7 @@ void dlm_rsb_scan(struct timer_list *timer)
 		list_del(&r->res_slow_list);
 		rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 				       dlm_rhash_rsb_params);
+		rsb_clear_flag(r, RSB_HASHED);

 		/* ls_rsbtbl_lock is not needed when calling send_remove() */
 		write_unlock(&ls->ls_rsbtbl_lock);
@@ -636,8 +637,14 @@ int dlm_search_rsb_tree(struct rhashtable *rhash, const void *name, int len,

 static int rsb_insert(struct dlm_rsb *rsb, struct rhashtable *rhash)
 {
-	return rhashtable_insert_fast(rhash, &rsb->res_node,
-				      dlm_rhash_rsb_params);
+	int rv;
+
+	rv = rhashtable_insert_fast(rhash, &rsb->res_node,
+				    dlm_rhash_rsb_params);
+	if (!rv)
+		rsb_set_flag(rsb, RSB_HASHED);
+
+	return rv;
 }

 /*
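With this change, hash-table membership is mirrored in the new RSB_HASHED flag: rsb_insert() sets the bit only when rhashtable_insert_fast() succeeds, and the removal sites (dlm_rsb_scan() above, receive_remove() below) clear it right after rhashtable_remove_fast() while still holding the write lock. A minimal sketch of that pairing, using invented names (struct item, ITEM_HASHED, item_params) rather than the real DLM structures:

/* Illustrative only: struct item, ITEM_HASHED and item_params stand in
 * for struct dlm_rsb, RSB_HASHED and dlm_rhash_rsb_params.
 */
#include <linux/rhashtable.h>
#include <linux/bitops.h>
#include <linux/stddef.h>

enum { ITEM_HASHED = 0 };

struct item {
	struct rhash_head node;
	unsigned long flags;
	char key[16];
};

static const struct rhashtable_params item_params = {
	.head_offset = offsetof(struct item, node),
	.key_offset  = offsetof(struct item, key),
	.key_len     = sizeof_field(struct item, key),
	.automatic_shrinking = true,
};

/* both helpers assume the caller holds the table's write lock */

static int item_insert(struct rhashtable *ht, struct item *it)
{
	int rv;

	rv = rhashtable_insert_fast(ht, &it->node, item_params);
	if (!rv)
		set_bit(ITEM_HASHED, &it->flags);	/* flag mirrors membership */
	return rv;
}

static void item_remove(struct rhashtable *ht, struct item *it)
{
	rhashtable_remove_fast(ht, &it->node, item_params);
	clear_bit(ITEM_HASHED, &it->flags);	/* lookups that recheck the flag
						 * now see the item is on its
						 * way to being freed */
}

Keeping the flag update under the same write lock as the table update is what lets later lookups treat the flag as an authoritative membership test.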
@@ -752,11 +759,23 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
 	write_lock_bh(&ls->ls_rsbtbl_lock);

-	/* retry lookup under write lock to see if its still in inactive state
-	 * if not it's in active state and we relookup - unlikely path.
+	/*
+	 * The expectation here is that the rsb will have HASHED and
+	 * INACTIVE flags set, and that the rsb can be moved from
+	 * inactive back to active again. However, between releasing
+	 * the read lock and acquiring the write lock, this rsb could
+	 * have been removed from rsbtbl, and had HASHED cleared, to
+	 * be freed. To deal with this case, we would normally need
+	 * to repeat dlm_search_rsb_tree while holding the write lock,
+	 * but rcu allows us to simply check the HASHED flag, because
+	 * the rcu read lock means the rsb will not be freed yet.
+	 * If the HASHED flag is not set, then the rsb is being freed,
+	 * so we add a new rsb struct. If the HASHED flag is set,
+	 * and INACTIVE is not set, it means another thread has
+	 * made the rsb active, as we're expecting to do here, and
+	 * we just repeat the lookup (this will be very unlikely.)
 	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_INACTIVE)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
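As the new comment spells out, the do_inactive path no longer repeats dlm_search_rsb_tree() after upgrading to the write lock: the caller is inside rcu_read_lock() (see find_rsb() below), so the rsb memory cannot go away, and testing RSB_HASHED under the write lock is enough to tell whether it is still in the table. A condensed, hypothetical version of that recheck follows; the return codes and the reactivation step are simplified, and this is not the real find_rsb_dir() body:

/* Hypothetical condensation of the do_inactive recheck in find_rsb_dir().
 * Assumes the caller holds rcu_read_lock() and found "r" inactive while
 * holding the read lock; error codes and bookkeeping are simplified.
 */
static int reactivate_rsb(struct dlm_ls *ls, struct dlm_rsb *r)
{
	write_lock_bh(&ls->ls_rsbtbl_lock);

	if (!rsb_flag(r, RSB_HASHED)) {
		/* removed from ls_rsbtbl after we dropped the read lock;
		 * it is being freed via rcu, so the caller must allocate
		 * a new rsb instead (the "do_new" path) */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return -ENOENT;
	}

	if (!rsb_flag(r, RSB_INACTIVE)) {
		/* another thread already made it active; the caller
		 * simply retries the lookup (very unlikely) */
		write_unlock_bh(&ls->ls_rsbtbl_lock);
		return -EAGAIN;
	}

	/* still hashed and still inactive: safe to move it back to the
	 * active state (bookkeeping elided) */
	rsb_clear_flag(r, RSB_INACTIVE);
	write_unlock_bh(&ls->ls_rsbtbl_lock);
	return 0;
}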
@@ -926,11 +945,8 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
  do_inactive:
 	write_lock_bh(&ls->ls_rsbtbl_lock);

-	/* retry lookup under write lock to see if its still inactive.
-	 * if it's active, repeat lookup - unlikely path.
-	 */
-	error = dlm_search_rsb_tree(&ls->ls_rsbtbl, name, len, &r);
-	if (!error) {
+	/* See comment in find_rsb_dir. */
+	if (rsb_flag(r, RSB_HASHED)) {
 		if (!rsb_flag(r, RSB_INACTIVE)) {
 			write_unlock_bh(&ls->ls_rsbtbl_lock);
 			goto retry;
@@ -1012,25 +1028,70 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 	return error;
 }

+/*
+ * rsb rcu usage
+ *
+ * While rcu read lock is held, the rsb cannot be freed,
+ * which allows a lookup optimization.
+ *
+ * Two threads are accessing the same rsb concurrently,
+ * the first (A) is trying to use the rsb, the second (B)
+ * is trying to free the rsb.
+ *
+ * thread A                          thread B
+ * (trying to use rsb)               (trying to free rsb)
+ *
+ * A1. rcu read lock
+ * A2. rsbtbl read lock
+ * A3. look up rsb in rsbtbl
+ * A4. rsbtbl read unlock
+ *                                   B1. rsbtbl write lock
+ *                                   B2. look up rsb in rsbtbl
+ *                                   B3. remove rsb from rsbtbl
+ *                                   B4. clear rsb HASHED flag
+ *                                   B5. rsbtbl write unlock
+ *                                   B6. begin freeing rsb using rcu...
+ *
+ * (rsb is inactive, so try to make it active again)
+ * A5. read rsb HASHED flag (safe because rsb is not freed yet)
+ * A6. the rsb HASHED flag is not set, which means the rsb
+ *     is being removed from rsbtbl and freed, so don't use it.
+ * A7. rcu read unlock
+ *
+ *                                   B7. ...finish freeing rsb using rcu
+ * A8. create a new rsb
+ *
+ * Without the rcu optimization, steps A5-8 would need to do
+ * an extra rsbtbl lookup:
+ * A5. rsbtbl write lock
+ * A6. look up rsb in rsbtbl, not found
+ * A7. rsbtbl write unlock
+ * A8. create a new rsb
+ */
+
 static int find_rsb(struct dlm_ls *ls, const void *name, int len,
 		    int from_nodeid, unsigned int flags,
 		    struct dlm_rsb **r_ret)
 {
 	int dir_nodeid;
 	uint32_t hash;
+	int rv;

 	if (len > DLM_RESNAME_MAXLEN)
 		return -EINVAL;

 	hash = jhash(name, len, 0);
 	dir_nodeid = dlm_hash2nodeid(ls, hash);

+	rcu_read_lock();
 	if (dlm_no_directory(ls))
-		return find_rsb_nodir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_nodir(ls, name, len, hash, dir_nodeid,
 				      from_nodeid, flags, r_ret);
 	else
-		return find_rsb_dir(ls, name, len, hash, dir_nodeid,
+		rv = find_rsb_dir(ls, name, len, hash, dir_nodeid,
 				    from_nodeid, flags, r_ret);
+	rcu_read_unlock();
+	return rv;
 }

 /* we have received a request and found that res_master_nodeid != our_nodeid,
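The comment block describes why the shortcut is safe from the freeing side: thread B clears RSB_HASHED while still holding the write lock, and only then hands the object to rcu, so a reader that entered its rcu read-side section before the free can still dereference the rsb and will observe the cleared flag. The B1-B7 steps written out as a sketch; struct item, item_params and item_release() are invented for illustration, and in fs/dlm the rsb is actually freed through free_inactive_rsb():

/* "Thread B" from the table above, as code.  The names extend the
 * invented struct item from the earlier sketch with a struct rcu_head;
 * none of this is the real DLM free path.
 */
#include <linux/rhashtable.h>
#include <linux/rcupdate.h>
#include <linux/spinlock.h>
#include <linux/bitops.h>
#include <linux/slab.h>

extern const struct rhashtable_params item_params;	/* as in the earlier sketch */

enum { ITEM_HASHED = 0 };

struct item {
	struct rhash_head node;
	unsigned long flags;
	struct rcu_head rcu;		/* added for the deferred free */
};

static void item_release(struct rhashtable *ht, rwlock_t *lock,
			 struct item *it)
{
	write_lock_bh(lock);					/* B1 */
	rhashtable_remove_fast(ht, &it->node, item_params);	/* B2, B3 */
	clear_bit(ITEM_HASHED, &it->flags);			/* B4 */
	write_unlock_bh(lock);					/* B5 */

	/* B6..B7: rcu defers the kfree() until every reader that called
	 * rcu_read_lock() before B5 has finished, which is exactly what
	 * makes thread A's flag test at step A5 safe.
	 */
	kfree_rcu(it, rcu);
}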
@@ -1187,8 +1248,8 @@ static void __dlm_master_lookup(struct dlm_ls *ls, struct dlm_rsb *r, int our_no
  *   . dlm_master_lookup RECOVER_MASTER (fix_master 1, from_master 0)
  */

-int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
-		      int len, unsigned int flags, int *r_nodeid, int *result)
+static int _dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+			      int len, unsigned int flags, int *r_nodeid, int *result)
 {
 	struct dlm_rsb *r = NULL;
 	uint32_t hash;
@@ -1315,6 +1376,16 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
 	return error;
 }

+int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
+		      int len, unsigned int flags, int *r_nodeid, int *result)
+{
+	int rv;
+	rcu_read_lock();
+	rv = _dlm_master_lookup(ls, from_nodeid, name, len, flags, r_nodeid, result);
+	rcu_read_unlock();
+	return rv;
+}
+
 static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 {
 	struct dlm_rsb *r;
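These two hunks split dlm_master_lookup() in the same way find_rsb() was changed: the exported function now only brackets the renamed _dlm_master_lookup() with rcu_read_lock()/rcu_read_unlock(), keeping the read-side critical section visible at the API boundary. A small sketch of that wrapper pattern; do_lookup() and do_lookup_locked() are placeholder names, and the RCU_LOCKDEP_WARN() check only illustrates how the inner function's precondition could be asserted, it is not part of the patch:

#include <linux/rcupdate.h>

static int do_lookup_locked(const void *name, int len)
{
	/* precondition: caller is inside an rcu read-side section */
	RCU_LOCKDEP_WARN(!rcu_read_lock_held(),
			 "do_lookup_locked() called outside rcu_read_lock()");

	/* ... lookup that may dereference rcu-protected objects ... */
	return 0;
}

int do_lookup(const void *name, int len)
{
	int rv;

	rcu_read_lock();
	rv = do_lookup_locked(name, len);
	rcu_read_unlock();
	return rv;
}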
@@ -4293,6 +4364,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 	list_del(&r->res_slow_list);
 	rhashtable_remove_fast(&ls->ls_rsbtbl, &r->res_node,
 			       dlm_rhash_rsb_params);
+	rsb_clear_flag(r, RSB_HASHED);
 	write_unlock_bh(&ls->ls_rsbtbl_lock);

 	free_inactive_rsb(r);