@@ -12,11 +12,12 @@ defmodule Mongo.MongoDBConnection do
12
12
alias Mongo.Events.CommandStartedEvent
13
13
alias Mongo.MongoDBConnection.Utils
14
14
alias Mongo.Events.MoreToComeEvent
15
+ alias Mongo.StableVersion
15
16
16
17
@ timeout 5_000
17
18
@ find_one_flags ~w( slave_ok exhaust partial) a
18
19
@ write_concern ~w( w j wtimeout) a
19
- @ insecure_cmds [ :authenticate , :saslStart , :saslContinue , :getnonce , :createUser , :updateUser , :copydbgetnonce , :copydbsaslstart , :copydb , :isMaster , :ismaster ]
20
+ @ insecure_cmds [ :authenticate , :saslStart , :saslContinue , :getnonce , :createUser , :updateUser , :copydbgetnonce , :copydbsaslstart , :copydb , :isMaster , :ismaster , :hello ]
20
21
21
22
@ impl true
22
23
def connect ( opts ) do
@@ -34,12 +35,74 @@ defmodule Mongo.MongoDBConnection do
34
35
auth_mechanism: opts [ :auth_mechanism ] || nil ,
35
36
connection_type: Keyword . fetch! ( opts , :connection_type ) ,
36
37
topology_pid: Keyword . fetch! ( opts , :topology_pid ) ,
38
+ stable_api: Keyword . get ( opts , :stable_api ) ,
39
+ use_op_msg: Keyword . get ( opts , :stable_api ) != nil ,
40
+ hello_ok: Keyword . get ( opts , :stable_api ) != nil ,
37
41
ssl: opts [ :ssl ] || opts [ :tls ] || false
38
42
}
39
43
40
44
connect ( opts , state )
41
45
end
42
46
47
+ @ impl true
48
+ def disconnect ( _error , % { connection: { mod , socket } , connection_type: type , topology_pid: pid , host: host } ) do
49
+ GenServer . cast ( pid , { :disconnect , type , host } )
50
+ mod . close ( socket )
51
+ :ok
52
+ end
53
+
54
+ @ impl true
55
+ def checkout ( state ) , do: { :ok , state }
56
+ @ impl true
57
+ def handle_begin ( _opts , state ) , do: { :ok , nil , state }
58
+ @ impl true
59
+ def handle_close ( _query , _opts , state ) , do: { :ok , nil , state }
60
+ @ impl true
61
+ def handle_commit ( _opts , state ) , do: { :ok , nil , state }
62
+ @ impl true
63
+ def handle_deallocate ( _query , _cursor , _opts , state ) , do: { :ok , nil , state }
64
+ @ impl true
65
+ def handle_declare ( query , _params , _opts , state ) , do: { :ok , query , nil , state }
66
+ @ impl true
67
+ def handle_fetch ( _query , _cursor , _opts , state ) , do: { :halt , nil , state }
68
+ @ impl true
69
+ def handle_prepare ( query , _opts , state ) , do: { :ok , query , state }
70
+ @ impl true
71
+ def handle_rollback ( _opts , state ) , do: { :ok , nil , state }
72
+ @ impl true
73
+ def handle_status ( _opts , state ) , do: { :idle , state }
74
+
75
+ @ impl true
76
+ def ping ( % { connection_type: :client } = state ) do
77
+ cmd = [ ping: 1 ]
78
+
79
+ case Utils . command ( - 1 , cmd , state ) do
80
+ { :ok , _flags , % { "ok" => ok } } when ok == 1 ->
81
+ { :ok , state }
82
+
83
+ { :ok , _flags , % { "ok" => ok , "errmsg" => msg , "code" => code } } when ok == 0 ->
84
+ err = Mongo.Error . exception ( message: msg , code: code )
85
+ { :disconnect , err , state }
86
+
87
+ { :disconnect , _ , _ } = error ->
88
+ error
89
+ end
90
+ end
91
+
92
+ @ impl true
93
+ def ping ( state ) do
94
+ { :ok , state }
95
+ end
96
+
97
+ @ impl true
98
+ def handle_execute ( % Mongo.Query { action: action } = query , params , opts , original_state ) do
99
+ tmp_state = % { original_state | database: Keyword . get ( opts , :database , original_state . database ) }
100
+
101
+ with { :ok , reply , tmp_state } <- execute_action ( action , params , opts , tmp_state ) do
102
+ { :ok , query , reply , Map . put ( tmp_state , :database , original_state . database ) }
103
+ end
104
+ end
105
+
43
106
defp connect ( opts , state ) do
44
107
result =
45
108
with { :ok , state } <- tcp_connect ( opts , state ) ,
@@ -123,12 +186,12 @@ defmodule Mongo.MongoDBConnection do
123
186
end
124
187
end
125
188
126
- defp post_hello_command ( state , client ) do
127
- cmd = [ ismaster: 1 , client: client ]
189
+ defp hand_shake ( opts , state ) do
190
+ cmd = handshake_command ( state , client ( opts [ :appname ] || "elixir-driver" ) )
128
191
129
192
case Utils . command ( - 1 , cmd , state ) do
130
- { :ok , _flags , % { "ok" => ok , "maxWireVersion" => version } } when ok == 1 ->
131
- { :ok , % { state | wire_version: version } }
193
+ { :ok , _flags , % { "ok" => ok , "maxWireVersion" => version } = response } when ok == 1 ->
194
+ { :ok , % { state | wire_version: version , use_op_msg: version >= 6 , hello_ok: Map . get ( response , "helloOk" , false ) } }
132
195
133
196
{ :ok , _flags , % { "ok" => ok } } when ok == 1 ->
134
197
{ :ok , % { state | wire_version: 0 } }
@@ -142,11 +205,7 @@ defmodule Mongo.MongoDBConnection do
142
205
end
143
206
end
144
207
145
- defp hand_shake ( opts , state ) do
146
- post_hello_command ( state , driver ( opts [ :appname ] || "My killer app" ) )
147
- end
148
-
149
- defp driver ( appname ) do
208
+ defp client ( app_name ) do
150
209
driver_version =
151
210
case :application . get_key ( :mongodb_driver , :vsn ) do
152
211
{ :ok , version } -> to_string ( version )
@@ -167,7 +226,7 @@ defmodule Mongo.MongoDBConnection do
167
226
168
227
% {
169
228
client: % {
170
- application: % { name: appname }
229
+ application: % { name: app_name }
171
230
} ,
172
231
driver: % {
173
232
name: "mongodb_driver" ,
@@ -195,76 +254,50 @@ defmodule Mongo.MongoDBConnection do
195
254
defp pretty_name ( "apple" ) , do: "Mac OS X"
196
255
defp pretty_name ( name ) , do: name
197
256
198
- @ impl true
199
- def disconnect ( _error , % { connection: { mod , socket } , connection_type: type , topology_pid: pid , host: host } ) do
200
- GenServer . cast ( pid , { :disconnect , type , host } )
201
- mod . close ( socket )
202
- :ok
257
+ defp provide_cmd_data ( [ { command_name , _ } | _ ] = cmd ) do
258
+ case Enum . member? ( @ insecure_cmds , command_name ) do
259
+ true -> { command_name , % { } }
260
+ false -> { command_name , cmd }
261
+ end
203
262
end
204
263
205
- @ impl true
206
- def checkout ( state ) , do: { :ok , state }
207
- @ impl true
208
- def handle_begin ( _opts , state ) , do: { :ok , nil , state }
209
- @ impl true
210
- def handle_close ( _query , _opts , state ) , do: { :ok , nil , state }
211
- @ impl true
212
- def handle_commit ( _opts , state ) , do: { :ok , nil , state }
213
- @ impl true
214
- def handle_deallocate ( _query , _cursor , _opts , state ) , do: { :ok , nil , state }
215
- @ impl true
216
- def handle_declare ( query , _params , _opts , state ) , do: { :ok , query , nil , state }
217
- @ impl true
218
- def handle_fetch ( _query , _cursor , _opts , state ) , do: { :halt , nil , state }
219
- @ impl true
220
- def handle_prepare ( query , _opts , state ) , do: { :ok , query , state }
221
- @ impl true
222
- def handle_rollback ( _opts , state ) , do: { :ok , nil , state }
223
- @ impl true
224
- def handle_status ( _opts , state ) , do: { :idle , state }
225
-
226
- @ impl true
227
- def ping ( % { connection_type: :client } = state ) do
228
- cmd = [ ping: 1 ]
229
-
230
- case Utils . command ( - 1 , cmd , state ) do
231
- { :ok , _flags , % { "ok" => ok } } when ok == 1 ->
232
- { :ok , state }
264
+ ##
265
+ # Executes a hello or the legacy hello command
266
+ ##
267
+ defp execute_action ( { :exec_hello , cmd } , _params , opts , % { use_op_msg: true } = state ) do
268
+ db = opts [ :database ] || state . database
269
+ timeout = Keyword . get ( opts , :timeout , state . timeout )
270
+ flags = Keyword . get ( opts , :flags , 0x0 )
233
271
234
- { :ok , _flags , % { "ok" => ok , "errmsg" => msg , "code" => code } } when ok == 0 ->
235
- err = Mongo.Error . exception ( message: msg , code: code )
236
- { :disconnect , err , state }
272
+ cmd = hello_command ( cmd , state ) ++ [ "$db": db ]
237
273
238
- { :disconnect , _ , _ } = error ->
239
- error
240
- end
241
- end
274
+ event = % CommandStartedEvent {
275
+ command: :hello ,
276
+ command_name: :hello ,
277
+ database_name: db ,
278
+ request_id: state . request_id ,
279
+ operation_id: opts [ :operation_id ] ,
280
+ connection_id: self ( )
281
+ }
242
282
243
- @ impl true
244
- def ping ( state ) do
245
- { :ok , state }
246
- end
283
+ Events . notify ( event , :commands )
247
284
248
- @ impl true
249
- def handle_execute ( % Mongo.Query { action: action } = query , params , opts , original_state ) do
250
- tmp_state = % { original_state | database: Keyword . get ( opts , :database , original_state . database ) }
285
+ op = op_msg ( flags: flags , sections: [ section ( payload_type: 0 , payload: payload ( doc: cmd ) ) ] )
251
286
252
- with { :ok , reply , tmp_state } <- execute_action ( action , params , opts , tmp_state ) do
253
- { :ok , query , reply , Map . put ( tmp_state , :database , original_state . database ) }
254
- end
255
- end
287
+ case :timer . tc ( fn -> Utils . post_request ( op , state . request_id , % { state | timeout: timeout } ) end ) do
288
+ { duration , { :ok , flags , doc } } ->
289
+ state = % { state | request_id: state . request_id + 1 }
290
+ { :ok , { doc , event , flags , duration } , state }
256
291
257
- defp provide_cmd_data ( [ { command_name , _ } | _ ] = cmd ) do
258
- case Enum . member? ( @ insecure_cmds , command_name ) do
259
- true -> { command_name , % { } }
260
- false -> { command_name , cmd }
292
+ { _duration , error } ->
293
+ error
261
294
end
262
295
end
263
296
264
297
##
265
298
# Executes a more to come command
266
299
##
267
- defp execute_action ( :command , [ :more_to_come ] , opts , % { wire_version: version } = state ) when version >= 6 do
300
+ defp execute_action ( :command , [ :more_to_come ] , opts , % { use_op_msg: true } = state ) do
268
301
event = % MoreToComeEvent { command: :more_to_come , command_name: opts [ :command_name ] || :more_to_come }
269
302
270
303
timeout = Keyword . get ( opts , :timeout , state . timeout )
@@ -280,7 +313,7 @@ defmodule Mongo.MongoDBConnection do
280
313
end
281
314
end
282
315
283
- defp execute_action ( :command , [ cmd ] , opts , % { wire_version: version } = state ) when version >= 6 do
316
+ defp execute_action ( :command , [ cmd ] , opts , % { use_op_msg: true } = state ) do
284
317
{ command_name , data } = provide_cmd_data ( cmd )
285
318
db = opts [ :database ] || state . database
286
319
cmd = cmd ++ [ "$db": db ]
@@ -371,4 +404,24 @@ defmodule Mongo.MongoDBConnection do
371
404
{ _flag , false } , acc -> acc
372
405
end )
373
406
end
407
+
408
+ defp handshake_command ( % { stable_api: nil } , client ) do
409
+ [ ismaster: 1 , helloOk: 1 , client: client ]
410
+ end
411
+
412
+ defp handshake_command ( % { stable_api: stable_api } , client ) do
413
+ StableVersion . merge_stable_api ( [ hello: 1 , client: client ] , stable_api )
414
+ end
415
+
416
+ defp hello_command ( cmd , % { hello_ok: false } ) do
417
+ cmd
418
+ |> Keyword . put ( :ismaster , 1 )
419
+ |> Keyword . put ( :helloOk , 1 )
420
+ end
421
+
422
+ defp hello_command ( cmd , % { hello_ok: true , stable_api: stable_api } ) do
423
+ cmd
424
+ |> Keyword . put ( :hello , 1 )
425
+ |> StableVersion . merge_stable_api ( stable_api )
426
+ end
374
427
end
0 commit comments