
import accord.api.Scheduler;
import accord.coordinate.CoordinateGloballyDurable;
-import accord.coordinate.CoordinateShardDurable;
import accord.coordinate.CoordinationFailed;
import accord.coordinate.ExecuteSyncPoint.SyncPointErased;
import accord.local.Node;
import accord.utils.async.AsyncChain;
import accord.utils.async.AsyncResult;

+import static accord.coordinate.CoordinateShardDurable.coordinate;
import static accord.coordinate.CoordinateSyncPoint.exclusiveSyncPoint;
import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.MINUTES;

/**
 * Helper methods and classes to invoke coordination to propagate information about durability.
@@ -85,15 +87,15 @@ public class CoordinateDurabilityScheduling
    private Scheduler.Scheduled scheduled;

    /*
-     * In each round at each node wait this amount of time between initiating new CoordinateShardDurable
+     * In each cycle, attempt to split the range into this many pieces; if we fail, we increase the number of pieces
     */
-    private long frequencyMicros = TimeUnit.MILLISECONDS.toMicros(500L);
+    private int targetShardSplits = 64;

    /*
     * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
     * and coordinating the shard durability
     */
-    private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(1L);
+    private long txnIdLagMicros = TimeUnit.SECONDS.toMicros(5L);

    /*
     * In each round at each node wait this amount of time between allocating a CoordinateShardDurable txnId
@@ -120,6 +122,10 @@ public class CoordinateDurabilityScheduling
     */
    private long globalCycleTimeMicros = TimeUnit.SECONDS.toMicros(30);

+    private long defaultRetryDelayMicros = TimeUnit.SECONDS.toMicros(1);
+    private long maxRetryDelayMicros = TimeUnit.MINUTES.toMicros(1);
+    private int maxNumberOfSplits = 1 << 10;
+
    private Topology currentGlobalTopology;
    private final Map<Range, ShardScheduler> shardSchedulers = new HashMap<>();
    private int globalIndex;
@@ -131,16 +137,15 @@ private class ShardScheduler
    {
        Shard shard;

-        // based on ideal number of splits
+        // time to start a new cycle
        int nodeOffset;

        int index;
-        int numberOfSplits, desiredNumberOfSplits;
-        boolean defunct;
-        long shardCycleTimeMicros;
+        int numberOfSplits;
        Scheduler.Scheduled scheduled;
-        long rangeStartedAtMicros;
-        long cycleStartedAtMicros = -1;
+        long rangeStartedAtMicros, cycleStartedAtMicros;
+        long retryDelayMicros = defaultRetryDelayMicros;
+        boolean defunct;

        private ShardScheduler()
        {
@@ -150,32 +155,80 @@ synchronized void update(Shard shard, int offset)
        {
            this.shard = shard;
            this.nodeOffset = offset;
-            this.shardCycleTimeMicros = Math.max(CoordinateDurabilityScheduling.this.shardCycleTimeMicros, shard.rf() * 3L * frequencyMicros);
-            this.desiredNumberOfSplits = (int) ((shardCycleTimeMicros + frequencyMicros - 1) / frequencyMicros);
-            if (numberOfSplits == 0 || numberOfSplits < desiredNumberOfSplits)
-            {
-                index = offset;
-                numberOfSplits = desiredNumberOfSplits;
-            }
+            if (numberOfSplits == 0 || numberOfSplits < targetShardSplits)
+                numberOfSplits = targetShardSplits;
        }

        synchronized void markDefunct()
        {
            defunct = true;
+            logger.info("Discarding defunct shard durability scheduler for {}", shard);
        }

-        synchronized void schedule()
+        synchronized void retryCoordinateDurability(Node node, SyncPoint<Range> exclusiveSyncPoint, int nextIndex)
+        {
+            if (defunct)
+                return;
+
+            // TODO (expected): back-off
+            coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
+        }
+
+        synchronized void restart()
        {
            if (defunct)
                return;

            long nowMicros = node.elapsed(MICROSECONDS);
-            int cyclePosition = (nodeOffset + (((index * shard.rf()) + numberOfSplits - 1) / numberOfSplits)) % shard.rf();
-            long microsOffset = (cyclePosition * shardCycleTimeMicros) / shard.rf();
+            long microsOffset = (nodeOffset * shardCycleTimeMicros) / shard.rf();
            long scheduleAt = nowMicros - (nowMicros % shardCycleTimeMicros) + microsOffset;
            if (nowMicros > scheduleAt)
                scheduleAt += shardCycleTimeMicros;

+            if (numberOfSplits < targetShardSplits)
+                numberOfSplits = targetShardSplits;
+
+            cycleStartedAtMicros = scheduleAt;
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void schedule()
+        {
+            if (defunct)
+                return;
+
+            long nowMicros = node.elapsed(MICROSECONDS);
+            long microsOffset = (index * shardCycleTimeMicros) / numberOfSplits;
+            long scheduleAt = cycleStartedAtMicros + microsOffset;
+            if (retryDelayMicros > defaultRetryDelayMicros)
+            {
+                retryDelayMicros = Math.max(defaultRetryDelayMicros, (long)(0.9 * retryDelayMicros));
+            }
+            if (numberOfSplits > targetShardSplits && index % 4 == 0)
+            {
+                index /= 4;
+                numberOfSplits /= 4;
+            }
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void retry()
+        {
+            if (defunct)
+                return;
+
+            long nowMicros = node.elapsed(MICROSECONDS);
+            long scheduleAt = nowMicros + retryDelayMicros;
+            retryDelayMicros += retryDelayMicros / 2;
+            if (retryDelayMicros > maxRetryDelayMicros)
+            {
+                retryDelayMicros = maxRetryDelayMicros;
+            }
+            scheduleAt(nowMicros, scheduleAt);
+        }
+
+        synchronized void scheduleAt(long nowMicros, long scheduleAt)
+        {
            ShardDistributor distributor = node.commandStores().shardDistributor();
            Range range;
            int nextIndex;
@@ -188,11 +241,13 @@ synchronized void schedule()
                nextIndex = i;
            }

-            scheduled = node.scheduler().once(() -> {
+            Runnable schedule = () -> {
                // TODO (required): allocate stale HLC from a reservation of HLCs for this purpose
                TxnId syncId = node.nextTxnId(ExclusiveSyncPoint, Domain.Range);
                startShardSync(syncId, Ranges.of(range), nextIndex);
-            }, scheduleAt - nowMicros, MICROSECONDS);
+            };
+            if (scheduleAt <= nowMicros) schedule.run();
+            else scheduled = node.scheduler().once(schedule, scheduleAt - nowMicros, MICROSECONDS);
        }

        /**
@@ -204,6 +259,7 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
            scheduled = node.scheduler().once(() -> node.withEpoch(syncId.epoch(), (ignored, withEpochFailure) -> {
                if (withEpochFailure != null)
                {
+                    // don't wait on epoch failure - we aren't the cause of any problems
                    startShardSync(syncId, ranges, nextIndex);
                    Throwable wrapped = CoordinationFailed.wrap(withEpochFailure);
                    logger.trace("Exception waiting for epoch before coordinating exclusive sync point for local shard durability, epoch " + syncId.epoch(), wrapped);
@@ -219,10 +275,13 @@ private void startShardSync(TxnId syncId, Ranges ranges, int nextIndex)
                    {
                        synchronized (ShardScheduler.this)
                        {
-                            index *= 2;
-                            numberOfSplits *= 2;
                            // TODO (required): try to recover or invalidate prior sync point
-                            schedule();
+                            retry();
+                            if (numberOfSplits * 2 <= maxNumberOfSplits)
+                            {
+                                index *= 2;
+                                numberOfSplits *= 2;
+                            }
                            logger.warn("{}: Exception coordinating ExclusiveSyncPoint for {} durability. Increased numberOfSplits to " + numberOfSplits, syncId, ranges, fail);
                        }
                    }
@@ -240,68 +299,92 @@ private void coordinateShardDurableAfterExclusiveSyncPoint(Node node, SyncPoint<
            scheduled = node.scheduler().once(() -> {
                scheduled = null;
                node.commandStores().any().execute(() -> {
-                    CoordinateShardDurable.coordinate(node, exclusiveSyncPoint)
-                    .addCallback((success, fail) -> {
-                        if (fail != null && fail.getClass() != SyncPointErased.class)
-                        {
-                            logger.trace("Exception coordinating local shard durability, will retry immediately", fail);
-                            coordinateShardDurableAfterExclusiveSyncPoint(node, exclusiveSyncPoint, nextIndex);
-                        }
-                        else
-                        {
-                            synchronized (ShardScheduler.this)
-                            {
-                                index = nextIndex;
-                                if (index >= numberOfSplits)
-                                {
-                                    index = 0;
-                                    long nowMicros = node.elapsed(MICROSECONDS);
-                                    String reportTime = "";
-                                    if (cycleStartedAtMicros > 0)
-                                        reportTime = "in " + MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros) + 's';
-                                    logger.info("Successfully completed one cycle of durability scheduling for shard {}{}", shard.range, reportTime);
-                                    if (numberOfSplits > desiredNumberOfSplits)
-                                        numberOfSplits = Math.max(desiredNumberOfSplits, (int)(numberOfSplits * 0.9));
-                                    cycleStartedAtMicros = nowMicros;
-                                }
-                                else
-                                {
-                                    long nowMicros = node.elapsed(MICROSECONDS);
-                                    logger.debug("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
-                                }
-
-                                schedule();
-                            }
-                        }
-                    });
+                    coordinate(node, exclusiveSyncPoint)
+                    .addCallback((success, fail) -> {
+                        if (fail != null && fail.getClass() != SyncPointErased.class)
+                        {
+                            logger.debug("Exception coordinating shard durability for {}, will retry", exclusiveSyncPoint.route.toRanges(), fail);
+                            retryCoordinateDurability(node, exclusiveSyncPoint, nextIndex);
+                        }
+                        else
+                        {
+                            try
+                            {
+                                synchronized (ShardScheduler.this)
+                                {
+                                    int prevIndex = index;
+                                    index = nextIndex;
+                                    if (index >= numberOfSplits)
+                                    {
+                                        index = 0;
+                                        long nowMicros = node.elapsed(MICROSECONDS);
+                                        long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
+                                        long targetTimeSeconds = MICROSECONDS.toSeconds(shardCycleTimeMicros);
+                                        logger.info("Successfully completed one cycle of durability scheduling for shard {} in {}s (vs {}s target)", shard.range, timeTakenSeconds, targetTimeSeconds);
+                                        restart();
+                                    }
+                                    else
+                                    {
+                                        long nowMicros = node.elapsed(MICROSECONDS);
+                                        int prevRfCycle = (prevIndex * shard.rf()) / numberOfSplits;
+                                        int curRfCycle = (index * shard.rf()) / numberOfSplits;
+                                        if (prevRfCycle != curRfCycle)
+                                        {
+                                            long targetTimeSeconds = MICROSECONDS.toSeconds((index * shardCycleTimeMicros) / numberOfSplits);
+                                            long timeTakenSeconds = MICROSECONDS.toSeconds(nowMicros - cycleStartedAtMicros);
+                                            logger.info("Successfully completed {}/{} cycle of durability scheduling covering range {}. Completed in {}s (vs {}s target).", curRfCycle, shard.rf(), exclusiveSyncPoint.route.toRanges(), timeTakenSeconds, targetTimeSeconds);
+                                        }
+                                        else if (logger.isTraceEnabled())
+                                        {
+                                            logger.trace("Successfully coordinated shard durability for range {} in {}s", shard.range, MICROSECONDS.toSeconds(nowMicros - rangeStartedAtMicros));
+                                        }
+                                        schedule();
+                                    }
+                                }
+                            }
+                            catch (Throwable t)
+                            {
+                                retry();
+                                logger.error("Unexpected exception handling durability scheduling callback; starting from scratch", t);
+                            }
+                        }
+                    });
                });
            }, durabilityLagMicros, MICROSECONDS);
        }
-
    }

-
    public CoordinateDurabilityScheduling(Node node)
    {
        this.node = node;
    }

-    public void setFrequency(int frequency, TimeUnit units)
+    public void setTargetShardSplits(int targetShardSplits)
+    {
+        this.targetShardSplits = targetShardSplits;
+    }
+
+    public void setDefaultRetryDelay(long retryDelay, TimeUnit units)
    {
-        this.frequencyMicros = Ints.saturatedCast(units.toMicros(frequency));
+        this.defaultRetryDelayMicros = units.toMicros(retryDelay);
    }

-    public void setTxnIdLag(int txnIdLag, TimeUnit units)
+    public void setMaxRetryDelay(long retryDelay, TimeUnit units)
+    {
+        this.maxRetryDelayMicros = units.toMicros(retryDelay);
+    }
+
+    public void setTxnIdLag(long txnIdLag, TimeUnit units)
    {
        this.txnIdLagMicros = Ints.saturatedCast(units.toMicros(txnIdLag));
    }

-    public void setDurabilityLag(int durabilityLag, TimeUnit units)
+    public void setDurabilityLag(long durabilityLag, TimeUnit units)
    {
        this.durabilityLagMicros = Ints.saturatedCast(units.toMicros(durabilityLag));
    }

-    public void setShardCycleTime(int shardCycleTime, TimeUnit units)
+    public void setShardCycleTime(long shardCycleTime, TimeUnit units)
    {
        this.shardCycleTimeMicros = Ints.saturatedCast(units.toMicros(shardCycleTime));
    }
@@ -319,7 +402,7 @@ public synchronized void start()
        Invariants.checkState(!stop); // cannot currently restart safely
        long nowMicros = node.elapsed(MICROSECONDS);
        setNextGlobalSyncTime(nowMicros);
-        scheduled = node.scheduler().recurring(this::run, frequencyMicros, MICROSECONDS);
+        scheduled = node.scheduler().recurring(this::run, 1L, MINUTES);
    }

    public void stop()
@@ -351,7 +434,6 @@ private void run()
        }
    }

-
    private void startGlobalSync()
    {
        try
@@ -369,7 +451,7 @@ private void startGlobalSync()
        }
    }

-    private void updateTopology()
+    public synchronized void updateTopology()
    {
        Topology latestGlobal = node.topology().current();
        if (latestGlobal == currentGlobalTopology)
@@ -395,7 +477,10 @@ private void updateTopology()
            shardSchedulers.put(shard.range, scheduler);
            scheduler.update(shard, shard.nodes.find(node.id()));
            if (prevScheduler == null)
-                scheduler.schedule();
+            {
+                logger.info("Starting shard durability scheduler for {}", shard);
+                scheduler.restart();
+            }
        }
        prev.forEach((r, s) -> s.markDefunct());
    }
@@ -432,6 +517,6 @@ private void setNextGlobalSyncTime(long nowMicros)
        if (targetTimeInCurrentRound < nowMicros)
            targetTime += totalRoundDuration;

-        nextGlobalSyncTimeMicros = targetTime - nowMicros;
+        nextGlobalSyncTimeMicros = targetTime;
    }
}
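
As a usage note for the knobs introduced above, the sketch below wires the scheduler up using only the constructor and setters visible in this diff. It is illustrative, not part of the change: the wrapper class name, the values marked hypothetical, and the availability of a fully constructed accord.local.Node are assumptions.

import java.util.concurrent.TimeUnit;

import accord.local.Node;

// Sketch: configure and start durability scheduling for a node.
// Values mirror the defaults added in this diff where those defaults are visible;
// the remaining values are hypothetical examples.
final class DurabilitySchedulingExample
{
    static CoordinateDurabilityScheduling configure(Node node)
    {
        CoordinateDurabilityScheduling durability = new CoordinateDurabilityScheduling(node);
        durability.setTargetShardSplits(64);                  // default introduced above
        durability.setDefaultRetryDelay(1, TimeUnit.SECONDS); // initial retry back-off
        durability.setMaxRetryDelay(1, TimeUnit.MINUTES);     // back-off ceiling
        durability.setTxnIdLag(5, TimeUnit.SECONDS);          // matches the new default
        durability.setDurabilityLag(5, TimeUnit.SECONDS);     // hypothetical value
        durability.setShardCycleTime(15, TimeUnit.MINUTES);   // hypothetical value
        durability.start();
        return durability;
    }
}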
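The retry back-off and split-resizing behaviour is spread across retry(), schedule() and the failure callback in startShardSync() above. The standalone sketch below mirrors that arithmetic in isolation so the growth and decay are easier to follow; the constants match the field initializers in this diff, but the class and method names are illustrative only.

// Failure: the back-off grows by 1.5x up to the ceiling and the shard is split into
// twice as many pieces (up to the cap), so the next attempt covers a smaller sub-range.
// Success: the back-off decays ~10% toward the default and, when the index is a
// multiple of four, the split count shrinks back toward the target.
final class DurabilityBackoffSketch
{
    static final long DEFAULT_RETRY_DELAY_MICROS = 1_000_000L;  // 1 second
    static final long MAX_RETRY_DELAY_MICROS = 60_000_000L;     // 1 minute
    static final int TARGET_SHARD_SPLITS = 64;
    static final int MAX_NUMBER_OF_SPLITS = 1 << 10;

    long retryDelayMicros = DEFAULT_RETRY_DELAY_MICROS;
    int index;
    int numberOfSplits = TARGET_SHARD_SPLITS;

    void onFailure()
    {
        retryDelayMicros = Math.min(MAX_RETRY_DELAY_MICROS, retryDelayMicros + retryDelayMicros / 2);
        if (numberOfSplits * 2 <= MAX_NUMBER_OF_SPLITS)
        {
            index *= 2;
            numberOfSplits *= 2;
        }
    }

    void onSuccess()
    {
        if (retryDelayMicros > DEFAULT_RETRY_DELAY_MICROS)
            retryDelayMicros = Math.max(DEFAULT_RETRY_DELAY_MICROS, (long) (0.9 * retryDelayMicros));
        if (numberOfSplits > TARGET_SHARD_SPLITS && index % 4 == 0)
        {
            index /= 4;
            numberOfSplits /= 4;
        }
    }
}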