22{
33 using System ;
44 using System . Collections . Generic ;
5+ using System . Diagnostics . CodeAnalysis ;
56 using System . Linq ;
67 using System . Threading ;
78 using System . Threading . Tasks ;
@@ -57,11 +58,6 @@ internal set
5758 /// </summary>
5859 public string QueueNamePrefix { get ; set ; }
5960
60- /// <summary>
61- /// Disable native delayed delivery infrastructure
62- /// </summary>
63- internal bool DisableDelayedDelivery { get ; set ; } = false ;
64-
6561 /// <summary>
6662 /// Specifies a lambda function that allows to take control of the queue name generation logic.
6763 /// This is useful to overcome any limitations imposed by SQS.
@@ -96,6 +92,7 @@ public TimeSpan MaxTimeToLive
9692 {
9793 throw new ArgumentException ( $ "Max TTL needs to be between { MaxTimeToLiveLowerBound } and { MaxTimeToLiveUpperBound } .") ;
9894 }
95+
9996 maxTimeToLive = value ;
10097 }
10198 }
@@ -235,8 +232,38 @@ public SqsTransport()
235232 snsClient = DefaultClientFactories . SnsFactory ( ) ;
236233 }
237234
238- internal SqsTransport ( IAmazonSQS sqsClient , IAmazonSimpleNotificationService snsClient , bool supportsPublishSubscribe )
239- : base ( TransportTransactionMode . ReceiveOnly , true , supportsPublishSubscribe , true )
235+ /// <summary>
236+ /// Creates a new instance of the SQS transport definition.
237+ /// </summary>
238+ [ Experimental ( DiagnosticDescriptors . ExperimentalDisableDelayedDelivery ) ]
239+ public SqsTransport (
240+ IAmazonSQS sqsClient ,
241+ IAmazonSimpleNotificationService snsClient ,
242+ bool enableDelayedDelivery
243+ )
244+ : this (
245+ sqsClient ,
246+ snsClient ,
247+ supportsPublishSubscribe : true ,
248+ enableDelayedDelivery : enableDelayedDelivery
249+ )
250+ {
251+ this . sqsClient = sqsClient ;
252+ this . snsClient = snsClient ;
253+ }
254+
255+ internal SqsTransport (
256+ IAmazonSQS sqsClient ,
257+ IAmazonSimpleNotificationService snsClient ,
258+ bool supportsPublishSubscribe ,
259+ bool enableDelayedDelivery = true
260+ )
261+ : base (
262+ TransportTransactionMode . ReceiveOnly ,
263+ enableDelayedDelivery ,
264+ supportsPublishSubscribe ,
265+ supportsTTBR : true
266+ )
240267 {
241268 this . sqsClient = sqsClient ;
242269 this . snsClient = snsClient ;
@@ -252,15 +279,38 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
252279 {
253280 AssertQueueNameGeneratorIdempotent ( queueNameGenerator ) ;
254281
255- var topicCache = new TopicCache ( SnsClient , hostSettings . CoreSettings , eventToTopicsMappings , eventToEventsMappings , topicNameGenerator , topicNamePrefix ) ;
256- var infra = new SqsTransportInfrastructure ( hostSettings , receivers , SqsClient , SnsClient , QueueCache , topicCache , S3 , Policies , QueueDelayTime , topicNamePrefix , DoNotWrapOutgoingMessages , ! externallyManagedSqsClient , ! externallyManagedSnsClient , DisableDelayedDelivery ) ;
282+ var topicCache = new TopicCache (
283+ SnsClient ,
284+ hostSettings . CoreSettings ,
285+ eventToTopicsMappings ,
286+ eventToEventsMappings ,
287+ topicNameGenerator ,
288+ topicNamePrefix
289+ ) ;
290+
291+ var infra = new SqsTransportInfrastructure (
292+ hostSettings ,
293+ receivers ,
294+ SqsClient ,
295+ SnsClient ,
296+ QueueCache ,
297+ topicCache ,
298+ S3 ,
299+ Policies ,
300+ QueueDelayTime ,
301+ topicNamePrefix ,
302+ DoNotWrapOutgoingMessages ,
303+ ! externallyManagedSqsClient ,
304+ ! externallyManagedSnsClient ,
305+ ! SupportsDelayedDelivery
306+ ) ;
257307
258308 if ( hostSettings . SetupInfrastructure )
259309 {
260310 var queueCreator = new QueueCreator ( SqsClient , QueueCache , S3 , maxTimeToLive , QueueDelayTime ) ;
261311
262312 var createQueueTasks = sendingAddresses . Select ( x => queueCreator . CreateQueueIfNecessary ( x , false , cancellationToken ) )
263- . Concat ( infra . Receivers . Values . Select ( x => queueCreator . CreateQueueIfNecessary ( x . ReceiveAddress , ! DisableDelayedDelivery , cancellationToken ) ) ) . ToArray ( ) ;
313+ . Concat ( infra . Receivers . Values . Select ( x => queueCreator . CreateQueueIfNecessary ( x . ReceiveAddress , SupportsDelayedDelivery , cancellationToken ) ) ) . ToArray ( ) ;
264314
265315 await Task . WhenAll ( createQueueTasks ) . ConfigureAwait ( false ) ;
266316 }
0 commit comments