33
44using System ;
55using System . Collections . Concurrent ;
6+ using System . Collections . Generic ;
7+ using System . Linq ;
68using System . Threading ;
79using System . Threading . Tasks ;
810using Microsoft . Azure . Cosmos . Table ;
911using Microsoft . Azure . WebJobs . Host . Executors ;
12+ using Microsoft . Azure . WebJobs . Hosting ;
1013using Microsoft . Azure . WebJobs . Logging ;
1114using Microsoft . Azure . WebJobs . Script . WebHost . Helpers ;
1215using Microsoft . Extensions . Configuration ;
16+ using Microsoft . Extensions . DependencyInjection ;
1317using Microsoft . Extensions . Logging ;
1418
1519namespace Microsoft . Azure . WebJobs . Script . WebHost . Diagnostics
@@ -25,26 +29,31 @@ public class DiagnosticEventTableStorageRepository : IDiagnosticEventRepository,
2529 private readonly IHostIdProvider _hostIdProvider ;
2630 private readonly IEnvironment _environment ;
2731 private readonly ILogger < DiagnosticEventTableStorageRepository > _logger ;
32+ private readonly IServiceProvider _serviceProvider ;
2833 private readonly object _syncLock = new object ( ) ;
2934
3035 private ConcurrentDictionary < string , DiagnosticEvent > _events = new ConcurrentDictionary < string , DiagnosticEvent > ( ) ;
3136 private CloudTableClient _tableClient ;
3237 private CloudTable _diagnosticEventsTable ;
3338 private string _hostId ;
3439 private bool _disposed = false ;
40+ private bool _purged = false ;
3541 private string _tableName ;
3642
/// <summary>
/// Creates the repository and starts the timer that periodically flushes buffered events.
/// </summary>
/// <param name="configuration">Configuration stored for later use by the repository.</param>
/// <param name="hostIdProvider">Host id provider stored for later use by the repository.</param>
/// <param name="environment">Environment consulted before flushing (placeholder mode check).</param>
/// <param name="scriptHostManager">Host manager; also used as an <see cref="IServiceProvider"/> when it implements one.</param>
/// <param name="logger">Logger used by this repository.</param>
/// <param name="logFlushInterval">Initial delay and period of the flush timer, in milliseconds.</param>
internal DiagnosticEventTableStorageRepository(
    IConfiguration configuration,
    IHostIdProvider hostIdProvider,
    IEnvironment environment,
    IScriptHostManager scriptHostManager,
    ILogger<DiagnosticEventTableStorageRepository> logger,
    int logFlushInterval)
{
    _logger = logger;
    _environment = environment;
    _hostIdProvider = hostIdProvider;
    _configuration = configuration;

    // Null when the manager does not implement IServiceProvider; IsPrimaryHost() copes with that.
    _serviceProvider = scriptHostManager as IServiceProvider;

    // Created last so every dependency is assigned before the first callback can fire.
    _flushLogsTimer = new Timer(OnFlushLogs, null, logFlushInterval, logFlushInterval);
}
4553
/// <summary>
/// Creates the repository using the default flush interval (<c>LogFlushInterval</c>).
/// </summary>
/// <param name="configuration">Configuration forwarded to the internal constructor.</param>
/// <param name="hostIdProvider">Host id provider forwarded to the internal constructor.</param>
/// <param name="environment">Environment forwarded to the internal constructor.</param>
/// <param name="scriptHost">Host manager forwarded to the internal constructor.</param>
/// <param name="logger">Logger forwarded to the internal constructor.</param>
public DiagnosticEventTableStorageRepository(
    IConfiguration configuration,
    IHostIdProvider hostIdProvider,
    IEnvironment environment,
    IScriptHostManager scriptHost,
    ILogger<DiagnosticEventTableStorageRepository> logger)
    : this(configuration, hostIdProvider, environment, scriptHost, logger, LogFlushInterval)
{
}
4857
4958 internal CloudTableClient TableClient
5059 {
@@ -88,7 +97,7 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
8897 if ( TableClient != null )
8998 {
9099 now = now ?? DateTime . UtcNow ;
91- string currentTableName = GetCurrentTableName ( now . Value ) ;
100+ string currentTableName = GetTableName ( now . Value ) ;
92101
93102 // update the table reference when date rolls over to a new month
94103 if ( _diagnosticEventsTable == null || currentTableName != _tableName )
@@ -101,43 +110,106 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
101110 return _diagnosticEventsTable ;
102111 }
103112
/// <summary>
/// Builds the diagnostic events table name for the month containing <paramref name="date"/>.
/// </summary>
/// <param name="date">Date whose year and month select the table.</param>
/// <returns>The table name prefix followed by the year and month formatted as <c>yyyyMM</c>.</returns>
private static string GetTableName(DateTime date)
{
    // Format with the invariant culture: with a plain interpolated string the year would follow
    // the current culture's calendar, so the same date could map to different table names.
    return FormattableString.Invariant($"{TableNamePrefix}{date:yyyyMM}");
}
108117
/// <summary>
/// Timer callback that flushes the buffered diagnostic events.
/// </summary>
/// <param name="state">Timer state; not used.</param>
protected internal virtual async void OnFlushLogs(object state)
{
    try
    {
        await FlushLogs();
    }
    catch (Exception ex)
    {
        // This method is async void, so nothing awaits it: an exception escaping here
        // cannot be observed by any caller. Log it and let the next timer tick try again.
        _logger.LogError(ex, "Unhandled error occurred while flushing diagnostic events.");
    }
}
113122
/// <summary>
/// Deletes diagnostic event tables whose records were written without an EventVersion, or with
/// a version older than <see cref="DiagnosticEvent.CurrentEventVersion"/>.
/// </summary>
/// <remarks>
/// Only the first result segment of each table is inspected. <c>_purged</c> is set once a pass
/// over all tables completes, which stops <c>FlushLogs</c> from purging again. Failures are
/// logged and never propagated to the caller.
/// </remarks>
private async Task PurgePreviousEventVersions()
{
    _logger.LogDebug("Purging diagnostic events with versions older than '{currentEventVersion}'.", DiagnosticEvent.CurrentEventVersion);

    bool tableDeleted = false;

    try
    {
        // Exceptions must escape the delegate. Catching them inside it makes every attempt
        // look successful, so the retry wrapper would never run a second attempt.
        // NOTE(review): assumes InvokeWithRetriesAsync retries when the delegate throws - confirm.
        await Utility.InvokeWithRetriesAsync(async () =>
        {
            var tables = (await TableStorageHelpers.ListTablesAsync(TableClient, TableNamePrefix)).ToList();

            foreach (var table in tables)
            {
                var tableRecords = await table.ExecuteQuerySegmentedAsync(new TableQuery<DiagnosticEvent>(), null);

                // Skip tables that have 0 records
                if (tableRecords.Results.Count == 0)
                {
                    continue;
                }

                // Delete the table if any record has no EventVersion
                bool eventVersionMissing = tableRecords.Results.Any(record => string.IsNullOrEmpty(record.EventVersion));
                if (eventVersionMissing)
                {
                    _logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
                    await table.DeleteIfExistsAsync();
                    tableDeleted = true;
                    continue;
                }

                // Every record has an EventVersion; delete the table if any of them is outdated
                bool eventVersionOutdated = tableRecords.Results.Any(record => string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0);
                if (eventVersionOutdated)
                {
                    _logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
                    await table.DeleteIfExistsAsync();
                    tableDeleted = true;
                }
            }

            _purged = true;
        }, maxRetries: 5, retryInterval: TimeSpan.FromSeconds(5));
    }
    catch (Exception ex)
    {
        // Reached only if the retry wrapper gives up; _purged stays false so a later flush tries again.
        _logger.LogError(ex, "Error occurred when attempting to purge previous diagnostic event versions.");
    }

    if (tableDeleted)
    {
        // Wait for 30 seconds to allow the table to be deleted before proceeding to avoid a potential race.
        await Task.Delay(TimeSpan.FromSeconds(30));
    }
}
179+
114180 internal virtual async Task FlushLogs ( CloudTable table = null )
115181 {
116182 if ( _environment . IsPlaceholderModeEnabled ( ) )
117183 {
118184 return ;
119185 }
120186
187+ if ( IsPrimaryHost ( ) && ! _purged )
188+ {
189+ await PurgePreviousEventVersions ( ) ;
190+ }
191+
121192 try
122193 {
123194 table = table ?? GetDiagnosticEventsTable ( ) ;
124195
125196 if ( table == null )
126197 {
127- _logger . LogError ( "Unable to get table reference. Aborting write operation" ) ;
198+ _logger . LogError ( "Unable to get table reference. Aborting write operation. " ) ;
128199 StopTimer ( ) ;
129200 return ;
130201 }
131202
132203 bool tableCreated = await TableStorageHelpers . CreateIfNotExistsAsync ( table , TableCreationMaxRetryCount ) ;
133204 if ( tableCreated )
134205 {
206+ _logger . LogDebug ( "Queueing background table purge." ) ;
135207 TableStorageHelpers . QueueBackgroundTablePurge ( table , TableClient , TableNamePrefix , _logger ) ;
136208 }
137209 }
138210 catch ( Exception ex )
139211 {
140- _logger . LogError ( ex , $ "Unable to get table reference or create table. Aborting write operation.") ;
212+ _logger . LogError ( ex , "Unable to get table reference or create table. Aborting write operation." ) ;
141213 // Clearing the memory cache to avoid memory build up.
142214 _events . Clear ( ) ;
143215 return ;
@@ -169,9 +241,9 @@ internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEve
169241 await table . ExecuteBatchAsync ( batch ) ;
170242 events . Clear ( ) ;
171243 }
172- catch ( Exception e )
244+ catch ( Exception ex )
173245 {
174- _logger . LogError ( e , $ "Unable to write diagnostic events to table storage: { e } ") ;
246+ _logger . LogError ( ex , "Unable to write diagnostic events to table storage. " ) ;
175247 }
176248 }
177249
@@ -202,9 +274,21 @@ public void WriteDiagnosticEvent(DateTime timestamp, string errorCode, LogLevel
202274 }
203275 }
204276
/// <summary>
/// Reports whether this instance is currently the primary host.
/// </summary>
/// <returns>
/// The provider's <c>IsPrimary</c> value, or <c>false</c> when no
/// <see cref="IPrimaryHostStateProvider"/> can be resolved.
/// </returns>
private bool IsPrimaryHost()
{
    // _serviceProvider may be null, in which case the lookup yields null as well.
    var stateProvider = _serviceProvider?.GetService<IPrimaryHostStateProvider>();

    if (stateProvider != null)
    {
        return stateProvider.IsPrimary;
    }

    _logger.LogDebug("PrimaryHostStateProvider is not available. Skipping the check for primary host.");
    return false;
}
288+
/// <summary>
/// Stops the flush timer from firing again, without disposing it.
/// </summary>
private void StopTimer()
{
    _logger.LogInformation("Stopping the flush logs timer.");

    var timer = _flushLogsTimer;
    if (timer != null)
    {
        // An infinite due time and period disables further callbacks.
        timer.Change(Timeout.Infinite, Timeout.Infinite);
    }
}
210294
0 commit comments