1414import java .util .UUID ;
1515import java .util .concurrent .ConcurrentHashMap ;
1616import java .util .concurrent .ConcurrentMap ;
17+ import java .util .concurrent .locks .Lock ;
18+ import java .util .concurrent .locks .ReentrantReadWriteLock ;
1719import org .slf4j .Logger ;
1820import org .slf4j .LoggerFactory ;
1921
2527public class TabletMap {
2628 private static final Logger logger = LoggerFactory .getLogger (TabletMap .class );
2729
28- private final ConcurrentMap <KeyspaceTableNamePair , NavigableSet <Tablet >> mapping ;
30+ // There are no additional locking mechanisms for the mapping field itself, however each TabletSet
31+ // inside has its own ReadWriteLock that should be used when dealing with its internals.
32+ private final ConcurrentMap <KeyspaceTableNamePair , TabletSet > mapping ;
2933
3034 private final Cluster .Manager cluster ;
3135
@@ -34,7 +38,7 @@ public class TabletMap {
3438 private TypeCodec <TupleValue > tabletPayloadCodec = null ;
3539
3640 public TabletMap (
37- Cluster .Manager cluster , ConcurrentMap <KeyspaceTableNamePair , NavigableSet < Tablet > > mapping ) {
41+ Cluster .Manager cluster , ConcurrentMap <KeyspaceTableNamePair , TabletSet > mapping ) {
3842 this .cluster = cluster ;
3943 this .mapping = mapping ;
4044 }
@@ -46,9 +50,9 @@ public static TabletMap emptyMap(Cluster.Manager cluster) {
4650 /**
4751 * Returns the mapping of tables to their tablets.
4852 *
49- * @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
53+ * @return the Map keyed by (keyspace,table) pairs with {@link TabletSet} as value type.
5054 */
51- public Map <KeyspaceTableNamePair , NavigableSet < Tablet > > getMapping () {
55+ public Map <KeyspaceTableNamePair , TabletSet > getMapping () {
5256 return mapping ;
5357 }
5458
@@ -68,28 +72,34 @@ public Set<UUID> getReplicas(String keyspace, String table, long token) {
6872 return Collections .emptySet ();
6973 }
7074
71- NavigableSet < Tablet > set = mapping .get (key );
72- if (set == null ) {
75+ TabletSet tabletSet = mapping .get (key );
76+ if (tabletSet == null ) {
7377 logger .trace (
7478 "There is no tablets for {}.{} in this mapping. Returning empty set." , keyspace , table );
7579 return Collections .emptySet ();
7680 }
77- Tablet row = mapping .get (key ).ceiling (Tablet .malformedTablet (token ));
78- if (row == null || row .firstToken >= token ) {
79- logger .trace (
80- "Could not find tablet for {}.{} that owns token {}. Returning empty set." ,
81- keyspace ,
82- table ,
83- token );
84- return Collections .emptySet ();
85- }
81+ Lock readLock = tabletSet .lock .readLock ();
82+ try {
83+ readLock .lock ();
84+ Tablet row = mapping .get (key ).tablets .ceiling (Tablet .malformedTablet (token ));
85+ if (row == null || row .firstToken >= token ) {
86+ logger .trace (
87+ "Could not find tablet for {}.{} that owns token {}. Returning empty set." ,
88+ keyspace ,
89+ table ,
90+ token );
91+ return Collections .emptySet ();
92+ }
8693
87- HashSet <UUID > uuidSet = new HashSet <>();
88- for (HostShardPair hostShardPair : row .replicas ) {
89- if (cluster .metadata .getHost (hostShardPair .getHost ()) != null )
90- uuidSet .add (hostShardPair .getHost ());
94+ HashSet <UUID > uuidSet = new HashSet <>();
95+ for (HostShardPair hostShardPair : row .replicas ) {
96+ if (cluster .metadata .getHost (hostShardPair .getHost ()) != null )
97+ uuidSet .add (hostShardPair .getHost ());
98+ }
99+ return uuidSet ;
100+ } finally {
101+ readLock .unlock ();
91102 }
92- return uuidSet ;
93103 }
94104
95105 /**
@@ -121,46 +131,47 @@ void processTabletsRoutingV1Payload(String keyspace, String table, ByteBuffer pa
121131 HostShardPair hostShardPair = new HostShardPair (tuple .getUUID (0 ), tuple .getInt (1 ));
122132 replicas .add (hostShardPair );
123133 }
124-
125- // Working on a copy to avoid concurrent modification of the same set
126- NavigableSet <Tablet > existingTablets =
127- new TreeSet <>(mapping .computeIfAbsent (ktPair , k -> new TreeSet <>()));
128-
129- // Single tablet token range is represented by (firstToken, lastToken] interval
130- // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
131- // tablets
132- // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
133- // according
134- // to their lastTokens.
135-
136- // First sweep: remove all tablets whose lastToken is inside this interval
137- Iterator <Tablet > it =
138- existingTablets .headSet (Tablet .malformedTablet (lastToken ), true ).descendingIterator ();
139- while (it .hasNext ()) {
140- Tablet tablet = it .next ();
141- if (tablet .lastToken <= firstToken ) {
142- break ;
134+ Tablet newTablet = new Tablet (keyspace , null , table , firstToken , lastToken , replicas );
135+
136+ TabletSet tabletSet = mapping .computeIfAbsent (ktPair , k -> new TabletSet ());
137+ Lock writeLock = tabletSet .lock .writeLock ();
138+ try {
139+ writeLock .lock ();
140+ NavigableSet <Tablet > currentTablets = tabletSet .tablets ;
141+ // Single tablet token range is represented by (firstToken, lastToken] interval
142+ // We need to do two sweeps: remove overlapping tablets by looking at lastToken of existing
143+ // tablets
144+ // and then by looking at firstToken of existing tablets. Currently, the tablets are sorted
145+ // according
146+ // to their lastTokens.
147+
148+ // First sweep: remove all tablets whose lastToken is inside this interval
149+ Iterator <Tablet > it = currentTablets .headSet (newTablet , true ).descendingIterator ();
150+ while (it .hasNext ()) {
151+ Tablet tablet = it .next ();
152+ if (tablet .lastToken <= firstToken ) {
153+ break ;
154+ }
155+ it .remove ();
143156 }
144- it .remove ();
145- }
146157
147- // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
148- // lastToken]
149- // After the first sweep, this theoretically should remove at most 1 tablet
150- it = existingTablets .tailSet (Tablet .malformedTablet (lastToken ), true ).iterator ();
151- while (it .hasNext ()) {
152- Tablet tablet = it .next ();
153- if (tablet .firstToken >= lastToken ) {
154- break ;
158+ // Second sweep: remove all tablets whose firstToken is inside this tuple's (firstToken,
159+ // lastToken]
160+ // After the first sweep, this theoretically should remove at most 1 tablet
161+ it = currentTablets .tailSet (newTablet , true ).iterator ();
162+ while (it .hasNext ()) {
163+ Tablet tablet = it .next ();
164+ if (tablet .firstToken >= lastToken ) {
165+ break ;
166+ }
167+ it .remove ();
155168 }
156- it .remove ();
157- }
158169
159- // Add new (now) non-overlapping tablet
160- existingTablets .add (new Tablet ( keyspace , null , table , firstToken , lastToken , replicas ) );
161-
162- // Set the updated result in the main map
163- mapping . put ( ktPair , existingTablets );
170+ // Add new (now) non-overlapping tablet
171+ currentTablets .add (newTablet );
172+ } finally {
173+ writeLock . unlock ();
174+ }
164175 }
165176
166177 public TupleType getPayloadOuterTuple () {
@@ -258,6 +269,18 @@ public int hashCode() {
258269 }
259270 }
260271
272+ /**
273+ * Set of tablets bundled with ReadWriteLock to allow concurrent modification for different sets.
274+ */
275+ public static class TabletSet {
276+ final NavigableSet <Tablet > tablets ;
277+ final ReentrantReadWriteLock lock = new ReentrantReadWriteLock ();
278+
279+ public TabletSet () {
280+ this .tablets = new TreeSet <>();
281+ }
282+ }
283+
261284 /**
262285 * Represents a single tablet created from tablets-routing-v1 custom payload. Its {@code
263286 * compareTo} implementation intentionally relies solely on {@code lastToken} in order to allow
0 commit comments