3
3
4
4
using System ;
5
5
using System . Collections . Concurrent ;
6
+ using System . Collections . Generic ;
7
+ using System . Linq ;
6
8
using System . Threading ;
7
9
using System . Threading . Tasks ;
8
10
using Microsoft . Azure . Cosmos . Table ;
9
11
using Microsoft . Azure . WebJobs . Host . Executors ;
12
+ using Microsoft . Azure . WebJobs . Hosting ;
10
13
using Microsoft . Azure . WebJobs . Logging ;
11
14
using Microsoft . Azure . WebJobs . Script . WebHost . Helpers ;
12
15
using Microsoft . Extensions . Configuration ;
16
+ using Microsoft . Extensions . DependencyInjection ;
13
17
using Microsoft . Extensions . Logging ;
14
18
15
19
namespace Microsoft . Azure . WebJobs . Script . WebHost . Diagnostics
@@ -25,26 +29,31 @@ public class DiagnosticEventTableStorageRepository : IDiagnosticEventRepository,
25
29
private readonly IHostIdProvider _hostIdProvider ;
26
30
private readonly IEnvironment _environment ;
27
31
private readonly ILogger < DiagnosticEventTableStorageRepository > _logger ;
32
+ private readonly IServiceProvider _serviceProvider ;
28
33
private readonly object _syncLock = new object ( ) ;
29
34
30
35
private ConcurrentDictionary < string , DiagnosticEvent > _events = new ConcurrentDictionary < string , DiagnosticEvent > ( ) ;
31
36
private CloudTableClient _tableClient ;
32
37
private CloudTable _diagnosticEventsTable ;
33
38
private string _hostId ;
34
39
private bool _disposed = false ;
40
+ private bool _purged = false ;
35
41
private string _tableName ;
36
42
37
/// <summary>
/// Creates the repository with an explicit flush interval and starts the timer that
/// periodically invokes <see cref="OnFlushLogs(object)"/>.
/// </summary>
/// <param name="configuration">Configuration retained by the repository.</param>
/// <param name="hostIdProvider">Host id provider retained by the repository.</param>
/// <param name="environment">Environment abstraction (consulted for placeholder mode when flushing).</param>
/// <param name="scriptHostManager">Script host manager; used as an <see cref="IServiceProvider"/> when it implements one.</param>
/// <param name="logger">Logger used for diagnostics emitted by this repository.</param>
/// <param name="logFlushInterval">Value passed to the timer as both its initial due time and its period.</param>
internal DiagnosticEventTableStorageRepository(
    IConfiguration configuration,
    IHostIdProvider hostIdProvider,
    IEnvironment environment,
    IScriptHostManager scriptHostManager,
    ILogger<DiagnosticEventTableStorageRepository> logger,
    int logFlushInterval)
{
    _logger = logger;
    _environment = environment;
    _hostIdProvider = hostIdProvider;
    _configuration = configuration;

    // 'as' yields null when the manager is not an IServiceProvider; the primary-host
    // check dereferences this field with '?.' and therefore tolerates null.
    _serviceProvider = scriptHostManager as IServiceProvider;

    // Start the timer last so that its callback only ever sees fully assigned fields.
    _flushLogsTimer = new Timer(OnFlushLogs, null, logFlushInterval, logFlushInterval);
}
45
53
46
/// <summary>
/// Creates the repository using the default <c>LogFlushInterval</c> for the flush timer.
/// </summary>
/// <param name="configuration">Configuration retained by the repository.</param>
/// <param name="hostIdProvider">Host id provider retained by the repository.</param>
/// <param name="environment">Environment abstraction.</param>
/// <param name="scriptHost">Script host manager forwarded to the internal constructor.</param>
/// <param name="logger">Logger used for diagnostics emitted by this repository.</param>
public DiagnosticEventTableStorageRepository(
    IConfiguration configuration,
    IHostIdProvider hostIdProvider,
    IEnvironment environment,
    IScriptHostManager scriptHost,
    ILogger<DiagnosticEventTableStorageRepository> logger)
    : this(configuration, hostIdProvider, environment, scriptHost, logger, LogFlushInterval)
{
}
48
57
49
58
internal CloudTableClient TableClient
50
59
{
@@ -88,7 +97,7 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
88
97
if ( TableClient != null )
89
98
{
90
99
now = now ?? DateTime . UtcNow ;
91
- string currentTableName = GetCurrentTableName ( now . Value ) ;
100
+ string currentTableName = GetTableName ( now . Value ) ;
92
101
93
102
// update the table reference when date rolls over to a new month
94
103
if ( _diagnosticEventsTable == null || currentTableName != _tableName )
@@ -101,43 +110,106 @@ internal CloudTable GetDiagnosticEventsTable(DateTime? now = null)
101
110
return _diagnosticEventsTable ;
102
111
}
103
112
104
/// <summary>
/// Builds the name of the diagnostic events table for the month that contains <paramref name="date"/>.
/// </summary>
/// <param name="date">The date whose year and month select the table.</param>
/// <returns><c>TableNamePrefix</c> followed by the year and month formatted as <c>yyyyMM</c>.</returns>
private static string GetTableName(DateTime date)
{
    // The suffix is part of a storage identifier, so format it with the invariant culture:
    // a thread culture whose default calendar is not Gregorian would otherwise yield a different year.
    return TableNamePrefix + date.ToString("yyyyMM", System.Globalization.CultureInfo.InvariantCulture);
}
108
117
109
118
/// <summary>
/// Timer callback that flushes the buffered diagnostic events.
/// </summary>
/// <param name="state">Timer state object; not used.</param>
protected internal virtual async void OnFlushLogs(object state)
{
    try
    {
        await FlushLogs();
    }
    catch (Exception ex)
    {
        // This is an 'async void' callback: nothing can await it, so an exception that escaped
        // here would go unobserved by any caller and could bring down the process. FlushLogs
        // performs the primary-host check and the purge outside of its own try block, so guard here.
        _logger.LogError(ex, "Error occurred when flushing diagnostic events.");
    }
}
113
122
123
/// <summary>
/// Deletes diagnostic event tables that contain records without an event version, or with an
/// event version older than <see cref="DiagnosticEvent.CurrentEventVersion"/>.
/// </summary>
/// <remarks>
/// When a full pass over the tables completes, <c>_purged</c> is set so the caller does not purge again.
/// Failures are logged and swallowed; <c>_purged</c> then stays <c>false</c>, so a later flush tries again.
/// If any table was deleted, the method waits 30 seconds before returning.
/// </remarks>
/// <returns>A task that completes when the purge attempt (and any post-delete wait) has finished.</returns>
private async Task PurgePreviousEventVersions()
{
    _logger.LogDebug("Purging diagnostic events with versions older than '{currentEventVersion}'.", DiagnosticEvent.CurrentEventVersion);

    CloudTableClient tableClient = TableClient;
    if (tableClient == null)
    {
        // Without a table client there is nothing to list; retrying would only delay the flush.
        return;
    }

    bool tableDeleted = false;

    try
    {
        // The delegate deliberately has no catch of its own: an exception has to reach the retry
        // helper for maxRetries / retryInterval to have any effect.
        // NOTE(review): assumes Utility.InvokeWithRetriesAsync rethrows once the retries are exhausted - confirm.
        await Utility.InvokeWithRetriesAsync(async () =>
        {
            var tables = (await TableStorageHelpers.ListTablesAsync(tableClient, TableNamePrefix)).ToList();

            foreach (var table in tables)
            {
                // Only the first result segment is inspected (no continuation token is followed).
                var tableRecords = await table.ExecuteQuerySegmentedAsync(new TableQuery<DiagnosticEvent>(), null);

                // Skip tables that have 0 records
                if (tableRecords.Results.Count == 0)
                {
                    continue;
                }

                // Delete the table if any inspected record has no EventVersion
                bool eventVersionMissing = tableRecords.Results.Any(record => string.IsNullOrEmpty(record.EventVersion));
                if (eventVersionMissing)
                {
                    _logger.LogDebug("Deleting table '{tableName}' as it contains records without an EventVersion.", table.Name);
                    await table.DeleteIfExistsAsync();
                    tableDeleted = true;
                    continue;
                }

                // Every inspected record has an EventVersion; delete the table if any of them is outdated
                bool eventVersionOutdated = tableRecords.Results.Any(record => string.Compare(DiagnosticEvent.CurrentEventVersion, record.EventVersion, StringComparison.Ordinal) > 0);
                if (eventVersionOutdated)
                {
                    _logger.LogDebug("Deleting table '{tableName}' as it contains records with an outdated EventVersion.", table.Name);
                    await table.DeleteIfExistsAsync();
                    tableDeleted = true;
                }
            }

            _purged = true;
        }, maxRetries: 5, retryInterval: TimeSpan.FromSeconds(5));
    }
    catch (Exception ex)
    {
        // Purging is best-effort: log and continue so that the flush itself still runs.
        _logger.LogError(ex, "Error occurred when attempting to purge previous diagnostic event versions.");
    }

    if (tableDeleted)
    {
        // Wait for 30 seconds to allow the table to be deleted before proceeding to avoid a potential race.
        await Task.Delay(TimeSpan.FromSeconds(30));
    }
}
179
+
114
180
internal virtual async Task FlushLogs ( CloudTable table = null )
115
181
{
116
182
if ( _environment . IsPlaceholderModeEnabled ( ) )
117
183
{
118
184
return ;
119
185
}
120
186
187
+ if ( IsPrimaryHost ( ) && ! _purged )
188
+ {
189
+ await PurgePreviousEventVersions ( ) ;
190
+ }
191
+
121
192
try
122
193
{
123
194
table = table ?? GetDiagnosticEventsTable ( ) ;
124
195
125
196
if ( table == null )
126
197
{
127
- _logger . LogError ( "Unable to get table reference. Aborting write operation" ) ;
198
+ _logger . LogError ( "Unable to get table reference. Aborting write operation. " ) ;
128
199
StopTimer ( ) ;
129
200
return ;
130
201
}
131
202
132
203
bool tableCreated = await TableStorageHelpers . CreateIfNotExistsAsync ( table , TableCreationMaxRetryCount ) ;
133
204
if ( tableCreated )
134
205
{
206
+ _logger . LogDebug ( "Queueing background table purge." ) ;
135
207
TableStorageHelpers . QueueBackgroundTablePurge ( table , TableClient , TableNamePrefix , _logger ) ;
136
208
}
137
209
}
138
210
catch ( Exception ex )
139
211
{
140
- _logger . LogError ( ex , $ "Unable to get table reference or create table. Aborting write operation.") ;
212
+ _logger . LogError ( ex , "Unable to get table reference or create table. Aborting write operation." ) ;
141
213
// Clearing the memory cache to avoid memory build up.
142
214
_events . Clear ( ) ;
143
215
return ;
@@ -169,9 +241,9 @@ internal async Task ExecuteBatchAsync(ConcurrentDictionary<string, DiagnosticEve
169
241
await table . ExecuteBatchAsync ( batch ) ;
170
242
events . Clear ( ) ;
171
243
}
172
- catch ( Exception e )
244
+ catch ( Exception ex )
173
245
{
174
- _logger . LogError ( e , $ "Unable to write diagnostic events to table storage: { e } ") ;
246
+ _logger . LogError ( ex , "Unable to write diagnostic events to table storage. " ) ;
175
247
}
176
248
}
177
249
@@ -202,9 +274,21 @@ public void WriteDiagnosticEvent(DateTime timestamp, string errorCode, LogLevel
202
274
}
203
275
}
204
276
277
/// <summary>
/// Reports whether this host is currently the primary host.
/// </summary>
/// <returns>
/// The <c>IsPrimary</c> value of the resolved <see cref="IPrimaryHostStateProvider"/>, or
/// <c>false</c> when no provider can be resolved from the service provider.
/// </returns>
private bool IsPrimaryHost()
{
    IPrimaryHostStateProvider stateProvider = _serviceProvider?.GetService<IPrimaryHostStateProvider>();

    if (stateProvider != null)
    {
        return stateProvider.IsPrimary;
    }

    // Either there is no service provider or it has no primary-host state registered.
    _logger.LogDebug("PrimaryHostStateProvider is not available. Skipping the check for primary host.");
    return false;
}
288
+
205
289
/// <summary>
/// Stops the periodic flush by disabling the flush timer, if one exists.
/// </summary>
private void StopTimer()
{
    _logger.LogInformation("Stopping the flush logs timer.");

    // Read the field once so the null check and the call act on the same instance.
    var timer = _flushLogsTimer;
    if (timer != null)
    {
        // An infinite due time and period prevent any further callbacks.
        timer.Change(Timeout.Infinite, Timeout.Infinite);
    }
}
210
294
0 commit comments