1+ using System . Collections . Concurrent ;
12using System . Globalization ;
3+ using System . Linq ;
24using System . Net . Http . Headers ;
35using System . ServiceModel . Syndication ;
46using System . Text ;
@@ -32,6 +34,9 @@ public class SyndicationAction : IngestAction<SyndicationOptions>
3234{
3335 #region Variables
3436 private readonly IHttpRequestClient _httpClient ;
37+ private readonly object _lock = new object ( ) ;
38+ private string ? _etag ;
39+ private readonly ConcurrentDictionary < string , int ? > _sourceMediaTypeMap = new ConcurrentDictionary < string , int ? > ( ) ;
3540 #endregion
3641
3742 #region Properties
@@ -66,6 +71,7 @@ public override async Task<ServiceActionResult> PerformActionAsync<T>(IIngestAct
6671
6772 // This ingest has just begun running.
6873 await manager . UpdateIngestStateFailedAttemptsAsync ( manager . Ingest . FailedAttempts ) ;
74+ await GetSourcesConfigurationAsync ( ) ;
6975
7076 var url = GetUrl ( manager . Ingest ) ;
7177
@@ -143,6 +149,45 @@ private async Task ImportFeedAsync(IIngestActionManager manager, SyndicationFeed
143149 }
144150 }
145151
152+ /// <summary>
153+ /// Make a request to the API for all the source configurations.
154+ /// Store their mediaTypeId override value in memory.
155+ /// </summary>
156+ /// <returns></returns>
157+ private async Task GetSourcesConfigurationAsync ( )
158+ {
159+ try
160+ {
161+ // Fetch the latest configuration values.
162+ var response = String . IsNullOrWhiteSpace ( _etag ) ? await this . Api . GetSourcesResponseAsync ( ) : await this . Api . GetSourcesResponseWithEtagAsync ( _etag ) ;
163+ var etag = this . Api . GetResponseEtag ( response ) ;
164+ lock ( _lock )
165+ {
166+ _etag = etag ;
167+ }
168+ var sources = await this . Api . GetResponseDataAsync < SourceModel [ ] > ( response ) ;
169+ if ( sources != null )
170+ {
171+ foreach ( var source in sources )
172+ {
173+ _sourceMediaTypeMap . AddOrUpdate ( source . Code , code => source . MediaTypeId , ( code , oldId ) => source . MediaTypeId ) ;
174+ }
175+ var current = _sourceMediaTypeMap . Keys . ToArray ( ) ?? [ ] ;
176+ var latest = sources . Select ( s => s . Code ) . ToArray ( ) ;
177+ var removeSources = current . Where ( code => ! latest . Contains ( code ) ) ;
178+ foreach ( var source in removeSources )
179+ {
180+ // Remove any sources that no longer exist.
181+ _sourceMediaTypeMap . Remove ( source , out int ? value ) ;
182+ }
183+ }
184+ }
185+ catch ( Exception ex )
186+ {
187+ this . Logger . LogError ( ex , "Failed to fetch sources" ) ;
188+ }
189+ }
190+
146191 /// <summary>
147192 /// Parse the date time value and handle a common formatting issues.
148193 /// </summary>
@@ -274,6 +319,19 @@ private string DetermineSource(IngestModel ingest, SyndicationItem item)
274319 return defaultSource ;
275320 }
276321
322+ /// <summary>
323+ /// Determine the media type for the specified ingest and source.
324+ /// If the source has an override, use it.
325+ /// </summary>
326+ /// <param name="ingest"></param>
327+ /// <param name="source"></param>
328+ /// <returns></returns>
329+ private int DetermineMediaType ( IngestModel ingest , string source )
330+ {
331+ _sourceMediaTypeMap . TryGetValue ( source , out var mediaTypeId ) ;
332+ return mediaTypeId ?? ingest . MediaTypeId ;
333+ }
334+
277335 /// <summary>
278336 /// Create a SourceContent object that can be sent to Kafka.
279337 /// </summary>
@@ -287,6 +345,7 @@ private SourceContent CreateSourceContent(IngestModel ingest, SyndicationItem it
287345 var contentType = ingest . IngestType ? . ContentType ?? throw new InvalidOperationException ( $ "Ingest '{ ingest . Name } ' is missing ingest content type.") ;
288346 var publishedOn = item . PublishDate . UtcDateTime != DateTime . MinValue ? item . PublishDate . UtcDateTime : ( DateTime ? ) null ;
289347 var uid = Runners . BaseService . GetContentHash ( source , title , publishedOn ) ;
348+ var mediaTypeId = DetermineMediaType ( ingest , source ) ;
290349 string ? media = null ;
291350
292351 // Extract values from namespaces. I don't know a better way to do this.
@@ -304,7 +363,7 @@ private SourceContent CreateSourceContent(IngestModel ingest, SyndicationItem it
304363 this . Options . DataLocation ,
305364 source ,
306365 contentType ,
307- ingest . MediaTypeId ,
366+ mediaTypeId ,
308367 uid ,
309368 title ,
310369 StringExtensions . ConvertTextToParagraphs ( summary , @"[\r\n]+" ) ,
0 commit comments