Skip to content

Commit ce83c50

Browse files
committed
#63: fixed wire version in change streams, removed some compiler warnings
1 parent bbf159c commit ce83c50

File tree

4 files changed

+27
-38
lines changed

4 files changed

+27
-38
lines changed

lib/mongo/bulk_write.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ defmodule Mongo.BulkWrite do
191191

192192
end
193193

194-
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do
194+
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do
195195

196196
write_concern = write_concern(opts)
197197

lib/mongo/change_stream.ex

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ defmodule Mongo.ChangeStream do
33
alias Mongo.Session
44
alias Mongo.Error
55

6-
require Logger
7-
86
import Record, only: [defrecordp: 2]
97

108
defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts]
@@ -71,7 +69,6 @@ defmodule Mongo.ChangeStream do
7169

7270
def aggregate(topology_pid, cmd, fun, opts) do
7371

74-
Logger.info("start_implicit_session ")
7572
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
7673
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
7774

@@ -87,13 +84,11 @@ defmodule Mongo.ChangeStream do
8784

8885
def aggregate(topology_pid, session, doc, cmd, fun) do
8986

90-
Logger.info("aggregate wire version: #{inspect Mongo.wire_version(topology_pid)}")
9187
with %{"operationTime" => op_time,
9288
"cursor" => %{
9389
"id" => cursor_id,
9490
"ns" => coll,
95-
"firstBatch" => docs} = response} <- doc,
96-
{:ok, wire_version} <- Mongo.wire_version(topology_pid) do
91+
"firstBatch" => docs} = response} <- doc do
9792

9893
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options
9994

@@ -105,7 +100,7 @@ defmodule Mongo.ChangeStream do
105100
# The initial aggregate response did not include a postBatchResumeToken.
106101

107102
has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"]
108-
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version)
103+
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], Session.wire_version(session))
109104

110105
# When the ChangeStream is started:
111106
# If startAfter is set, cache it.
@@ -132,7 +127,6 @@ defmodule Mongo.ChangeStream do
132127
change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
133128
on_resume_token: fun) = change_stream, opts) do
134129

135-
Logger.info("Get more")
136130
get_more = [
137131
getMore: %BSON.LongNumber{value: cursor_id},
138132
collection: coll,
@@ -159,26 +153,20 @@ defmodule Mongo.ChangeStream do
159153
{:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable}
160154
{:error, _error} ->
161155

162-
Logger.info("Resuming....: #{inspect Mongo.wire_version(topology_pid) }")
163-
with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
164-
165-
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
156+
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
166157

167-
stream_opts = update_stream_options(stream_opts, resume_token, op_time, wire_version)
168-
aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)
158+
stream_opts = update_stream_options(stream_opts, resume_token, op_time, Session.wire_version(session))
159+
aggregate_cmd = Keyword.update!(aggregate_cmd, :pipeline, fn _ -> [%{"$changeStream" => stream_opts} | pipeline] end)
169160

170-
# kill the cursor
171-
kill_cursors(session, coll, [cursor_id], opts)
161+
# kill the cursor
162+
kill_cursors(session, coll, [cursor_id], opts)
172163

173-
# Start aggregation again...
174-
Logger.info("Calling aggregate again")
175-
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
176-
{:resume, state}
177-
end
164+
# Start aggregation again...
165+
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
166+
{:resume, state}
178167
end
179168
reason ->
180-
Logger.info("Error: #{inspect reason}")
181-
{:error, nil}
169+
{:error, reason}
182170
end
183171
end
184172

@@ -260,7 +248,6 @@ defmodule Mongo.ChangeStream do
260248
"""
261249
def kill_cursors(session, coll, cursor_ids, opts) do
262250

263-
## todo Logger.info("Kill-Cursor")
264251
cmd = [
265252
killCursors: coll,
266253
cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end)

lib/mongo/session.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,14 @@ defmodule Mongo.Session do
364364

365365
end
366366

367+
@doc """
368+
Return the wire_version used in the session.
369+
"""
370+
@spec connection(Session.t) :: pid
371+
def wire_version(pid) do
372+
call(pid, :wire_version)
373+
end
374+
367375
@doc """
368376
Return the connection used in the session.
369377
"""
@@ -532,6 +540,9 @@ defmodule Mongo.Session do
532540
def handle_call_event(:abort_transaction, _state, _data) do
533541
{:keep_state_and_data, :ok}
534542
end
543+
def handle_call_event(:wire_version, _state, %{wire_version: wire_version}) do
544+
{:keep_state_and_data, wire_version}
545+
end
535546
def handle_call_event(:connection, _state, %{conn: conn}) do
536547
{:keep_state_and_data, conn}
537548
end

lib/mongo/topology.ex

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
defmodule Mongo.Topology do
22
@moduledoc false
33

4-
require Logger
5-
64
use GenServer
75

86
alias Mongo.Events.ServerDescriptionChangedEvent
@@ -73,7 +71,6 @@ defmodule Mongo.Topology do
7371
end
7472

7573
def mark_server_unknown(pid, address) do
76-
## todo Logger.info("mark_server_unknown #{inspect address} ")
7774
server_description = ServerDescription.from_is_master_error(address, "not writable primary or recovering")
7875
update_server_description(pid, server_description)
7976
end
@@ -226,7 +223,6 @@ defmodule Mongo.Topology do
226223
end
227224

228225
def handle_info({:new_connection, waiting_pids}, state) do
229-
## todo Logger.info("Notify waitings pid with new_connection #{inspect waiting_pids}")
230226
Enum.each(waiting_pids, fn from -> GenServer.reply(from, :new_connection) end)
231227
{:noreply, state}
232228
end
@@ -255,20 +251,16 @@ defmodule Mongo.Topology do
255251
:empty ->
256252
case current == @min_heartbeat_frequency_ms do
257253
true ->
258-
## todo Logger.info("Topology: no primary found, searching")
259254
state
260255
false ->
261-
## todo Logger.info("Topology: no primary found, start searching")
262256
Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @min_heartbeat_frequency_ms) end)
263257
put_in(state[:topology][:heartbeat_frequency_ms], @min_heartbeat_frequency_ms)
264258
end
265-
host ->
259+
_host ->
266260
case current == @max_heartbeat_frequency_ms do
267261
true ->
268-
## todo Logger.info("Topology: primary exist")
269262
state
270263
false ->
271-
## todo Logger.info("Topology: primary found #{inspect host}, stop searching")
272264

273265
## filter own pid
274266
Enum.each(monitors, fn {_address, pid} -> Monitor.set_heartbeat_frequency_ms(pid, @max_heartbeat_frequency_ms) end)
@@ -351,7 +343,7 @@ defmodule Mongo.Topology do
351343
case TopologyDescription.select_servers(topology, :write, []) do
352344
:empty ->
353345
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :limits, cmd_type: :write, topology: topology})
354-
{:reply, nil, state}
346+
{:reply, {:error, :empty}, state}
355347
{:ok, {address, _opts}} ->
356348
with {:ok, limits} <- get_limits(address, topology) do
357349
{:reply, {:ok, limits}, state}
@@ -362,11 +354,10 @@ defmodule Mongo.Topology do
362354
end
363355

364356
def handle_call(:wire_version, _from, %{:topology => topology} = state) do
365-
case TopologyDescription.select_servers(topology, :read, []) do
357+
case TopologyDescription.select_servers(topology, :write, []) do
366358
:empty ->
367359
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :read, topology: topology})
368-
## todo fix me
369-
{:reply, {:ok, 9}, state}
360+
{:reply, {:error, :empty}, state}
370361

371362
{:ok, {address, _opts}} ->
372363
{:reply, {:ok, wire_version(address, topology)}, state}

0 commit comments

Comments
 (0)