@@ -50,8 +50,6 @@ defmodule Mongo do
50
50
import Mongo.Utils
51
51
import Mongo.WriteConcern
52
52
53
- require Logger
54
-
55
53
use Bitwise
56
54
use Mongo.Messages
57
55
alias Mongo.Query
@@ -60,6 +58,10 @@ defmodule Mongo do
60
58
alias Mongo.UrlParser
61
59
alias Mongo.Session
62
60
alias Mongo.ReadPreference
61
+ alias Mongo.Events
62
+ alias Mongo.Events.CommandSucceededEvent
63
+ alias Mongo.Events.CommandFailedEvent
64
+ alias Mongo.Error
63
65
64
66
@ timeout 15000 # 5000
65
67
@@ -379,26 +381,52 @@ defmodule Mongo do
379
381
380
382
end
381
383
384
+ def admin_command ( topology_pid , cmd ) do
385
+ with { :ok , doc } <- issue_command ( topology_pid , cmd , :write , database: "admin" , retryable_writes: false ) do
386
+ { :ok , doc }
387
+ end
388
+ end
389
+
382
390
@ doc """
383
391
This function is very fundamental.
384
392
"""
385
- def issue_command ( topology_pid , cmd , type , opts ) do
393
+ def issue_command ( topology_pid , cmd , :read , opts ) do
386
394
387
- new_cmd = case type do
388
- :read -> ReadPreference . add_read_preference ( cmd , opts )
389
- :write -> cmd
390
- end
395
+ new_cmd = ReadPreference . add_read_preference ( cmd , opts )
391
396
392
- Logger . debug ( "issue_command: #{ inspect type } #{ inspect new_cmd } " )
397
+ ## check, if retryable reads are enabled
398
+ opts = Mongo . retryable_reads ( opts )
393
399
394
- with { :ok , session } <- Session . start_implicit_session ( topology_pid , type , opts ) ,
400
+ with { :ok , session } <- Session . start_implicit_session ( topology_pid , :read , opts ) ,
395
401
result <- exec_command_session ( session , new_cmd , opts ) ,
396
402
:ok <- Session . end_implict_session ( topology_pid , session ) do
403
+ case result do
404
+ { :error , error } ->
405
+ case Error . should_retry_read ( error , cmd , opts ) do
406
+ true -> issue_command ( topology_pid , cmd , :read , Keyword . put ( opts , :read_counter , 2 ) )
407
+ false -> { :error , error }
408
+ end
409
+ _other -> result
410
+ end
411
+ else
412
+ { :new_connection , _server } ->
413
+ :timer . sleep ( 1000 )
414
+ issue_command ( topology_pid , cmd , :read , opts )
415
+ end
416
+ end
417
+ def issue_command ( topology_pid , cmd , :write , opts ) do
418
+
419
+ ## check, if retryable reads are enabled
420
+ opts = Mongo . retryable_writes ( opts , acknowledged? ( cmd [ :writeConcerns ] ) )
421
+
422
+ with { :ok , session } <- Session . start_implicit_session ( topology_pid , :write , opts ) ,
423
+ result <- exec_command_session ( session , cmd , opts ) ,
424
+ :ok <- Session . end_implict_session ( topology_pid , session ) do
397
425
result
398
426
else
399
427
{ :new_connection , _server } ->
400
428
:timer . sleep ( 1000 )
401
- issue_command ( topology_pid , cmd , type , opts )
429
+ issue_command ( topology_pid , cmd , :write , opts )
402
430
end
403
431
end
404
432
@@ -688,33 +716,61 @@ defmodule Mongo do
688
716
@ doc false
689
717
@ spec exec_command_session ( GenServer . server , BSON . document , Keyword . t ) :: { :ok , BSON . document | nil } | { :error , Mongo.Error . t }
690
718
def exec_command_session ( session , cmd , opts ) do
691
-
692
- Logger . debug ( "Executing cmd with session: #{ inspect cmd } " )
693
-
694
- with { :ok , conn , cmd } <- Session . bind_session ( session , cmd ) ,
695
- { :ok , _cmd , doc } <- DBConnection . execute ( conn , % Query { action: :command } , [ cmd ] , defaults ( opts ) ) ,
696
- doc <- Session . update_session ( session , doc , opts ) ,
697
- { :ok , doc } <- check_for_error ( doc ) do
719
+ with { :ok , conn , new_cmd } <- Session . bind_session ( session , cmd ) ,
720
+ { :ok , _cmd , { doc , event } } <- DBConnection . execute ( conn , % Query { action: :command } , [ new_cmd ] , defaults ( opts ) ) ,
721
+ doc <- Session . update_session ( session , doc , opts ) ,
722
+ { :ok , doc } <- check_for_error ( doc , event ) do
698
723
{ :ok , doc }
724
+ else
725
+ { :error , error } ->
726
+ ## todo update Topology
727
+ case Error . should_retry_write ( error , cmd , opts ) do
728
+ true ->
729
+ with :ok <- Session . select_server ( session , opts ) do
730
+ exec_command_session ( session , cmd , Keyword . put ( opts , :write_counter , 2 ) )
731
+ end
732
+ false -> { :error , error }
733
+ end
699
734
end
700
735
701
736
end
702
737
703
738
@ doc false
704
739
@ spec exec_command ( GenServer . server , BSON . document , Keyword . t ) :: { :ok , BSON . document | nil } | { :error , Mongo.Error . t }
705
740
def exec_command ( conn , cmd , opts ) do
706
-
707
- Logger . debug ( "Executing cmd: #{ inspect cmd } " )
708
-
709
- with { :ok , _cmd , doc } <- DBConnection . execute ( conn , % Query { action: :command } , [ cmd ] , defaults ( opts ) ) ,
710
- { :ok , doc } <- check_for_error ( doc ) do
741
+ with { :ok , _cmd , { doc , event } } <- DBConnection . execute ( conn , % Query { action: :command } , [ cmd ] , defaults ( opts ) ) ,
742
+ { :ok , doc } <- check_for_error ( doc , event ) do
711
743
{ :ok , doc }
712
744
end
713
745
714
746
end
715
747
716
- defp check_for_error ( % { "ok" => ok } = response ) when ok == 1 , do: { :ok , response }
717
- defp check_for_error ( doc ) , do: { :error , Mongo.Error . exception ( doc ) }
748
+ defp check_for_error ( % { "ok" => ok } = response , { event , duration } ) when ok == 1 do
749
+ Events . notify ( % CommandSucceededEvent {
750
+ reply: response ,
751
+ duration: duration ,
752
+ command_name: event . command_name ,
753
+ request_id: event . request_id ,
754
+ operation_id: event . operation_id ,
755
+ connection_id: event . connection_id
756
+ } , :commands )
757
+ { :ok , response }
758
+ end
759
+ defp check_for_error ( doc , { event , duration } ) do
760
+
761
+ error = Mongo.Error . exception ( doc )
762
+
763
+ Events . notify ( % CommandFailedEvent {
764
+ failure: error ,
765
+ duration: duration ,
766
+ command_name: event . command_name ,
767
+ request_id: event . request_id ,
768
+ operation_id: event . operation_id ,
769
+ connection_id: event . connection_id
770
+ } , :commands )
771
+
772
+ { :error , error }
773
+ end
718
774
719
775
@ doc """
720
776
Returns the wire version of the database
@@ -1149,6 +1205,22 @@ defmodule Mongo do
1149
1205
end
1150
1206
end
1151
1207
1208
+ @ doc """
1209
+ Convenient function that drops the database `name`.
1210
+ """
1211
+ @ spec drop_database ( GenServer . server , String . t ) :: :ok | { :error , Mongo.Error . t }
1212
+ def drop_database ( topology_pid , name \\ nil )
1213
+ def drop_database ( topology_pid , nil ) do
1214
+ with { :ok , _ } <- Mongo . issue_command ( topology_pid , [ dropDatabase: 1 ] , :write , [ ] ) do
1215
+ :ok
1216
+ end
1217
+ end
1218
+ def drop_database ( topology_pid , name ) do
1219
+ with { :ok , _ } <- Mongo . issue_command ( topology_pid , [ dropDatabase: 1 ] , :write , [ database: name ] ) do
1220
+ :ok
1221
+ end
1222
+ end
1223
+
1152
1224
@ doc """
1153
1225
Getting Collection Names
1154
1226
"""
@@ -1168,6 +1240,63 @@ defmodule Mongo do
1168
1240
|> Stream . map ( fn coll -> coll [ "name" ] end )
1169
1241
end
1170
1242
1243
+ @ doc """
1244
+ In case of retryable reads are enabled, the keyword `:read_counter` is added with the value of 1.
1245
+
1246
+ In other cases like
1247
+
1248
+ * `:retryable_reads` is false or nil
1249
+ * `:session` is nil
1250
+ * `:read_counter` is nil
1251
+
1252
+ the `opts` is unchanged
1253
+
1254
+ ## Example
1255
+
1256
+ iex> Mongo.retryable_reads([retryable_reads: true])
1257
+ [retryable_reads: true, read_counter: 1]
1258
+
1259
+ """
1260
+ def retryable_reads ( opts ) do
1261
+ case opts [ :read_counter ] do
1262
+ nil -> case opts [ :retryable_reads ] == true && opts [ :session ] == nil do
1263
+ true -> opts ++ [ read_counter: 1 ]
1264
+ false -> opts
1265
+ end
1266
+ _other -> opts
1267
+ end
1268
+ end
1269
+
1270
+ @ doc """
1271
+ In case of retryable writes are enabled, the keyword `:write_counter` is added with the value of 1.
1272
+
1273
+ In other cases like
1274
+
1275
+ * `:retryable_writes` is false or nil
1276
+ * `:session` is nil
1277
+ * `:write_counter` is nil
1278
+
1279
+ the `opts` is unchanged
1280
+
1281
+ ## Example
1282
+
1283
+ iex> Mongo.retryable_writes([retryable_writes: true], true)
1284
+ [retryable_writes: true, write_counter: 1]
1285
+
1286
+ """
1287
+ def retryable_writes ( opts , true ) do
1288
+ case opts [ :write_counter ] do
1289
+ nil -> case Keyword . get ( opts , :retryable_writes , true ) == true && opts [ :session ] == nil do
1290
+ true -> opts ++ [ write_counter: 1 ]
1291
+ false -> opts
1292
+ end
1293
+ _other -> opts
1294
+ end
1295
+ end
1296
+ def retryable_writes ( opts , false ) do
1297
+ Keyword . put ( opts , :retryable_writes , false )
1298
+ end
1299
+
1171
1300
defp get_stream ( topology_pid , cmd , opts ) do
1172
1301
Mongo.Stream . new ( topology_pid , cmd , opts )
1173
1302
end
0 commit comments