Skip to content

Commit 849fb0d

Browse files
committed
added max_write_batch_size from database
renamed direct_command to exec_command
1 parent 9f485a3 commit 849fb0d

File tree

6 files changed

+67
-52
lines changed

6 files changed

+67
-52
lines changed

lib/mongo.ex

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ defmodule Mongo do
5353
alias Mongo.TopologyDescription
5454
alias Mongo.Topology
5555
alias Mongo.UrlParser
56-
alias Mongo.UnorderedBulk
5756

5857
@timeout 15000 # 5000
5958

@@ -290,7 +289,7 @@ defmodule Mongo do
290289
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
291290

292291
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
293-
{:ok, doc} <- direct_command(conn, cmd, opts) do
292+
{:ok, doc} <- exec_command(conn, cmd, opts) do
294293
{:ok, doc["value"]}
295294
end
296295

@@ -333,7 +332,7 @@ defmodule Mongo do
333332
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
334333

335334
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
336-
{:ok, doc} <- direct_command(conn, cmd, opts), do: {:ok, doc["value"]}
335+
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
337336
end
338337

339338
defp should_return_new(:after), do: true
@@ -364,7 +363,7 @@ defmodule Mongo do
364363
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
365364

366365
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
367-
{:ok, doc} <- direct_command(conn, cmd, opts), do: {:ok, doc["value"]}
366+
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
368367
end
369368

370369
@doc false
@@ -468,7 +467,7 @@ defmodule Mongo do
468467

469468
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, opts),
470469
opts = Keyword.put(opts, :slave_ok, slave_ok),
471-
{:ok, doc} <- direct_command(conn, cmd, opts),
470+
{:ok, doc} <- exec_command(conn, cmd, opts),
472471
do: {:ok, doc["values"]}
473472
end
474473

@@ -567,13 +566,13 @@ defmodule Mongo do
567566
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]
568567
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts),
569568
opts = Keyword.put(opts, :slave_ok, slave_ok),
570-
do: direct_command(conn, cmd, opts)
569+
do: exec_command(conn, cmd, opts)
571570
end
572571

573572
@doc false
574573
## refactor: exec_command
575-
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
576-
def direct_command(conn, cmd, opts) do
574+
@spec exec_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
575+
def exec_command(conn, cmd, opts) do
577576
action = %Query{action: :command}
578577

579578
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
@@ -588,13 +587,25 @@ defmodule Mongo do
588587
@doc """
589588
Returns the current wire version.
590589
"""
591-
def wire_version(conn, opts \\ []) do
590+
@spec wire_version(pid) :: {:ok, integer} | {:error, Mongo.Error.t}
591+
def wire_version(conn) do
592592
cmd = %Query{action: :wire_version}
593-
with {:ok, _cmd, version} <- DBConnection.execute(conn, cmd, %{}, defaults(opts)) do
593+
with {:ok, _cmd, version} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
594594
{:ok, version}
595595
end
596596
end
597597

598+
@doc """
599+
Returns the limits of the database.
600+
"""
601+
@spec limits(pid) :: {:ok, BSON.document} | {:error, Mongo.Error.t}
602+
def limits(conn) do
603+
cmd = %Query{action: :limits}
604+
with {:ok, _cmd, limits} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
605+
{:ok, limits}
606+
end
607+
end
608+
598609
@doc """
599610
Similar to `command/3` but unwraps the result and raises on error.
600611
"""
@@ -641,7 +652,7 @@ defmodule Mongo do
641652
] |> filter_nils()
642653

643654
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
644-
{:ok, doc} <- direct_command(conn, cmd, opts) do
655+
{:ok, doc} <- exec_command(conn, cmd, opts) do
645656
case doc do
646657
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
647658
_ ->
@@ -695,7 +706,7 @@ defmodule Mongo do
695706
] |> filter_nils()
696707

697708
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
698-
{:ok, doc} <- direct_command(conn, cmd, opts) do
709+
{:ok, doc} <- exec_command(conn, cmd, opts) do
699710
case doc do
700711
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
701712
_ ->
@@ -766,7 +777,7 @@ defmodule Mongo do
766777
] |> filter_nils()
767778

768779
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
769-
{:ok, doc} <- direct_command(conn, cmd, opts) do
780+
{:ok, doc} <- exec_command(conn, cmd, opts) do
770781
case doc do
771782
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
772783
%{ "ok" => _ok, "n" => n } ->
@@ -886,7 +897,7 @@ defmodule Mongo do
886897
] |> filter_nils()
887898

888899
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
889-
{:ok, doc} <- direct_command(conn, cmd, opts) do
900+
{:ok, doc} <- exec_command(conn, cmd, opts) do
890901

891902
case doc do
892903

lib/mongo/bulk_write.ex

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,6 @@ defmodule Mongo.BulkWrite do
158158
alias Mongo.UnorderedBulk
159159
alias Mongo.OrderedBulk
160160

161-
@max_batch_size 100_000 ## todo The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, raises from 1,000 to 100,000.
162-
163-
164161
@doc """
165162
Executes unordered and ordered bulk writes.
166163
@@ -204,10 +201,13 @@ defmodule Mongo.BulkWrite do
204201
insertedIds: [],
205202
}
206203

207-
with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
204+
with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts),
205+
{:ok, limits} <- Mongo.limits(conn),
206+
max_batch_size <- limits.max_write_batch_size do
208207

209-
get_op_sequence(coll, ops)
210-
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, opts) end)
208+
ops
209+
|> get_op_sequence()
210+
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, opts) end)
211211
|> Enum.reduce(empty, fn
212212
{cmd, {count, ids}}, acc -> merge(cmd, count, ids, acc)
213213
{cmd, {mat, mod, ups, ids}}, acc -> merge(cmd, mat, mod, ups, ids, acc)
@@ -258,9 +258,11 @@ defmodule Mongo.BulkWrite do
258258
#
259259
defp one_bulk_write(conn, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, write_concern, opts) do
260260

261-
with {_, {inserts, ids}} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, opts),
262-
{_, {matched, modified, upserts, upsert_ids}} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, opts),
263-
{_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, opts) do
261+
with {:ok, limits} <- Mongo.limits(conn),
262+
max_batch_size <- limits.max_write_batch_size,
263+
{_, {inserts, ids}} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, max_batch_size, opts),
264+
{_, {matched, modified, upserts, upsert_ids}} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, max_batch_size, opts),
265+
{_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, max_batch_size, opts) do
264266

265267
%{
266268
acknowledged: acknowledged(write_concern),
@@ -278,18 +280,18 @@ defmodule Mongo.BulkWrite do
278280
###
279281
# Executes the command `cmd` and collects the result.
280282
#
281-
defp one_bulk_write_operation(conn, cmd, coll, docs, write_concern, opts) do
282-
with result <- conn |> run_commands(get_cmds(cmd, coll, docs, write_concern, opts), opts) |> collect(cmd) do
283+
defp one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, opts) do
284+
with result <- conn |> run_commands(get_cmds(cmd, coll, docs, write_concern, max_batch_size, opts), opts) |> collect(cmd) do
283285
{cmd, result}
284286
end
285287
end
286288

287289
##
288290
# Converts the list of operations into insert/update/delete commands
289291
#
290-
defp get_cmds(:insert, coll, docs, write_concern, opts), do: get_insert_cmds(coll, docs, write_concern, opts)
291-
defp get_cmds(:update, coll, docs, write_concern, opts), do: get_update_cmds(coll, docs, write_concern, opts)
292-
defp get_cmds(:delete, coll, docs, write_concern, opts), do: get_delete_cmds(coll, docs, write_concern, opts)
292+
defp get_cmds(:insert, coll, docs, write_concern, max_batch_size, opts), do: get_insert_cmds(coll, docs, write_concern, max_batch_size, opts)
293+
defp get_cmds(:update, coll, docs, write_concern, max_batch_size, opts), do: get_update_cmds(coll, docs, write_concern, max_batch_size, opts)
294+
defp get_cmds(:delete, coll, docs, write_concern, max_batch_size, opts), do: get_delete_cmds(coll, docs, write_concern, max_batch_size, opts)
293295

294296
defp acknowledged(%{w: w}) when w > 0, do: true
295297
defp acknowledged(%{}), do: false
@@ -299,15 +301,15 @@ defmodule Mongo.BulkWrite do
299301
#
300302
# [inserts, inserts, updates] -> [[inserts, inserts],[updates]]
301303
#
302-
defp get_op_sequence(coll, ops) do
303-
get_op_sequence(coll, ops, [])
304+
defp get_op_sequence(ops) do
305+
get_op_sequence(ops, [])
304306
end
305-
defp get_op_sequence(_coll, [], acc), do: acc
306-
defp get_op_sequence(coll, ops, acc) do
307+
defp get_op_sequence([], acc), do: acc
308+
defp get_op_sequence(ops, acc) do
307309

308310
[{kind, _doc} | _rest] = ops
309-
{docs, rest} = find_max_sequence(kind, ops)
310-
get_op_sequence(coll, rest, [{kind, docs} | acc])
311+
{docs, rest} = find_max_sequence(kind, ops)
312+
get_op_sequence(rest, [{kind, docs} | acc])
311313

312314
end
313315

@@ -371,20 +373,19 @@ defmodule Mongo.BulkWrite do
371373
defp filter_upsert_ids(nil), do: []
372374
defp filter_upsert_ids(upserted), do: Enum.map(upserted, fn doc -> doc["_id"] end)
373375

374-
375376
defp run_commands(conn, {cmds, ids}, opts) do
376-
{Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end), ids}
377+
{Enum.map(cmds, fn cmd -> Mongo.exec_command(conn, cmd, opts) end), ids}
377378
end
378379
defp run_commands(conn, cmds, opts) do
379-
Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
380+
Enum.map(cmds, fn cmd -> Mongo.exec_command(conn, cmd, opts) end)
380381
end
381382

382-
defp get_insert_cmds(coll, docs, write_concern, _opts) do
383+
defp get_insert_cmds(coll, docs, write_concern, max_batch_size, _opts) do
383384

384385
{ids, docs} = assign_ids(docs)
385386

386387
cmds = docs
387-
|> Enum.chunk_every(@max_batch_size)
388+
|> Enum.chunk_every(max_batch_size)
388389
|> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)
389390
{cmds, ids}
390391

@@ -398,10 +399,10 @@ defmodule Mongo.BulkWrite do
398399

399400
end
400401

401-
defp get_delete_cmds(coll, docs, write_concern, opts) do
402+
defp get_delete_cmds(coll, docs, write_concern, max_batch_size, opts) do
402403

403404
docs
404-
|> Enum.chunk_every(@max_batch_size)
405+
|> Enum.chunk_every(max_batch_size)
405406
|> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, write_concern, opts) end)
406407

407408
end
@@ -423,10 +424,10 @@ defmodule Mongo.BulkWrite do
423424

424425
end
425426

426-
defp get_update_cmds(coll, docs, write_concern, opts) do
427+
defp get_update_cmds(coll, docs, write_concern, max_batch_size, opts) do
427428

428429
docs
429-
|> Enum.chunk_every(@max_batch_size)
430+
|> Enum.chunk_every(max_batch_size)
430431
|> Enum.map(fn updates -> get_update_cmd(coll, updates, write_concern, opts) end)
431432

432433
end

lib/mongo/cursor.ex

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ defmodule Mongo.Cursor do
4444
"cursor" => %{
4545
"id" => cursor_id,
4646
"ns" => coll,
47-
"firstBatch" => docs}}} when ok == 1 <- Mongo.direct_command(conn, cmd, opts) do
47+
"firstBatch" => docs}}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts) do
4848
state(conn: conn, cursor: cursor_id, coll: coll, docs: docs)
4949
end
5050
end
@@ -95,8 +95,8 @@ defmodule Mongo.Cursor do
9595
"cursor" => %{
9696
"id" => cursor_id,
9797
"ns" => coll,
98-
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.direct_command(conn, cmd, opts),
99-
{:ok, wire_version} <- Mongo.wire_version(conn, opts) do
98+
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts),
99+
{:ok, wire_version} <- Mongo.wire_version(conn) do
100100

101101
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options
102102

@@ -138,7 +138,7 @@ defmodule Mongo.Cursor do
138138
maxTimeMS: opts[:max_time]
139139
] |> filter_nils()
140140

141-
with {:ok, %{"cursor" => %{ "id" => cursor_id, "nextBatch" => docs}, "ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, cmd, opts) do
141+
with {:ok, %{"cursor" => %{ "id" => cursor_id, "nextBatch" => docs}, "ok" => ok}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts) do
142142
{:ok, %{cursor_id: cursor_id, docs: docs}}
143143
end
144144

@@ -158,7 +158,7 @@ defmodule Mongo.Cursor do
158158
with {:ok, %{"operationTime" => op_time,
159159
"cursor" => %{"id" => new_cursor_id,
160160
"nextBatch" => docs} = cursor,
161-
"ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, get_more, opts) do
161+
"ok" => ok}} when ok == 1 <- Mongo.exec_command(conn, get_more, opts) do
162162

163163
old_token = change_stream(change_stream, :resume_token)
164164
change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs))
@@ -175,7 +175,7 @@ defmodule Mongo.Cursor do
175175
{:error, %Mongo.Error{code: code} = not_resumable} when code == 11601 or code == 136 or code == 237 -> {:error, not_resumable}
176176
{:error, _error} ->
177177

178-
with {:ok, wire_version} <- Mongo.wire_version(conn, opts) do
178+
with {:ok, wire_version} <- Mongo.wire_version(conn) do
179179

180180
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
181181

@@ -281,7 +281,7 @@ defmodule Mongo.Cursor do
281281
with {:ok, %{"cursorsAlive" => [],
282282
"cursorsNotFound" => [],
283283
"cursorsUnknown" => [],
284-
"ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, cmd, opts) do
284+
"ok" => ok}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts) do
285285
:ok
286286
end
287287
end

lib/mongo/mongo_db_connection.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,9 @@ defmodule Mongo.MongoDBConnection do
239239
end
240240
end
241241

242+
defp execute_action(:limits, _, _, state) do
243+
{:ok, state.limits, state}
244+
end
242245
defp execute_action(:wire_version, _, _, state) do
243246
{:ok, state.wire_version, state}
244247
end

lib/mongo/monitor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ defmodule Mongo.Monitor do
104104
defp call_is_master(conn_pid, opts) do
105105
start_time = System.monotonic_time
106106
result = try do
107-
Mongo.direct_command(conn_pid, [isMaster: 1], opts)
107+
Mongo.exec_command(conn_pid, [isMaster: 1], opts)
108108
rescue
109109
e -> {:error, e}
110110
end

lib/mongo_db_connection/utils.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule Mongo.MongoDBConnection.Utils do
2323

2424
@doc """
2525
Invoking a command using connection stored in state, that means within a DBConnection call. Therefore
26-
we cannot call DBConnect.execute() to reuse the command function in Monto.direct_command()
26+
we cannot call DBConnect.execute() to reuse the command function in Monto.exec_command()
2727
2828
Using op_query structure to invoke the command
2929
"""

0 commit comments

Comments
 (0)