40
40
41
41
using System ;
42
42
using System . Collections ;
43
+ using System . Collections . Concurrent ;
43
44
using System . IO ;
44
45
45
- #if NETFX_CORE || NET4 // For Windows 8 Store, but could be .NET 4.0 and greater
46
+ using System . Threading ;
46
47
using System . Threading . Tasks ;
47
- #endif
48
48
49
49
using RabbitMQ . Client . Events ;
50
50
using RabbitMQ . Client . Exceptions ;
@@ -59,7 +59,7 @@ namespace RabbitMQ.Client.MessagePatterns
59
59
///</para>
60
60
///<para>
61
61
/// Once created, the Subscription consumes from a queue (using a
62
- /// QueueingBasicConsumer ). Received deliveries can be retrieved
62
+ /// EventingBasicConsumer ). Received deliveries can be retrieved
63
63
/// by calling Next(), or by using the Subscription as an
64
64
/// IEnumerator in, for example, a foreach loop.
65
65
///</para>
@@ -76,8 +76,16 @@ namespace RabbitMQ.Client.MessagePatterns
76
76
public class Subscription : ISubscription
77
77
{
78
78
protected readonly object m_eventLock = new object ( ) ;
79
- protected volatile QueueingBasicConsumer m_consumer ;
79
+ protected volatile EventingBasicConsumer m_consumer ;
80
+ private BlockingCollection < BasicDeliverEventArgs > m_queue =
81
+ new BlockingCollection < BasicDeliverEventArgs > ( new ConcurrentQueue < BasicDeliverEventArgs > ( ) ) ;
82
+
83
+ private CancellationTokenSource m_queueCts = new CancellationTokenSource ( ) ;
80
84
85
+ #if NETFX_CORE || NET4
86
+ private ConcurrentQueue < TaskCompletionSource < BasicDeliverEventArgs > > m_waiting =
87
+ new ConcurrentQueue < TaskCompletionSource < BasicDeliverEventArgs > > ( ) ;
88
+ #endif
81
89
///<summary>Creates a new Subscription in "noAck" mode,
82
90
///consuming from a named queue.</summary>
83
91
public Subscription ( IModel model , string queueName )
@@ -92,7 +100,12 @@ public Subscription(IModel model, string queueName, bool noAck)
92
100
Model = model ;
93
101
QueueName = queueName ;
94
102
NoAck = noAck ;
95
- m_consumer = new QueueingBasicConsumer ( Model ) ;
103
+ m_consumer = new EventingBasicConsumer ( Model ) ;
104
+ #if NETFX_CORE || NET4
105
+ m_consumer . Received += ( sender , args ) => QueueAdd ( args ) ;
106
+ #else
107
+ m_consumer . Received += ( sender , args ) => m_queue . Add ( args ) ;
108
+ #endif
96
109
ConsumerTag = Model . BasicConsume ( QueueName , NoAck , m_consumer ) ;
97
110
m_consumer . ConsumerCancelled += HandleConsumerCancelled ;
98
111
LatestEvent = null ;
@@ -105,8 +118,9 @@ public Subscription(IModel model, string queueName, bool noAck, string consumerT
105
118
Model = model ;
106
119
QueueName = queueName ;
107
120
NoAck = noAck ;
108
- m_consumer = new QueueingBasicConsumer ( Model ) ;
121
+ m_consumer = new EventingBasicConsumer ( Model ) ;
109
122
m_consumer . ConsumerCancelled += HandleConsumerCancelled ;
123
+ m_consumer . Received += ( sender , args ) => m_queue . Add ( args ) ;
110
124
ConsumerTag = Model . BasicConsume ( QueueName , NoAck , consumerTag , m_consumer ) ;
111
125
LatestEvent = null ;
112
126
}
@@ -228,6 +242,17 @@ public void Close()
228
242
229
243
ConsumerTag = null ;
230
244
}
245
+
246
+ m_queueCts . Cancel ( true ) ;
247
+ m_queue . Dispose ( ) ;
248
+ m_queue = null ;
249
+ #if NETFX_CORE || NET4
250
+ var exn = new EndOfStreamException ( "Subscription closed" ) ;
251
+ foreach ( var tsc in m_waiting )
252
+ {
253
+ tsc . TrySetException ( exn ) ;
254
+ }
255
+ #endif
231
256
}
232
257
catch ( OperationInterruptedException )
233
258
{
@@ -301,7 +326,7 @@ public BasicDeliverEventArgs Next()
301
326
// Alias the pointer as otherwise it may change out
302
327
// from under us by the operation of Close() from
303
328
// another thread.
304
- QueueingBasicConsumer consumer = m_consumer ;
329
+ EventingBasicConsumer consumer = m_consumer ;
305
330
try
306
331
{
307
332
if ( consumer == null || Model . IsClosed )
@@ -310,7 +335,7 @@ public BasicDeliverEventArgs Next()
310
335
}
311
336
else
312
337
{
313
- BasicDeliverEventArgs bdea = consumer . Queue . Dequeue ( ) ;
338
+ BasicDeliverEventArgs bdea = m_queue . Take ( m_queueCts . Token ) ;
314
339
MutateLatestEvent ( bdea ) ;
315
340
}
316
341
}
@@ -322,31 +347,49 @@ public BasicDeliverEventArgs Next()
322
347
}
323
348
324
349
#if NETFX_CORE || NET4
325
- public async Task < BasicDeliverEventArgs > NextAsync ( ) {
326
- try {
350
+ public Task < BasicDeliverEventArgs > NextAsync ( )
351
+ {
352
+ try
353
+ {
327
354
// Alias the pointer as otherwise it may change out
328
355
// from under us by the operation of Close() from
329
356
// another thread.
330
- QueueingBasicConsumer consumer = m_consumer ;
331
- if ( consumer == null ) {
357
+ var queue = m_queue ;
358
+ if ( queue == null || Model . IsClosed )
359
+ {
332
360
// Closed!
333
361
MutateLatestEvent ( null ) ;
334
362
}
335
- else {
336
- MutateLatestEvent ( await consumer . Queue . DequeueAsync ( ) ) ;
363
+ else
364
+ {
365
+ BasicDeliverEventArgs evt = null ;
366
+ if ( queue . TryTake ( out evt ) )
367
+ {
368
+ MutateLatestEvent ( evt ) ;
369
+ }
370
+ else
371
+ {
372
+ var tcs = new TaskCompletionSource < BasicDeliverEventArgs > ( ) ;
373
+ m_waiting . Enqueue ( tcs ) ;
374
+ return tcs . Task ;
375
+ }
337
376
}
338
377
}
339
- catch ( AggregateException ex ) {
378
+ catch ( AggregateException ex )
379
+ {
340
380
// since tasks wrap exceptions as AggregateException,
341
381
// reach in and check if the EndOfStream exception is what happened
342
- if ( ex . InnerException is EndOfStreamException ) {
382
+ if ( ex . InnerException is EndOfStreamException )
383
+ {
343
384
MutateLatestEvent ( null ) ;
344
385
}
345
386
}
346
- catch ( EndOfStreamException ) {
387
+ catch ( EndOfStreamException )
388
+ {
347
389
MutateLatestEvent ( null ) ;
348
390
}
349
- return LatestEvent ;
391
+
392
+ return Task . FromResult ( LatestEvent ) ;
350
393
}
351
394
#endif
352
395
@@ -401,7 +444,7 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
401
444
// Alias the pointer as otherwise it may change out
402
445
// from under us by the operation of Close() from
403
446
// another thread.
404
- QueueingBasicConsumer consumer = m_consumer ;
447
+ var consumer = m_consumer ;
405
448
if ( consumer == null || Model . IsClosed )
406
449
{
407
450
MutateLatestEvent ( null ) ;
@@ -411,7 +454,7 @@ public bool Next(int millisecondsTimeout, out BasicDeliverEventArgs result)
411
454
else
412
455
{
413
456
BasicDeliverEventArgs qValue ;
414
- if ( ! consumer . Queue . Dequeue ( millisecondsTimeout , out qValue ) )
457
+ if ( ! m_queue . TryTake ( out qValue , millisecondsTimeout ) )
415
458
{
416
459
result = null ;
417
460
return false ;
@@ -483,5 +526,22 @@ private void HandleConsumerCancelled(object sender, ConsumerEventArgs e)
483
526
MutateLatestEvent ( null ) ;
484
527
}
485
528
}
529
+
530
+ #if NETFX_CORE || NET4
531
+ private void QueueAdd ( BasicDeliverEventArgs args )
532
+ {
533
+ //NB: as long as there are async awaiters sync callers will never be served
534
+ //this is not ideal but consistent with how SharedQueue behaves
535
+ TaskCompletionSource < BasicDeliverEventArgs > tsc ;
536
+ if ( m_waiting . TryDequeue ( out tsc ) && tsc . TrySetResult ( args ) )
537
+ {
538
+ return ;
539
+ }
540
+ else
541
+ {
542
+ m_queue . Add ( args ) ;
543
+ }
544
+ }
545
+ #endif
486
546
}
487
547
}
0 commit comments