@@ -15,17 +15,26 @@ namespace Holon.Protocol
1515 internal class ObservableConsumer : IBasicConsumer , IObservable < InboundMessage >
1616 {
1717 #region Fields
18- private IModel _channel ;
18+ private Broker _broker ;
1919 private int _cancelled ;
2020 private List < SubscribedObserver > _subscriptions = new List < SubscribedObserver > ( 1 ) ;
2121 #endregion
2222
23+ /// <summary>
24+ /// Gets the underlying broker.
25+ /// </summary>
26+ public Broker Broker {
27+ get {
28+ return _broker ;
29+ }
30+ }
31+
2332 /// <summary>
2433 /// Gets the underlying channel.
2534 /// </summary>
2635 public IModel Model {
2736 get {
28- return _channel ;
37+ return _broker . Channel ;
2938 }
3039 }
3140
@@ -87,7 +96,7 @@ public void HandleBasicConsumeOk(string consumerTag) {
8796 public void HandleBasicDeliver ( string consumerTag , ulong deliveryTag , bool redelivered , string exchange , string routingKey , IBasicProperties properties , byte [ ] body ) {
8897 lock ( _subscriptions ) {
8998 foreach ( SubscribedObserver observer in _subscriptions )
90- observer . Observer . OnNext ( new InboundMessage ( _channel , deliveryTag , redelivered , exchange , routingKey , properties , body ) ) ;
99+ observer . Observer . OnNext ( new InboundMessage ( Model , deliveryTag , redelivered , exchange , routingKey , properties , body ) ) ;
91100 }
92101 }
93102
@@ -170,9 +179,9 @@ struct SubscribedObserver
170179 /// <summary>
171180 /// Creates a new observable consumer.
172181 /// </summary>
173- /// <param name="channel ">The channel .</param>
174- public ObservableConsumer ( IModel channel ) {
175- _channel = channel ;
182+ /// <param name="broker ">The broker .</param>
183+ public ObservableConsumer ( Broker broker ) {
184+ _broker = broker ;
176185 }
177186 #endregion
178187 }
0 commit comments