1919package org .apache .cassandra .service .paxos .cleanup ;
2020
2121import java .util .Collection ;
22+ import java .util .Comparator ;
2223import java .util .Map ;
24+ import java .util .PriorityQueue ;
25+ import java .util .Queue ;
2326import java .util .UUID ;
2427import java .util .concurrent .ConcurrentHashMap ;
2528
2629import com .google .common .base .Preconditions ;
27- import com .google .common .util .concurrent .Uninterruptibles ;
2830import org .slf4j .Logger ;
2931import org .slf4j .LoggerFactory ;
3032
33+ import org .apache .cassandra .concurrent .ScheduledExecutors ;
3134import org .apache .cassandra .config .DatabaseDescriptor ;
3235import org .apache .cassandra .db .ConsistencyLevel ;
3336import org .apache .cassandra .db .DecoratedKey ;
@@ -58,6 +61,37 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
5861 private static final Logger logger = LoggerFactory .getLogger (PaxosCleanupLocalCoordinator .class );
5962 private static final UUID INTERNAL_SESSION = new UUID (0 , 0 );
6063
64+ private static class DelayedRepair
65+ {
66+ private final UncommittedPaxosKey uncommitted ;
67+ private final long scheduledAtNanos ;
68+
69+ private static Comparator <DelayedRepair > PRIORITY_COMPARATOR = new Comparator <DelayedRepair >()
70+ {
71+ @ Override
72+ public int compare (DelayedRepair o1 , DelayedRepair o2 )
73+ {
74+ long delta = o1 .scheduledAtNanos - o2 .scheduledAtNanos ;
75+ if (delta > 0 )
76+ return 1 ;
77+ if (delta < 0 )
78+ return -1 ;
79+ return 0 ;
80+ }
81+ };
82+
83+ public DelayedRepair (UncommittedPaxosKey uncommitted , long sleepMillis )
84+ {
85+ this .uncommitted = uncommitted ;
86+ this .scheduledAtNanos = Clock .Global .nanoTime () + MILLISECONDS .toNanos (sleepMillis );
87+ }
88+
89+ public boolean isRunnable ()
90+ {
91+ return Clock .Global .nanoTime () - scheduledAtNanos > 0 ;
92+ }
93+ }
94+
6195 private final UUID session ;
6296 private final TableId tableId ;
6397 private final TableMetadata table ;
@@ -69,6 +103,7 @@ public class PaxosCleanupLocalCoordinator extends AsyncFuture<PaxosCleanupRespon
69103 private final boolean autoRepair ;
70104
71105 private final Map <DecoratedKey , AbstractPaxosRepair > inflight = new ConcurrentHashMap <>();
106+ private final Queue <DelayedRepair > delayed = new PriorityQueue <>(DelayedRepair .PRIORITY_COMPARATOR );
72107 private final PaxosTableRepairs tableRepairs ;
73108
74109 private PaxosCleanupLocalCoordinator (SharedContext ctx , UUID session , TableId tableId , Collection <Range <Token >> ranges , CloseableIterator <UncommittedPaxosKey > uncommittedIter , boolean autoRepair )
@@ -125,11 +160,40 @@ public static PaxosCleanupLocalCoordinator createForAutoRepair(SharedContext ctx
125160 return new PaxosCleanupLocalCoordinator (ctx , INTERNAL_SESSION , tableId , ranges , iterator , true );
126161 }
127162
163+ /**
164+ * Wait to repair things that are still potentially executing at the original coordinator to avoid
165+ * causing timeouts. This should only have to happen at most a few times when the repair starts
166+ */
167+ private boolean maybeDelay (UncommittedPaxosKey uncommitted )
168+ {
169+ if (!DatabaseDescriptor .getPaxosRepairRaceWait ())
170+ return false ;
171+
172+
173+ long txnTimeoutMillis = Math .max (getCasContentionTimeout (MILLISECONDS ), getWriteRpcTimeout (MILLISECONDS ));
174+ long nowMillis = Clock .Global .currentTimeMillis ();
175+ long ballotElapsedMillis = nowMillis - MICROSECONDS .toMillis (uncommitted .ballot ().unixMicros ());
176+
177+ if (ballotElapsedMillis < 0 && Math .abs (ballotElapsedMillis ) > SECONDS .toMillis (1 ))
178+ logger .warn ("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}" , uncommitted .ballot ());
179+
180+ if (ballotElapsedMillis >= txnTimeoutMillis )
181+ return false ;
182+
183+ long sleepMillis = txnTimeoutMillis - ballotElapsedMillis ;
184+ logger .info ("Paxos auto repair encountered a potentially in progress ballot, sleeping {}ms to allow the in flight operation to finish" , sleepMillis );
185+
186+ delayed .add (new DelayedRepair (uncommitted , sleepMillis ));
187+ ScheduledExecutors .scheduledFastTasks .schedule (this ::scheduleKeyRepairsOrFinish , sleepMillis , MILLISECONDS );
188+
189+ return true ;
190+ }
191+
128192 /**
129193 * Schedule as many key repairs as we can, up to the parallelism limit. If no repairs are scheduled and
130194 * none are in flight when the iterator is exhausted, the session will be finished
131195 */
132- private void scheduleKeyRepairsOrFinish ()
196+ private synchronized void scheduleKeyRepairsOrFinish ()
133197 {
134198 int parallelism = DatabaseDescriptor .getPaxosRepairParallelism ();
135199 Preconditions .checkArgument (parallelism > 0 );
@@ -141,18 +205,33 @@ private void scheduleKeyRepairsOrFinish()
141205 return ;
142206 }
143207
144- long txnTimeoutMicros = Math .max (getCasContentionTimeout (MICROSECONDS ), getWriteRpcTimeout (MICROSECONDS ));
145- boolean waitForCoordinator = DatabaseDescriptor .getPaxosRepairRaceWait ();
146- while (inflight .size () < parallelism && uncommittedIter .hasNext ())
147- repairKey (uncommittedIter .next (), txnTimeoutMicros , waitForCoordinator );
148-
208+ while (inflight .size () < parallelism && !isDone ())
209+ {
210+ if (!delayed .isEmpty () && delayed .peek ().isRunnable ())
211+ {
212+ DelayedRepair delayedRepair = delayed .remove ();
213+ repairKey (delayedRepair .uncommitted );
214+ }
215+ else if (uncommittedIter .hasNext ())
216+ {
217+ UncommittedPaxosKey uncommitted = uncommittedIter .next ();
218+ if (!maybeDelay (uncommitted ))
219+ {
220+ repairKey (uncommitted );
221+ }
222+ }
223+ else
224+ {
225+ break ;
226+ }
227+ }
149228 }
150229
151- if (inflight .isEmpty ())
230+ if (inflight .isEmpty () && delayed . isEmpty () )
152231 finish ();
153232 }
154233
155- private boolean repairKey (UncommittedPaxosKey uncommitted , long txnTimeoutMicros , boolean waitForCoordinator )
234+ private boolean repairKey (UncommittedPaxosKey uncommitted )
156235 {
157236 logger .trace ("repairing {}" , uncommitted );
158237 Preconditions .checkState (!inflight .containsKey (uncommitted .getKey ()));
@@ -163,9 +242,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros
163242 if (consistency == null )
164243 return false ;
165244
166- if (waitForCoordinator )
167- maybeWaitForOriginalCoordinator (uncommitted , txnTimeoutMicros );
168-
169245 inflight .put (uncommitted .getKey (), tableRepairs .startOrGetOrQueue (uncommitted .getKey (), uncommitted .ballot (), uncommitted .getConsistencyLevel (), table , result -> {
170246 if (result .wasSuccessful ())
171247 onKeyFinish (uncommitted .getKey ());
@@ -175,24 +251,6 @@ private boolean repairKey(UncommittedPaxosKey uncommitted, long txnTimeoutMicros
175251 return true ;
176252 }
177253
178- /**
179- * Wait to repair things that are still potentially executing at the original coordinator to avoid
180- * causing timeouts. This should only have to happen at most a few times when the repair starts
181- */
182- private static void maybeWaitForOriginalCoordinator (UncommittedPaxosKey uncommitted , long txnTimeoutMicros )
183- {
184- long nowMicros = MILLISECONDS .toMicros (Clock .Global .currentTimeMillis ());
185- long ballotElapsedMicros = nowMicros - uncommitted .ballot ().unixMicros ();
186- if (ballotElapsedMicros < 0 && Math .abs (ballotElapsedMicros ) > SECONDS .toMicros (1 ))
187- logger .warn ("Encountered ballot that is more than 1 second in the future, is there a clock sync issue? {}" , uncommitted .ballot ());
188- if (ballotElapsedMicros < txnTimeoutMicros )
189- {
190- long sleepMicros = txnTimeoutMicros - ballotElapsedMicros ;
191- logger .info ("Paxos auto repair encountered a potentially in progress ballot, sleeping {}us to allow the in flight operation to finish" , sleepMicros );
192- Uninterruptibles .sleepUninterruptibly (sleepMicros , MICROSECONDS );
193- }
194- }
195-
196254 private synchronized void onKeyFinish (DecoratedKey key )
197255 {
198256 if (!inflight .containsKey (key ))
0 commit comments