11using System ;
22using System . Collections . Generic ;
3+ using System . Threading ;
34using System . Threading . Tasks ;
45using Microsoft . Azure . Documents ;
56using TableStorage . Abstractions . Models ;
@@ -436,5 +437,70 @@ public static IPocoTableStore<T, TIndexPartitionKey, TIndexRowKey> Index<T, TPar
436437 }
437438
438439
440+ /// <summary>
441+ /// Reindexes a table. You will need to call this for existing tables that have never had an index, or if things get out of sync. This can take a while.
442+ /// </summary>
443+ /// <typeparam name="T"></typeparam>
444+ /// <typeparam name="TPartitionKey">The type of the t partition key.</typeparam>
445+ /// <typeparam name="TRowKey">The type of the t row key.</typeparam>
446+ /// <param name="tableStore">The table store.</param>
447+ /// <param name="indexName">Name of the index.</param>
448+ /// <param name="maxDegreeOfParallelism">The maximum degree of parallelism.</param>
449+ /// <param name="recordsIndexedCallback">The records indexed callback.</param>
450+ /// <param name="failedIndexCallback">The failed index callback.</param>
451+ /// <exception cref="ArgumentException"></exception>
452+ public static async Task ReindexAsync < T , TPartitionKey , TRowKey > ( this IPocoTableStore < T , TPartitionKey , TRowKey > tableStore , string indexName , int ? maxDegreeOfParallelism = null , Action < int > recordsIndexedCallback = null , Action < T , Exception > failedIndexCallback = null )
453+ {
454+ maxDegreeOfParallelism = maxDegreeOfParallelism ?? Environment . ProcessorCount * 20 ;
455+
456+ string pageToken = null ;
457+ int count = 0 ;
458+ using ( var semaphore = new SemaphoreSlim ( maxDegreeOfParallelism . Value , maxDegreeOfParallelism . Value ) )
459+ {
460+ try
461+ {
462+ dynamic indexStore = _indexes [ indexName ] ;
463+ do
464+ {
465+ var result = await indexStore . GetAllRecordsPagedAsync ( 1000 , pageToken ) ;
466+ pageToken = result . ContinuationToken ;
467+
468+ if ( result . Items . Count > 0 )
469+ {
470+ foreach ( var record in result . Items )
471+ {
472+ await semaphore . WaitAsync ( TimeSpan . FromSeconds ( 20 ) ) ;
473+ Task task = indexStore . InsertOrReplaceAsync ( record ) ;
474+ task . ContinueWith ( r =>
475+ {
476+ if ( r . IsFaulted )
477+ {
478+ failedIndexCallback ? . Invoke ( record , r . Exception ) ;
479+ }
480+ Interlocked . Increment ( ref count ) ;
481+ semaphore . Release ( 1 ) ;
482+ } ) ;
483+
484+ }
485+ }
486+
487+ recordsIndexedCallback ? . Invoke ( count ) ;
488+ } while ( pageToken != null ) ;
489+
490+ while ( semaphore . CurrentCount < maxDegreeOfParallelism )
491+ {
492+
493+ await Task . Delay ( 5 ) ;
494+ }
495+ recordsIndexedCallback ? . Invoke ( count ) ;
496+ }
497+ catch ( KeyNotFoundException e )
498+ {
499+ throw new ArgumentException ( $ "{ indexName } is not a defined secondary index") ;
500+ }
501+ }
502+ }
503+
504+
439505 }
440506}
0 commit comments