Skip to content

Commit 17a8721

Browse files
EtherZazarusz
authored andcommitted
#373 Replace ICurrentTimeProvider with TimeProvider
Signed-off-by: Richard Pringle <richardpringle@gmail.com>
1 parent cf2f3b7 commit 17a8721

File tree

52 files changed

+1169
-1195
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+1169
-1195
lines changed

src/SlimMessageBus.Host.AmazonSQS/SqsMessageBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void AddConsumerFrom(string path, PathKind pathKind, IMessageProcessor<Message>
7575
Settings.RequestResponse,
7676
messageProvider: MessageProvider,
7777
PendingRequestStore,
78-
CurrentTimeProvider);
78+
TimeProvider);
7979

8080
AddConsumerFrom(
8181
path,

src/SlimMessageBus.Host.AzureEventHub/Consumer/EhPartitionConsumerForResponses.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@ public EhPartitionConsumerForResponses(
1515
GroupPathPartitionId pathGroupPartition,
1616
MessageProvider<EventData> messageProvider,
1717
IPendingRequestStore pendingRequestStore,
18-
ICurrentTimeProvider currentTimeProvider)
18+
TimeProvider timeProvider)
1919
: base(messageBus, pathGroupPartition)
2020
{
2121
if (requestResponseSettings == null) throw new ArgumentNullException(nameof(requestResponseSettings));
2222

23-
MessageProcessor = new ResponseMessageProcessor<EventData>(MessageBus.LoggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, currentTimeProvider);
23+
MessageProcessor = new ResponseMessageProcessor<EventData>(MessageBus.LoggerFactory, requestResponseSettings, messageProvider, pendingRequestStore, timeProvider);
2424
CheckpointTrigger = new CheckpointTrigger(requestResponseSettings, MessageBus.LoggerFactory);
2525
}
2626
}

src/SlimMessageBus.Host.AzureEventHub/EventHubMessageBus.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,8 @@ protected override async Task CreateConsumers()
7272
object MessageProvider(Type messageType, EventData transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Body.ToArray());
7373

7474
_logger.LogInformation("Creating response consumer for Path: {Path}, Group: {Group}", groupPath.Path, groupPath.Group);
75-
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, groupPath, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, CurrentTimeProvider)));
75+
AddConsumer(new EhGroupConsumer([Settings.RequestResponse], this, groupPath, groupPathPartition => new EhPartitionConsumerForResponses(this, Settings.RequestResponse, groupPathPartition, MessageProvider, PendingRequestStore, TimeProvider)));
7676
}
77-
7877
}
7978

8079
protected override async ValueTask DisposeAsyncCore()

src/SlimMessageBus.Host.AzureServiceBus/ServiceBusMessageBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ void AddConsumerFrom(TopicSubscriptionParams topicSubscription, IMessageProcesso
120120
Settings.RequestResponse,
121121
MessageProvider,
122122
PendingRequestStore,
123-
CurrentTimeProvider);
123+
TimeProvider);
124124

125125
AddConsumerFrom(topicSubscription, messageProcessor, [Settings.RequestResponse]);
126126
}

src/SlimMessageBus.Host.Configuration/Settings/MessageBusSettings.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
namespace SlimMessageBus.Host;
1+
namespace SlimMessageBus.Host;
22

3-
public class MessageBusSettings : HasProviderExtensions
3+
public class MessageBusSettings : HasProviderExtensions
44
{
55
private IServiceProvider _serviceProvider;
66
private readonly IList<MessageBusSettings> _children;
@@ -16,11 +16,11 @@ public IServiceProvider ServiceProvider
1616

1717
/// <summary>
1818
/// The bus name.
19-
/// </summary>
19+
/// </summary>
2020
public string Name { get; set; }
21-
public IList<ProducerSettings> Producers { get; }
22-
public IList<ConsumerSettings> Consumers { get; }
23-
public RequestResponseSettings RequestResponse { get; set; }
21+
public IList<ProducerSettings> Producers { get; }
22+
public IList<ConsumerSettings> Consumers { get; }
23+
public RequestResponseSettings RequestResponse { get; set; }
2424
public Type SerializerType { get; set; }
2525
public Type MessageTypeResolverType { get; set; }
2626

@@ -39,20 +39,20 @@ public IServiceProvider ServiceProvider
3939
/// </summary>
4040
public bool AutoStartConsumers { get; set; }
4141

42-
public MessageBusSettings(MessageBusSettings parent = null)
42+
public MessageBusSettings(MessageBusSettings parent = null)
4343
{
4444
_children = [];
45-
Producers = [];
46-
Consumers = [];
45+
Producers = [];
46+
Consumers = [];
4747
SerializerType = typeof(IMessageSerializerProvider);
4848
AutoStartConsumers = true;
4949

5050
if (parent != null)
5151
{
5252
Parent = parent;
5353
parent._children.Add(this);
54-
}
55-
}
54+
}
55+
}
5656

5757
public virtual void MergeFrom(MessageBusSettings settings)
5858
{

src/SlimMessageBus.Host.Kafka/Consumer/KafkaPartitionConsumerForResponses.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public KafkaPartitionConsumerForResponses(
1616
IKafkaCommitController commitController,
1717
MessageProvider<ConsumeResult> messageProvider,
1818
IPendingRequestStore pendingRequestStore,
19-
ICurrentTimeProvider currentTimeProvider,
19+
TimeProvider timeProvider,
2020
IMessageSerializer headerSerializer)
2121
: base(
2222
loggerFactory,
@@ -30,7 +30,7 @@ public KafkaPartitionConsumerForResponses(
3030
requestResponseSettings,
3131
messageProvider,
3232
pendingRequestStore,
33-
currentTimeProvider))
33+
timeProvider))
3434
{
3535
}
3636
}

src/SlimMessageBus.Host.Kafka/KafkaMessageBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ IKafkaPartitionConsumer ResponseProcessorFactory(TopicPartition tp, IKafkaCommit
7676
var messageSerializer = SerializerProvider.GetSerializer(tp.Topic);
7777
object MessageProvider(Type messageType, ConsumeResult<Ignore, byte[]> transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Message.Value);
7878

79-
return new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, MessageProvider, PendingRequestStore, CurrentTimeProvider, headerSerializer);
79+
return new KafkaPartitionConsumerForResponses(LoggerFactory, Settings.RequestResponse, Settings.RequestResponse.GetGroup(), tp, cc, MessageProvider, PendingRequestStore, TimeProvider, headerSerializer);
8080
}
8181

8282
foreach (var consumersByGroup in Settings.Consumers.GroupBy(x => x.GetGroup()))

src/SlimMessageBus.Host.Mqtt/MqttMessageBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ void AddTopicConsumer(IEnumerable<AbstractConsumerSettings> consumerSettings, st
8787
Settings.RequestResponse,
8888
messageProvider: MessageProvider,
8989
PendingRequestStore,
90-
CurrentTimeProvider);
90+
TimeProvider);
9191

9292
AddTopicConsumer([Settings.RequestResponse], path, processor);
9393
}

src/SlimMessageBus.Host.Nats/NatsMessageBus.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ protected override async Task CreateConsumers()
7878
var messageSerializer = SerializerProvider.GetSerializer(subject);
7979
object MessageProvider(Type messageType, NatsMsg<byte[]> transportMessage) => messageSerializer.Deserialize(messageType, transportMessage.Data);
8080

81-
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, CurrentTimeProvider);
81+
var processor = new ResponseMessageProcessor<NatsMsg<byte[]>>(LoggerFactory, Settings.RequestResponse, MessageProvider, PendingRequestStore, TimeProvider);
8282
AddSubjectConsumer([], subject, processor);
8383
}
8484
}

src/SlimMessageBus.Host.Outbox.Sql.DbContext/Configuration/MessageBusBuilderExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public static MessageBusBuilder AddOutboxUsingDbContext<TDbContext>(this Message
1616
settings,
1717
svp.GetRequiredService<SqlOutboxTemplate>(),
1818
settings.IdGeneration.GuidGenerator ?? (IGuidGenerator)svp.GetRequiredService(settings.IdGeneration.GuidGeneratorType),
19-
svp.GetRequiredService<ICurrentTimeProvider>(),
19+
svp.GetRequiredService<TimeProvider>(),
2020
svp.GetRequiredService<IInstanceIdProvider>(),
2121
svp.GetRequiredService<TDbContext>(),
2222
svp.GetRequiredService<ISqlTransactionService>()

0 commit comments

Comments
 (0)