You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
HelpText="The hostname on which RabbitMQ is running. The default is `localhost`.")]
22
+
publicstringRabbitMQHost{get;set;}="localhost";
23
+
24
+
[SeqAppSetting(
25
+
DisplayName="RabbitMQ Virtual Host",
26
+
IsOptional=true,
27
+
HelpText="The virtual host in RabbitMQ. The default is `/`.")]
28
+
publicstringRabbitMQVHost{get;set;}="/";
29
+
30
+
[SeqAppSetting(
31
+
DisplayName="RabbitMQ port",
32
+
IsOptional=true,
33
+
HelpText="The port on which the RabbitMQ server is listening. The default is `5672`.")]
34
+
publicintRabbitMQPort{get;set;}=5672;
35
+
36
+
[SeqAppSetting(
37
+
DisplayName="RabbitMQ user",
38
+
IsOptional=true,
39
+
HelpText="The username provided when connecting to RabbitMQ. The default is `guest`.")]
40
+
publicstringRabbitMQUser{get;set;}="guest";
41
+
42
+
[SeqAppSetting(
43
+
DisplayName="RabbitMQ password",
44
+
IsOptional=true,
45
+
InputType=SettingInputType.Password,
46
+
HelpText="The password provided when connecting to RabbitMQ. The default is `guest`.")]
47
+
publicstringRabbitMQPassword{get;set;}="guest";
48
+
49
+
[SeqAppSetting(
50
+
DisplayName="RabbitMQ queue",
51
+
IsOptional=true,
52
+
HelpText="The RabbitMQ queue name to receive events from. The default is `Logs`.")]
53
+
publicstringRabbitMQQueue{get;set;}="logs";
54
+
55
+
[SeqAppSetting(
56
+
DisplayName="Require SSL",
57
+
IsOptional=true,
58
+
HelpText="Whether or not the connection is with SSL. The default is false.")]
59
+
publicboolIsSsl{get;set;}
60
+
61
+
[SeqAppSetting(
62
+
DisplayName="Durable",
63
+
IsOptional=true,
64
+
HelpText="Whether or not the queue is durable. The default is false.")]
65
+
publicboolIsQueueDurable{get;set;}
66
+
67
+
[SeqAppSetting(
68
+
DisplayName="Exclusive",
69
+
IsOptional=true,
70
+
HelpText="Whether or not the queue is exclusive. The default is false.")]
71
+
publicboolIsQueueExclusive{get;set;}
72
+
73
+
[SeqAppSetting(
74
+
DisplayName="Auto-delete",
75
+
IsOptional=true,
76
+
HelpText="Whether or not the queue subscription is durable. The default is false.")]
77
+
publicboolIsQueueAutoDelete{get;set;}
78
+
79
+
[SeqAppSetting(
80
+
DisplayName="Auto-ACK",
81
+
IsOptional=true,
82
+
HelpText="Whether or not messages should be auto-acknowledged. The default is true.")]
83
+
publicboolIsReceiveAutoAck{get;set;}=true;
84
+
85
+
[SeqAppSetting(
86
+
DisplayName="Dead Letter Exchange",
87
+
IsOptional=true,
88
+
HelpText="The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")]
89
+
publicstringDlx{get;set;}
90
+
91
+
publicvoidStart(TextWriterinputWriter)
13
92
{
14
-
RabbitMQListener_listener;
15
-
16
-
[SeqAppSetting(
17
-
DisplayName="RabbitMQ host",
18
-
IsOptional=true,
19
-
HelpText="The hostname on which RabbitMQ is running. The default is `localhost`.")]
20
-
publicstringRabbitMQHost{get;set;}="localhost";
21
-
22
-
[SeqAppSetting(
23
-
DisplayName="RabbitMQ Virtual Host",
24
-
IsOptional=true,
25
-
HelpText="The virtual host in RabbitMQ. The default is `/`.")]
26
-
publicstringRabbitMQVHost{get;set;}="/";
27
-
28
-
[SeqAppSetting(
29
-
DisplayName="RabbitMQ port",
30
-
IsOptional=true,
31
-
HelpText="The port on which the RabbitMQ server is listening. The default is `5672`.")]
32
-
publicintRabbitMQPort{get;set;}=5672;
33
-
34
-
[SeqAppSetting(
35
-
DisplayName="RabbitMQ user",
36
-
IsOptional=true,
37
-
HelpText="The username provided when connecting to RabbitMQ. The default is `guest`.")]
38
-
publicstringRabbitMQUser{get;set;}="guest";
39
-
40
-
[SeqAppSetting(
41
-
DisplayName="RabbitMQ password",
42
-
IsOptional=true,
43
-
InputType=SettingInputType.Password,
44
-
HelpText="The password provided when connecting to RabbitMQ. The default is `guest`.")]
45
-
publicstringRabbitMQPassword{get;set;}="guest";
46
-
47
-
[SeqAppSetting(
48
-
DisplayName="RabbitMQ queue",
49
-
IsOptional=true,
50
-
HelpText="The RabbitMQ queue name to receive events from. The default is `Logs`.")]
51
-
publicstringRabbitMQQueue{get;set;}="logs";
52
-
53
-
[SeqAppSetting(
54
-
DisplayName="Require SSL",
55
-
IsOptional=true,
56
-
HelpText="Whether or not the connection is with SSL. The default is false.")]
57
-
publicboolIsSsl{get;set;}
58
-
59
-
[SeqAppSetting(
60
-
DisplayName="Durable",
61
-
IsOptional=true,
62
-
HelpText="Whether or not the queue is durable. The default is false.")]
63
-
publicboolIsQueueDurable{get;set;}
64
-
65
-
[SeqAppSetting(
66
-
DisplayName="Exclusive",
67
-
IsOptional=true,
68
-
HelpText="Whether or not the queue is exclusive. The default is false.")]
69
-
publicboolIsQueueExclusive{get;set;}
70
-
71
-
[SeqAppSetting(
72
-
DisplayName="Auto-delete",
73
-
IsOptional=true,
74
-
HelpText="Whether or not the queue subscription is durable. The default is false.")]
75
-
publicboolIsQueueAutoDelete{get;set;}
76
-
77
-
[SeqAppSetting(
78
-
DisplayName="Auto-ACK",
79
-
IsOptional=true,
80
-
HelpText="Whether or not messages should be auto-acknowledged. The default is true.")]
81
-
publicboolIsReceiveAutoAck{get;set;}=true;
82
-
83
-
[SeqAppSetting(
84
-
DisplayName="Dead Letter Exchange",
85
-
IsOptional=true,
86
-
HelpText="The name of the dead letter exchange associated with this queue. If specified, the exchange will be used when declaring the queue, otherwise no dead lettering will be configured.")]
87
-
publicstringDlx{get;set;}
88
-
89
-
publicvoidStart(TextWriterinputWriter)
93
+
varsync=newobject();
94
+
TaskReceiveAsync(ReadOnlyMemory<byte>body)
90
95
{
91
-
varsync=newobject();
92
-
voidReceive(ReadOnlyMemory<byte>body)
96
+
try
93
97
{
94
-
try
95
-
{
96
-
lock(sync)
97
-
{
98
-
varclef=Encoding.UTF8.GetString(body.ToArray());
99
-
inputWriter.WriteLine(clef);
100
-
}
101
-
}
102
-
catch(Exceptionex)
98
+
lock(sync)
103
99
{
104
-
Log.Error(ex,"A received message could not be decoded");
100
+
varclef=Encoding.UTF8.GetString(body.ToArray());
101
+
inputWriter.WriteLine(clef);
105
102
}
106
103
}
107
-
108
-
_listener=newRabbitMQListener(
109
-
Receive,
110
-
RabbitMQHost,
111
-
RabbitMQVHost,
112
-
RabbitMQPort,
113
-
RabbitMQUser,
114
-
RabbitMQPassword,
115
-
RabbitMQQueue,
116
-
IsSsl,
117
-
IsQueueDurable,
118
-
IsQueueAutoDelete,
119
-
IsQueueExclusive,
120
-
IsReceiveAutoAck,
121
-
Dlx);
104
+
catch(Exceptionex)
105
+
{
106
+
Log.Error(ex,"A received message could not be decoded");
107
+
}
108
+
109
+
returnTask.CompletedTask;
122
110
}
123
111
124
-
publicvoidStop()
125
-
{
126
-
_listener.Close();
127
-
}
112
+
// Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide
113
+
// async start/stop/dispose variants.
114
+
_listener=RabbitMQListener.CreateAsync(
115
+
ReceiveAsync,
116
+
RabbitMQHost,
117
+
RabbitMQVHost,
118
+
RabbitMQPort,
119
+
RabbitMQUser,
120
+
RabbitMQPassword,
121
+
RabbitMQQueue,
122
+
IsSsl,
123
+
IsQueueDurable,
124
+
IsQueueAutoDelete,
125
+
IsQueueExclusive,
126
+
IsReceiveAutoAck,
127
+
Dlx).Result;
128
+
}
128
129
129
-
publicvoidDispose()
130
-
{
131
-
_listener?.Dispose();
132
-
}
130
+
publicvoidStop()
131
+
{
132
+
// Not a deadlock risk on .NET 8, but ideally we'll introduce `IPublishJsonAsync` and provide
0 commit comments