1
+ /* Copyright 2010-2015 MongoDB Inc.
2
+ *
3
+ * Licensed under the Apache License, Version 2.0 (the "License");
4
+ * you may not use this file except in compliance with the License.
5
+ * You may obtain a copy of the License at
6
+ *
7
+ * http://www.apache.org/licenses/LICENSE-2.0
8
+ *
9
+ * Unless required by applicable law or agreed to in writing, software
10
+ * distributed under the License is distributed on an "AS IS" BASIS,
11
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ * See the License for the specific language governing permissions and
13
+ * limitations under the License.
14
+ */
15
+
16
+ using System ;
17
+ using System . Collections . Generic ;
18
+ using System . IO ;
19
+ using System . Linq ;
20
+ using System . Threading ;
21
+ using System . Threading . Tasks ;
22
+ using MongoDB . Bson ;
23
+ using MongoDB . Bson . Serialization . Serializers ;
24
+ using MongoDB . Driver . Core . Async ;
25
+ using MongoDB . Driver . Core . Bindings ;
26
+ using MongoDB . Driver . Core . Clusters ;
27
+ using MongoDB . Driver . Core . Configuration ;
28
+ using MongoDB . Driver . Core . Events . Diagnostics ;
29
+ using MongoDB . Driver . Core . Misc ;
30
+ using MongoDB . Driver . Core . Operations ;
31
+ using MongoDB . Driver . Core . Operations . ElementNameValidators ;
32
+ using MongoDB . Driver . Core . WireProtocol . Messages . Encoders ;
33
+
34
+ namespace MongoDB . Driver . TestConsoleApplication
35
+ {
36
+ public class CoreApiSync
37
+ {
38
+ private CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource ( ) ;
39
+ private CollectionNamespace _collection = new CollectionNamespace ( "foo" , "bar" ) ;
40
+ private MessageEncoderSettings _messageEncoderSettings = new MessageEncoderSettings ( ) ;
41
+
42
+ public void Run ( int numConcurrentWorkers , Action < ClusterBuilder > configurator )
43
+ {
44
+ try
45
+ {
46
+ var clusterBuilder = new ClusterBuilder ( ) ;
47
+ configurator ( clusterBuilder ) ;
48
+
49
+ using ( var cluster = clusterBuilder . BuildCluster ( ) )
50
+ {
51
+ Run ( numConcurrentWorkers , cluster ) ;
52
+ }
53
+ }
54
+ catch ( Exception ex )
55
+ {
56
+ Console . WriteLine ( "Unhandled exception:" ) ;
57
+ Console . WriteLine ( ex . ToString ( ) ) ;
58
+ }
59
+
60
+ Console . WriteLine ( "Press Enter to exit" ) ;
61
+ Console . ReadLine ( ) ;
62
+ }
63
+
64
+ private void Run ( int numConcurrentWorkers , ICluster cluster )
65
+ {
66
+ Console . WriteLine ( "Press Enter to begin" ) ;
67
+ Console . ReadLine ( ) ;
68
+
69
+ Console . WriteLine ( "Clearing Data" ) ;
70
+ ClearData ( cluster ) ;
71
+ Console . WriteLine ( "Inserting Seed Data" ) ;
72
+ InsertData ( cluster ) ;
73
+
74
+ Console . WriteLine ( "Running CRUD (errors will show up as + (query error) or * (insert/update error))" ) ;
75
+ List < Task > tasks = new List < Task > ( ) ;
76
+ for ( int i = 0 ; i < numConcurrentWorkers ; i ++ )
77
+ {
78
+ tasks . Add ( Task . Run ( ( ) => DoWork ( cluster ) ) ) ;
79
+ }
80
+
81
+ Console . WriteLine ( "Press Enter to shutdown" ) ;
82
+ Console . ReadLine ( ) ;
83
+
84
+ _cancellationTokenSource . Cancel ( ) ;
85
+ Task . WaitAll ( tasks . ToArray ( ) ) ;
86
+ }
87
+
88
+ private void ClearData ( ICluster cluster )
89
+ {
90
+ using ( var binding = new WritableServerBinding ( cluster ) )
91
+ {
92
+ var commandOp = new DropDatabaseOperation ( _collection . DatabaseNamespace , _messageEncoderSettings ) ;
93
+ commandOp . Execute ( binding , CancellationToken . None ) ;
94
+ }
95
+ }
96
+
97
+ private void InsertData ( ICluster cluster )
98
+ {
99
+ using ( var binding = new WritableServerBinding ( cluster ) )
100
+ {
101
+ for ( int i = 0 ; i < 100 ; i ++ )
102
+ {
103
+ Insert ( binding , new BsonDocument ( "i" , i ) ) ;
104
+ }
105
+ }
106
+ }
107
+
108
+ private void DoWork ( ICluster cluster )
109
+ {
110
+ var rand = new Random ( ) ;
111
+ using ( var binding = new WritableServerBinding ( cluster ) )
112
+ {
113
+ while ( ! _cancellationTokenSource . IsCancellationRequested )
114
+ {
115
+ var i = rand . Next ( 0 , 10000 ) ;
116
+ IReadOnlyList < BsonDocument > docs ;
117
+ using ( var cursor = Query ( binding , new BsonDocument ( "i" , i ) ) )
118
+ {
119
+ try
120
+ {
121
+ if ( cursor . MoveNext ( _cancellationTokenSource . Token ) )
122
+ {
123
+ docs = cursor . Current . ToList ( ) ;
124
+ }
125
+ else
126
+ {
127
+ docs = null ;
128
+ }
129
+ //Console.Write(".");
130
+ }
131
+ catch
132
+ {
133
+ Console . Write ( "+" ) ;
134
+ continue ;
135
+ }
136
+ }
137
+
138
+
139
+ if ( docs == null || docs . Count == 0 )
140
+ {
141
+ try
142
+ {
143
+ Insert ( binding , new BsonDocument ( ) . Add ( "i" , i ) ) ;
144
+ //Console.Write(".");
145
+ }
146
+ catch ( Exception )
147
+ {
148
+ Console . Write ( "*" ) ;
149
+ }
150
+ }
151
+ else
152
+ {
153
+ try
154
+ {
155
+ var filter = new BsonDocument ( "_id" , docs [ 0 ] [ "_id" ] ) ;
156
+ var update = new BsonDocument ( "$set" , new BsonDocument ( "i" , i + 1 ) ) ;
157
+ Update ( binding , filter , update ) ;
158
+ //Console.Write(".");
159
+ }
160
+ catch ( Exception )
161
+ {
162
+ Console . Write ( "*" ) ;
163
+ }
164
+ }
165
+ }
166
+ }
167
+ }
168
+
169
+ private void Insert ( IWriteBinding binding , BsonDocument document )
170
+ {
171
+ var documentSource = new BatchableSource < BsonDocument > ( new [ ] { document } ) ;
172
+ var insertOp = new InsertOpcodeOperation < BsonDocument > ( _collection , documentSource , BsonDocumentSerializer . Instance , _messageEncoderSettings ) ;
173
+
174
+ using ( var timeout = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) )
175
+ using ( var linked = CancellationTokenSource . CreateLinkedTokenSource ( timeout . Token , _cancellationTokenSource . Token ) )
176
+ {
177
+ insertOp . Execute ( binding , linked . Token ) ;
178
+ }
179
+ }
180
+
181
+ private IAsyncCursor < BsonDocument > Query ( IReadBinding binding , BsonDocument filter )
182
+ {
183
+ var findOp = new FindOperation < BsonDocument > ( _collection , BsonDocumentSerializer . Instance , _messageEncoderSettings )
184
+ {
185
+ Filter = filter ,
186
+ Limit = - 1
187
+ } ;
188
+
189
+ using ( var timeout = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) )
190
+ using ( var linked = CancellationTokenSource . CreateLinkedTokenSource ( timeout . Token , _cancellationTokenSource . Token ) )
191
+ {
192
+ return findOp . Execute ( binding , linked . Token ) ;
193
+ }
194
+ }
195
+
196
+ private void Update ( IWriteBinding binding , BsonDocument filter , BsonDocument update )
197
+ {
198
+ var updateOp = new UpdateOpcodeOperation (
199
+ _collection ,
200
+ new UpdateRequest ( UpdateType . Update , filter , update ) ,
201
+ _messageEncoderSettings ) ;
202
+
203
+ using ( var timeout = new CancellationTokenSource ( TimeSpan . FromSeconds ( 30 ) ) )
204
+ using ( var linked = CancellationTokenSource . CreateLinkedTokenSource ( timeout . Token , _cancellationTokenSource . Token ) )
205
+ {
206
+ updateOp . Execute ( binding , linked . Token ) ;
207
+ }
208
+ }
209
+ }
210
+ }
0 commit comments