1- using System ;
2- using System . Reactive ;
3- using System . Reactive . Linq ;
4- using System . Reactive . Subjects ;
5- using System . Reactive . Threading . Tasks ;
6- using System . Threading . Tasks ;
7- using WampSharp . Auxiliary . Client ;
8- using WampSharp . Core . Client ;
9- using WampSharp . Core . Contracts . V1 ;
10- using WampSharp . Core . Listener ;
11- using WampSharp . PubSub . Client ;
12- using WampSharp . Rpc . Client ;
13-
14- namespace WampSharp
15- {
16- public class WampChannel < TMessage > : IWampChannel < TMessage >
17- {
18- private readonly IControlledWampConnection < TMessage > mConnection ;
19- private readonly IWampRpcClientFactory < TMessage > mRpcClientFactory ;
20- private readonly IWampPubSubClientFactory < TMessage > mPubSubClientFactory ;
21- private readonly WampServerProxyBuilder < TMessage , IWampClient < TMessage > , IWampServer > mServerProxyBuilder ;
22- private readonly IWampClientConnectionMonitor mConnectionMonitor ;
23-
24- public WampChannel ( IControlledWampConnection < TMessage > connection ,
25- IWampRpcClientFactory < TMessage > rpcClientFactory ,
26- IWampPubSubClientFactory < TMessage > pubSubClientFactory ,
27- WampServerProxyBuilder < TMessage , IWampClient < TMessage > , IWampServer > serverProxyBuilder ,
28- IWampAuxiliaryClientFactory < TMessage > connectionMonitorFactory )
29- {
30- mConnection = connection ;
31- mRpcClientFactory = rpcClientFactory ;
32- mPubSubClientFactory = pubSubClientFactory ;
33- mServerProxyBuilder = serverProxyBuilder ;
34- mConnectionMonitor = connectionMonitorFactory . CreateMonitor ( connection ) ;
35- }
36-
37- public IWampServer GetServerProxy ( IWampClient < TMessage > callbackClient )
38- {
39- return mServerProxyBuilder . Create ( callbackClient , mConnection ) ;
40- }
41-
42- public TProxy GetRpcProxy < TProxy > ( ) where TProxy : class
43- {
44- return mRpcClientFactory . GetClient < TProxy > ( mConnection ) ;
45- }
46-
47- public dynamic GetDynamicRpcProxy ( )
48- {
49- return mRpcClientFactory . GetDynamicClient ( mConnection ) ;
50- }
51-
52- public ISubject < T > GetSubject < T > ( string topicUri )
53- {
54- return mPubSubClientFactory . GetSubject < T > ( topicUri , mConnection ) ;
55- }
56-
57- public void MapPrefix ( string prefix , string uri )
58- {
59- mConnectionMonitor . MapPrefix ( prefix , uri ) ;
60- }
61-
62- public IWampClientConnectionMonitor GetMonitor ( )
63- {
64- return mConnectionMonitor ;
65- }
66-
67- public void Open ( )
68- {
69- Task task = OpenAsync ( ) ;
70- task . Wait ( ) ;
71- }
72-
73- public Task OpenAsync ( )
74- {
75- var connectedObservable =
76- Observable . FromEventPattern < WampConnectionEstablishedEventArgs >
77- ( x => mConnectionMonitor . ConnectionEstablished += x ,
78- x => mConnectionMonitor . ConnectionEstablished -= x )
79- . Select ( x => Unit . Default ) ;
80-
81- var errorObservable =
1+ using System ;
2+ using System . Reactive ;
3+ using System . Reactive . Linq ;
4+ using System . Reactive . Subjects ;
5+ using System . Reactive . Threading . Tasks ;
6+ using System . Threading . Tasks ;
7+ using WampSharp . Auxiliary . Client ;
8+ using WampSharp . Core . Client ;
9+ using WampSharp . Core . Contracts . V1 ;
10+ using WampSharp . Core . Listener ;
11+ using WampSharp . PubSub . Client ;
12+ using WampSharp . Rpc . Client ;
13+
14+ namespace WampSharp
15+ {
16+ public class WampChannel < TMessage > : IWampChannel < TMessage >
17+ {
18+ private readonly IControlledWampConnection < TMessage > mConnection ;
19+ private readonly IWampRpcClientFactory < TMessage > mRpcClientFactory ;
20+ private readonly IWampPubSubClientFactory < TMessage > mPubSubClientFactory ;
21+ private readonly WampServerProxyBuilder < TMessage , IWampClient < TMessage > , IWampServer > mServerProxyBuilder ;
22+ private readonly IWampClientConnectionMonitor mConnectionMonitor ;
23+
24+ public WampChannel ( IControlledWampConnection < TMessage > connection ,
25+ IWampRpcClientFactory < TMessage > rpcClientFactory ,
26+ IWampPubSubClientFactory < TMessage > pubSubClientFactory ,
27+ WampServerProxyBuilder < TMessage , IWampClient < TMessage > , IWampServer > serverProxyBuilder ,
28+ IWampAuxiliaryClientFactory < TMessage > connectionMonitorFactory )
29+ {
30+ mConnection = connection ;
31+ mRpcClientFactory = rpcClientFactory ;
32+ mPubSubClientFactory = pubSubClientFactory ;
33+ mServerProxyBuilder = serverProxyBuilder ;
34+ mConnectionMonitor = connectionMonitorFactory . CreateMonitor ( connection ) ;
35+ }
36+
37+ public IWampServer GetServerProxy ( IWampClient < TMessage > callbackClient )
38+ {
39+ return mServerProxyBuilder . Create ( callbackClient , mConnection ) ;
40+ }
41+
42+ public TProxy GetRpcProxy < TProxy > ( ) where TProxy : class
43+ {
44+ return mRpcClientFactory . GetClient < TProxy > ( mConnection ) ;
45+ }
46+
47+ public dynamic GetDynamicRpcProxy ( )
48+ {
49+ return mRpcClientFactory . GetDynamicClient ( mConnection ) ;
50+ }
51+
52+ public ISubject < T > GetSubject < T > ( string topicUri )
53+ {
54+ return mPubSubClientFactory . GetSubject < T > ( topicUri , mConnection ) ;
55+ }
56+
57+ public void MapPrefix ( string prefix , string uri )
58+ {
59+ mConnectionMonitor . MapPrefix ( prefix , uri ) ;
60+ }
61+
62+ public IWampClientConnectionMonitor GetMonitor ( )
63+ {
64+ return mConnectionMonitor ;
65+ }
66+
67+ public void Open ( )
68+ {
69+ Task task = OpenAsync ( ) ;
70+
71+ try
72+ {
73+ task . Wait ( ) ;
74+ }
75+ catch ( AggregateException ex )
76+ {
77+ throw ex . InnerException ;
78+ }
79+ }
80+
81+ public Task OpenAsync ( )
82+ {
83+ var connectedObservable =
84+ Observable . FromEventPattern < WampConnectionEstablishedEventArgs >
85+ ( x => mConnectionMonitor . ConnectionEstablished += x ,
86+ x => mConnectionMonitor . ConnectionEstablished -= x )
87+ . Select ( x => Unit . Default ) ;
88+
89+ var errorObservable =
8290 Observable . FromEventPattern < WampConnectionErrorEventArgs >
8391 ( x => mConnectionMonitor . ConnectionError += x ,
8492 x => mConnectionMonitor . ConnectionError -= x )
85- . Select ( x => Unit . Default ) ;
93+ . Select ( x => Observable . Throw < Unit > ( x . EventArgs . Exception ) )
94+ . SelectMany ( x => x ) ;
8695
8796 var completedObservable =
8897 Observable . FromEventPattern
8998 ( x => mConnectionMonitor . ConnectionLost += x ,
9099 x => mConnectionMonitor . ConnectionLost -= x )
91- . Select ( x => Unit . Default ) ;
100+ . Select ( x => Unit . Default ) ;
92101
93102 // Combining the observables and propagating the one that reatcs first
94103 // because we have to complete the task either when a connection is established or
95104 // an error (i.e. exception) occurs.
96- IObservable < Unit > combined = connectedObservable . Amb ( errorObservable ) . Amb ( completedObservable ) ;
105+ var combined = connectedObservable . Amb ( errorObservable ) . Amb ( completedObservable ) ;
97106
98- IObservable < Unit > firstConnectionOrError =
107+ var firstConnectionOrError =
99108 combined . Take ( 1 ) ;
100109
101- Task task = firstConnectionOrError . ToTask ( ) ;
102-
103- mConnection . Connect ( ) ;
104-
105- return task ;
106- }
107-
108- public void Close ( )
109- {
110- mConnection . OnCompleted ( ) ;
111- }
112- }
110+ Task task = firstConnectionOrError . ToTask ( ) ;
111+
112+ mConnection . Connect ( ) ;
113+
114+ return task ;
115+ }
116+
117+ public void Close ( )
118+ {
119+ mConnection . OnCompleted ( ) ;
120+ }
121+ }
113122}
0 commit comments