Skip to content

Commit 557c314

Browse files
committed
Try to fix transport healthy check bug. #503
1 parent c1c4c44 commit 557c314

File tree

7 files changed

+17
-14
lines changed

7 files changed

+17
-14
lines changed

src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClient.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,7 @@ public void Dispose()
109109
_consumerClient?.CloseAsync().Wait(1500);
110110
}
111111

112-
#region private methods
113-
114-
private async Task ConnectAsync()
112+
public async Task ConnectAsync()
115113
{
116114
if (_consumerClient != null)
117115
{
@@ -157,6 +155,8 @@ private async Task ConnectAsync()
157155
}
158156
}
159157

158+
#region private methods
159+
160160
private Task OnConsumerReceived(Message message, CancellationToken token)
161161
{
162162
var header = message.UserProperties.ToDictionary(x => x.Key, y => y.Value?.ToString());

src/DotNetCore.CAP.AzureServiceBus/AzureServiceBusConsumerClientFactory.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ public IConsumerClient Create(string groupId)
2525
try
2626
{
2727
var logger = _loggerFactory.CreateLogger(typeof(AzureServiceBusConsumerClient));
28-
return new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
28+
var client = new AzureServiceBusConsumerClient(logger, groupId, _asbOptions);
29+
client.ConnectAsync().GetAwaiter().GetResult();
30+
return client;
2931
}
3032
catch (System.Exception e)
3133
{

src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ public void Dispose()
9595
_consumerClient?.Dispose();
9696
}
9797

98-
#region private methods
99-
100-
private void Connect()
98+
public void Connect()
10199
{
102100
if (_consumerClient != null)
103101
{
@@ -134,7 +132,5 @@ private void ConsumerClient_OnConsumeError(IConsumer<string, byte[]> consumer, E
134132
};
135133
OnLog?.Invoke(null, logArgs);
136134
}
137-
138-
#endregion private methods
139135
}
140136
}

src/DotNetCore.CAP.Kafka/KafkaConsumerClientFactory.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ public IConsumerClient Create(string groupId)
1919
{
2020
try
2121
{
22-
return new KafkaConsumerClient(groupId, _kafkaOptions);
22+
var client = new KafkaConsumerClient(groupId, _kafkaOptions);
23+
client.Connect();
24+
return client;
2325
}
2426
catch (System.Exception e)
2527
{

src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,7 @@ public void Dispose()
9595
_connection?.Dispose();
9696
}
9797

98-
#region events
99-
100-
private void Connect()
98+
public void Connect()
10199
{
102100
if (_connection != null)
103101
{
@@ -129,6 +127,8 @@ private void Connect()
129127
}
130128
}
131129

130+
#region events
131+
132132
private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e)
133133
{
134134
var args = new LogMessageEventArgs

src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClientFactory.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,9 @@ public IConsumerClient Create(string groupId)
2121
{
2222
try
2323
{
24-
return new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
24+
var client = new RabbitMQConsumerClient(groupId, _connectionChannelPool, _rabbitMQOptions);
25+
client.Connect();
26+
return client;
2527
}
2628
catch (System.Exception e)
2729
{

src/DotNetCore.CAP/Internal/IConsumerRegister.Default.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,7 @@ private void WriteLog(object sender, LogMessageEventArgs logmsg)
235235
_logger.LogWarning("RabbitMQ consumer unregistered. --> " + logmsg.Reason);
236236
break;
237237
case MqLogType.ConsumerShutdown:
238+
_isHealthy = false;
238239
_logger.LogWarning("RabbitMQ consumer shutdown. --> " + logmsg.Reason);
239240
break;
240241
case MqLogType.ConsumeError:

0 commit comments

Comments
 (0)