Skip to content

Commit 9ca35e7

Browse files
committed
updated bulk write with session support, added better logging
1 parent 5334ad0 commit 9ca35e7

File tree

6 files changed

+77
-44
lines changed

6 files changed

+77
-44
lines changed

config/config.exs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,12 @@ use Mix.Config
88
# if you want to provide default values for your application for third-
99
# party users, it should be done in your mix.exs file.
1010

11+
config :logger, :console,
12+
level: :debug,
13+
truncate: 1024,
14+
format: "$time [$level] $message ($metadata)\n\n",
15+
metadata: [:module, :function, :line]
16+
1117
# Sample configuration:
1218
#
1319
# config :logger, :console,

lib/mongo.ex

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,12 @@ defmodule Mongo do
4646
interval, the operation returns an error
4747
"""
4848

49+
require Logger
50+
4951
use Bitwise
5052
use Mongo.Messages
5153
alias Mongo.Query
5254
alias Mongo.ReadPreference
53-
alias Mongo.TopologyDescription
5455
alias Mongo.Topology
5556
alias Mongo.UrlParser
5657
alias Mongo.Session.ServerSession
@@ -304,27 +305,28 @@ defmodule Mongo do
304305
session pool for reuse.
305306
"""
306307
def issue_command(topology_pid, cmd, :write, opts) do
308+
309+
Logger.debug("issue_command: write #{inspect cmd}")
310+
307311
with {:ok, conn, _, _, session_id} <- Topology.select_server(topology_pid, :write, opts),
308312
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
309-
ok <- checkin_session_id(topology_pid, cmd, session_id) do
313+
:ok <- checkin_session_id(topology_pid, cmd, session_id) do
310314
{:ok, conn, doc}
311315
else
312316
{:new_connection, _server} ->
313-
IO.puts "too fast, call it again"
314317
issue_command(topology_pid, cmd, :write, opts)
315318
end
316319
end
317320
def issue_command(topology_pid, cmd, :read, opts) do
318321

319-
IO.puts "issue_command: read #{inspect cmd}"
322+
Logger.debug("issue_command: read #{inspect cmd}")
320323
with {:ok, conn, slave_ok, _, session_id} <- Topology.select_server(topology_pid, :read, opts),
321324
opts = Keyword.put(opts, :slave_ok, slave_ok),
322325
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
323-
ok <- checkin_session_id(topology_pid, cmd, session_id) do
326+
:ok <- checkin_session_id(topology_pid, cmd, session_id) do
324327
{:ok, conn, doc}
325328
else
326329
{:new_connection, _server} ->
327-
IO.puts "too fast, call it again"
328330
issue_command(topology_pid, cmd, :read, opts)
329331
end
330332
end
@@ -333,10 +335,10 @@ defmodule Mongo do
333335
cmd
334336
end
335337
defp update_session_id(cmd, %ServerSession{:session_id => session_id}) do
336-
IO.puts "update_session_id #{inspect session_id}"
338+
Logger.debug("update_session_id for cmd #{inspect session_id}")
337339
Keyword.merge(cmd, [lsid: %{id: session_id}])
338340
end
339-
defp checkin_session_id(topology_pid, cmd, nil), do: :ok
341+
defp checkin_session_id(_topology_pid, _cmd, nil), do: :ok
340342
defp checkin_session_id(topology_pid, cmd, session_id) do
341343
case Keyword.get(cmd, :lsid, :implicit) do
342344
:implicit -> Topology.checkin_session_id(topology_pid, session_id)
@@ -618,12 +620,11 @@ defmodule Mongo do
618620
end
619621

620622
@doc false
621-
## refactor: exec_command
622623
@spec exec_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
623624
def exec_command(conn, cmd, opts) do
624625
action = %Query{action: :command}
625626

626-
IO.puts "Executing cmd #{inspect cmd}"
627+
Logger.debug("Executing cmd: #{inspect cmd}")
627628

628629
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
629630
{:ok, doc} <- check_for_error(doc) do
@@ -972,14 +973,14 @@ defmodule Mongo do
972973
end
973974
end
974975

975-
def start_session(top, opts \\ []) do
976+
def start_session(top) do
976977
## todo error code handling
977978
with {:ok, %{"id" => uuid, "ok" => ok}} when ok == 1 <- command(top, [startSession: 1], database: "admin") do
978979
uuid
979980
end
980981
end
981982

982-
def end_sessions(top, sessions, opts \\ []) do
983+
def end_sessions(top, sessions) do
983984
## todo error code handling
984985
with {:ok, %{"ok" => ok}} when ok == 1 <- command(top, [endSessions: sessions], database: "admin") do
985986
:ok

lib/mongo/bulk_write.ex

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,8 @@ defmodule Mongo.BulkWrite do
158158
alias Mongo.UnorderedBulk
159159
alias Mongo.OrderedBulk
160160
alias Mongo.BulkWriteResult
161+
alias Mongo.Session.ServerSession
162+
alias Mongo.Topology
161163

162164
@doc """
163165
Executes unordered and ordered bulk writes.
@@ -181,27 +183,39 @@ defmodule Mongo.BulkWrite do
181183
def write(topology_pid, %UnorderedBulk{} = bulk, opts) do
182184

183185
write_concern = write_concern(opts)
184-
with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
185-
one_bulk_write(conn, bulk, write_concern, opts)
186+
187+
with {:ok, conn, _, _, session_id} <- Topology.select_server(topology_pid, :write, opts),
188+
result = one_bulk_write(conn, bulk, write_concern, Keyword.merge(opts, [lsid: session_id])),
189+
:ok <- checkin_session_id(topology_pid, Keyword.get(opts, :lsid, :implicit), session_id) do
190+
result
191+
else
192+
{:new_connection, _server} ->
193+
write(topology_pid, bulk, opts)
186194
end
187195

188196
end
189197

190-
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do
198+
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do
191199

192200
write_concern = write_concern(opts)
193201

194202
empty = %BulkWriteResult{acknowledged: acknowledged(write_concern)}
195203

196-
with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts),
204+
with {:ok, conn, _, _, session_id} <- Topology.select_server(topology_pid, :write, opts),
197205
{:ok, limits} <- Mongo.limits(conn),
198-
max_batch_size <- limits.max_write_batch_size do
206+
max_batch_size <- limits.max_write_batch_size,
207+
result = ops
208+
|> get_op_sequence()
209+
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, Keyword.merge(opts, [lsid: session_id])) end)
210+
|> BulkWriteResult.reduce(empty),
211+
:ok <- checkin_session_id(topology_pid, Keyword.get(opts, :lsid, :implicit), session_id) do
199212

200-
ops
201-
|> get_op_sequence()
202-
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, opts) end)
203-
|> BulkWriteResult.reduce(empty)
213+
result
214+
else
215+
{:new_connection, _server} ->
216+
write(topology_pid, bulk, opts)
204217
end
218+
205219
end
206220

207221
##
@@ -231,13 +245,23 @@ defmodule Mongo.BulkWrite do
231245
max_batch_size <- limits.max_write_batch_size,
232246
insert_result <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, max_batch_size, opts),
233247
update_result <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, max_batch_size, opts),
234-
delete_result <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, max_batch_size, opts) do
248+
delete_result <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, max_batch_size, opts) do
235249

236250
[insert_result, update_result, delete_result]
237251
|> BulkWriteResult.reduce(%BulkWriteResult{acknowledged: acknowledged(write_concern)})
238252
end
239253
end
240254

255+
defp update_session_id(cmd, nil) do
256+
cmd
257+
end
258+
defp update_session_id(cmd, %ServerSession{:session_id => session_id}) do
259+
Keyword.merge(cmd, [lsid: %{id: session_id}])
260+
end
261+
262+
defp checkin_session_id(topology_pid, :implicit, session_id), do: Topology.checkin_session_id(topology_pid, session_id)
263+
defp checkin_session_id(_, _, _), do: :ok
264+
241265
###
242266
# Executes the command `cmd` and collects the result.
243267
#
@@ -340,13 +364,14 @@ defmodule Mongo.BulkWrite do
340364
Enum.map(cmds, fn cmd -> Mongo.exec_command(conn, cmd, opts) end)
341365
end
342366

343-
defp get_insert_cmds(coll, docs, write_concern, max_batch_size, _opts) do
367+
defp get_insert_cmds(coll, docs, write_concern, max_batch_size, opts) do
344368

345369
{ids, docs} = assign_ids(docs)
346370

347371
cmds = docs
348372
|> Enum.chunk_every(max_batch_size)
349373
|> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)
374+
|> Enum.map(fn cmd -> update_session_id(cmd, Keyword.get(opts, :lsid)) end)
350375
{cmds, ids}
351376

352377
end
@@ -364,6 +389,7 @@ defmodule Mongo.BulkWrite do
364389
docs
365390
|> Enum.chunk_every(max_batch_size)
366391
|> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, write_concern, opts) end)
392+
|> Enum.map(fn cmd -> update_session_id(cmd, Keyword.get(opts, :lsid)) end)
367393

368394
end
369395

@@ -389,6 +415,7 @@ defmodule Mongo.BulkWrite do
389415
docs
390416
|> Enum.chunk_every(max_batch_size)
391417
|> Enum.map(fn updates -> get_update_cmd(coll, updates, write_concern, opts) end)
418+
|> Enum.map(fn cmd -> update_session_id(cmd, Keyword.get(opts, :lsid)) end)
392419

393420
end
394421

lib/mongo/monitor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule Mongo.Monitor do
2323

2424
# this is not configurable because the specification says so
2525
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#minheartbeatfrequencyms
26-
@min_heartbeat_frequency_ms 500
26+
# not used @min_heartbeat_frequency_ms 500
2727

2828
def start_link(args) do
2929
GenServer.start_link(__MODULE__, args)

lib/mongo/topology.ex

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,15 @@
11
defmodule Mongo.Topology do
22
@moduledoc false
33

4+
require Logger
5+
46
use GenServer
57
alias Mongo.Events.{ServerDescriptionChangedEvent, ServerOpeningEvent, ServerClosedEvent,
68
TopologyDescriptionChangedEvent, TopologyOpeningEvent, TopologyClosedEvent}
79
alias Mongo.TopologyDescription
810
alias Mongo.ServerDescription
911
alias Mongo.Monitor
1012
alias Mongo.Session.SessionPool
11-
alias Mongo.Session.ServerSession
1213

1314
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms-defaults-to-10-seconds-or-60-seconds
1415
@heartbeat_frequency_ms 10_000
@@ -109,14 +110,6 @@ defmodule Mongo.Topology do
109110
})
110111
end
111112

112-
def handle_call(:topology, _from, state) do
113-
{:reply, state.topology, state}
114-
end
115-
116-
def handle_call({:connection, address}, _from, state) do
117-
{:reply, Map.fetch(state.connection_pools, address), state}
118-
end
119-
120113
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#updating-the-topologydescription
121114
def handle_cast({:server_description, server_description}, state) do
122115
new_state = handle_server_description(state, server_description)
@@ -226,30 +219,37 @@ defmodule Mongo.Topology do
226219
state
227220
end
228221

222+
def handle_call(:topology, _from, state) do
223+
{:reply, state.topology, state}
224+
end
225+
226+
def handle_call({:connection, address}, _from, state) do
227+
{:reply, Map.fetch(state.connection_pools, address), state}
228+
end
229+
229230
def handle_call({:select_server, type, opts}, from, %{:topology => topology, :waiting_pids => waiting, connection_pools: pools} = state) do
230231
case TopologyDescription.select_servers(topology, type, opts) do
231232

232233
:empty ->
233-
IO.puts "select_servers: Empty "
234+
Logger.debug("select_server: empty")
234235
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
235236

236237
{:ok, servers, slave_ok, mongos?} -> ## found, select randomly a server and return its connection_pool
237-
IO.puts "select_servers: found #{inspect servers}"
238-
IO.puts "select_servers: found #{inspect pools}"
238+
Logger.debug("select_server: found #{inspect servers}, pools: #{inspect pools}")
239239
with {:ok, connection} <- servers
240240
|> Enum.take_random(1)
241241
|> get_connection(state) do
242242

243243
session_id = checkout_session(state)
244-
IO.puts "connection is #{inspect connection}"
245-
IO.puts "session_id is #{inspect session_id}"
246244

247245

246+
Logger.debug("select_server: connection is #{inspect connection}, session_id is #{inspect session_id}")
247+
248248
{:reply, {:ok, connection, slave_ok, mongos?, session_id}, state}
249249
end
250250
error ->
251-
IO.puts "select_servers: error "
252-
{:reply, error, state} ## in case of an error, just return the error
251+
Logger.debug("select_servers: #{inspect error}")
252+
{:reply, error, state} ## in case of an error, just return the error
253253
end
254254
end
255255

@@ -292,12 +292,11 @@ defmodule Mongo.Topology do
292292
end
293293

294294
defp update_session_pool(%{:session_pool => nil} = state, logical_session_timeout) do
295-
IO.puts "creating session pool"
295+
Logger.debug("Creating session pool")
296296
{:ok, session_pool} = SessionPool.start_link(self(), logical_session_timeout)
297297
%{ state | session_pool: session_pool}
298298
end
299299
defp update_session_pool(state, _logical_session_timeout) do
300-
IO.puts "no session pool created"
301300
state
302301
end
303302

lib/mongo/topology_description.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ defmodule Mongo.TopologyDescription do
88

99
alias Mongo.ServerDescription
1010
alias Mongo.ReadPreference
11-
alias Mongo.Session.SessionPool
1211

1312
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#topologydescription
1413
@type type :: :unknown | :single | :replica_set_no_primary | :replica_set_with_primary | :sharded
@@ -56,10 +55,11 @@ defmodule Mongo.TopologyDescription do
5655
* slave_ok:
5756
* mongod?:
5857
"""
58+
def select_servers(topology, type, opts \\ [])
5959
def select_servers(%{:compatible => false}, _type, _opts) do
6060
{:error, :invalid_wire_version}
6161
end
62-
def select_servers(topology, type, opts \\ []) do
62+
def select_servers(topology, type, opts) do
6363

6464
read_preference = opts
6565
|> Keyword.get(:read_preference)

0 commit comments

Comments
 (0)