Skip to content

Commit a59c571

Browse files
committed
added support for transaction to bucket/gridfs and bulk operations
1 parent b8ba0d2 commit a59c571

File tree

8 files changed

+249
-52
lines changed

8 files changed

+249
-52
lines changed

lib/mongo.ex

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,35 @@ defmodule Mongo do
664664
issue_command(topology_pid, [ping: 1], :read, [batch_size: 1])
665665
end
666666

667+
@doc """
668+
Explicitly creates a collection or view.
669+
"""
670+
@spec create(GenServer.server, collection, Keyword.t) :: :ok | {:error, Mongo.Error.t}
671+
def create(topology_pid, coll, opts \\ []) do
672+
673+
cmd = [
674+
create: coll,
675+
capped: opts[:capped],
676+
autoIndexId: opts[:auto_index_id],
677+
size: opts[:size],
678+
max: opts[:max],
679+
storageEngine: opts[:storage_engine],
680+
validator: opts[:validator],
681+
validationLevel: opts[:validation_level],
682+
validationAction: opts[:validation_action],
683+
indexOptionDefaults: opts[:index_option_defaults],
684+
viewOn: opts[:view_on],
685+
pipeline: opts[:pipeline],
686+
collation: opts[:collation],
687+
writeConcern: write_concern(opts),
688+
] |> filter_nils()
689+
690+
with {:ok, _doc} <- issue_command(topology_pid, cmd, :write, opts) do
691+
:ok
692+
end
693+
694+
end
695+
667696
@doc """
668697
Insert a single document into the collection.
669698

lib/mongo/grid_fs/bucket.ex

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,14 @@ defmodule Mongo.GridFs.Bucket do
4141

4242
Keyword.merge(@defaults, options)
4343
|> Enum.reduce(%Bucket{topology_pid: topology_pid, opts: options}, fn {k, v}, bucket -> Map.put(bucket, k, v) end)
44-
|> check_indexes
44+
|> check_indexes()
4545

4646
end
4747

48+
def add_session(%Bucket{opts: opts} = bucket, session_opts) do
49+
%Bucket{bucket | opts: opts ++ session_opts}
50+
end
51+
4852
@doc """
4953
Returns the collection name for the files collection, default is fs.files.
5054
"""
@@ -127,19 +131,41 @@ defmodule Mongo.GridFs.Bucket do
127131
#
128132
defp check_indexes(bucket) do
129133

130-
case files_collection_empty?(bucket)do
131-
true ->
132-
_ = create_files_index({bucket, false})
133-
_ = create_chunks_index({bucket, false})
134-
134+
case files_collection_empty?(bucket) do
135+
true -> create_indexes(bucket)
135136
false ->
136-
bucket
137-
|> check_files_index()
138-
|> create_files_index()
139-
|> check_chunks_index()
140-
|> create_chunks_index()
137+
with :ok <- check_and_create_files_index(bucket),
138+
:ok <- check_and_chunks_files_index(bucket) do
139+
bucket
140+
end
141+
end
142+
143+
end
144+
145+
##
146+
# create the collection and indexes for files and chunks
147+
#
148+
defp create_indexes(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
149+
with :ok <- Mongo.create(topology_pid, files_collection_name(bucket), opts),
150+
:ok <- Mongo.create(topology_pid, chunks_collection_name(bucket), opts),
151+
:ok <- create_files_index(bucket),
152+
:ok <- create_chunks_index(bucket) do
153+
bucket
141154
end
155+
end
142156

157+
def check_and_create_files_index(bucket) do
158+
case check_files_index(bucket) do
159+
true -> create_files_index(bucket)
160+
false -> :ok
161+
end
162+
end
163+
164+
def check_and_chunks_files_index(bucket) do
165+
case check_chunks_index(bucket) do
166+
true -> create_chunks_index(bucket)
167+
false -> :ok
168+
end
143169
end
144170

145171
##
@@ -150,33 +176,33 @@ defmodule Mongo.GridFs.Bucket do
150176
# db.fs.files.findOne({}, { _id : 1 })
151177
#
152178
defp files_collection_empty?(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
153-
179+
coll_name = files_collection_name(bucket)
154180
topology_pid
155-
|> Mongo.find_one(files_collection_name(bucket), %{}, Keyword.merge(opts, projection: %{_id: 1}))
181+
|> Mongo.show_collections()
182+
|> Enum.find(fn name -> name == coll_name end)
156183
|> is_nil()
157-
158184
end
159185

160186
##
161187
# Checks the indexes for the fs.files collection
162188
#
163189
defp check_files_index(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
164-
{bucket, index_member?(topology_pid, files_collection_name(bucket), @files_index_name, opts)}
190+
index_member?(topology_pid, files_collection_name(bucket), @files_index_name, opts)
165191
end
166192

167193
##
168194
# Checks the indexes for the fs.chunks collection
169195
#
170196
defp check_chunks_index(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
171-
{bucket, index_member?(topology_pid, chunks_collection_name(bucket), @chunks_index_name, opts)}
197+
index_member?(topology_pid, chunks_collection_name(bucket), @chunks_index_name, opts)
172198
end
173199

174200
# returns true if the collection contains a index with the given name
175201
def index_member?(topology_pid, coll, index, opts) do
176202
topology_pid
177203
|> Mongo.list_indexes(coll, opts)
178204
|> Enum.map(fn
179-
%{"name" => name} -> name
205+
%{"name" => name} -> name
180206
_ -> ""
181207
end)
182208
|> Enum.member?(index)
@@ -185,34 +211,23 @@ defmodule Mongo.GridFs.Bucket do
185211
##
186212
# Creates the indexes for the fs.chunks collection
187213
#
188-
defp create_chunks_index({%Bucket{topology_pid: topology_pid, opts: opts} = bucket, false} ) do
189-
190-
cmd = [createIndexes: chunks_collection_name(bucket), indexes: [[key: [files_id: 1, n: 1], name: @chunks_index_name, unique: true]]]
191-
{:ok, _} = Mongo.issue_command(topology_pid, cmd, :write, opts)
192-
193-
bucket
214+
defp create_chunks_index(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
215+
cmd = [createIndexes: chunks_collection_name(bucket), indexes: [[key: [files_id: 1, n: 1], name: @chunks_index_name, unique: true]]]
216+
with {:ok, _} <- Mongo.issue_command(topology_pid, cmd, :write, opts) do
217+
:ok
218+
end
194219
end
195220

196-
##
197-
# index exists, nothing to do
198-
#
199-
defp create_chunks_index({bucket, true}), do: bucket
200-
201221
##
202222
# Creates the indexes for the fs.files collection
203223
#
204-
defp create_files_index({%Bucket{topology_pid: topology_pid, opts: opts} = bucket, false}) do
205-
206-
cmd = [createIndexes: files_collection_name(bucket), indexes: [[key: [filename: 1, uploadDate: 1], name: @files_index_name]]]
207-
{:ok, _} = Mongo.issue_command(topology_pid, cmd, :write, opts)
208-
209-
bucket
224+
defp create_files_index(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
225+
cmd = [createIndexes: files_collection_name(bucket), indexes: [[key: [filename: 1, uploadDate: 1], name: @files_index_name]]]
226+
with {:ok, _} <- Mongo.issue_command(topology_pid, cmd, :write, opts) do
227+
:ok
228+
end
210229
end
211230

212-
##
213-
# index exists, nothing to do
214-
#
215-
defp create_files_index({bucket, true}), do: bucket
216231

217232
defimpl Inspect, for: Bucket do
218233

lib/mongo/session.ex

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,10 @@ defmodule Mongo.Session do
140140

141141
## warte max 120ms, ansonsten kill
142142
try do
143+
fun.(opts)
143144
rescue
144145
reason -> {:error, reason}
145146
end
146-
fun.(opts)
147147

148148
end
149149

@@ -280,7 +280,12 @@ defmodule Mongo.Session do
280280
maxTimeMS: Keyword.get(opts, :max_commit_time_ms)
281281
] |> filter_nils()
282282

283-
Mongo.exec_command(conn, cmd, database: "admin")
283+
_doc = Mongo.exec_command(conn, cmd, database: "admin")
284+
285+
# {:ok, %{"$clusterTime" => %{"clusterTime" => #BSON.Timestamp<1567853627:8>,
286+
# "signature" => %{"hash" => #BSON.Binary<0000000000000000000000000000000000000000>, "keyId" => 0}},
287+
# "ok" => 1.0, "operationTime" => #BSON.Timestamp<1567853627:6>}}
288+
:ok
284289
end
285290

286291
defp run_abort_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
@@ -295,7 +300,15 @@ defmodule Mongo.Session do
295300
writeConcern: write_concern(opts)
296301
] |> filter_nils()
297302

298-
Mongo.exec_command(conn, cmd, database: "admin")
303+
_doc = Mongo.exec_command(conn, cmd, database: "admin")
304+
305+
#
306+
# doc:
307+
# %{"$clusterTime" => %{"clusterTime" => #BSON.Timestamp<1567853164:4>,
308+
# "signature" => %{"hash" => #BSON.Binary<0000000000000000000000000000000000000000>, "keyId" => 0}},
309+
#"ok" => 1.0, "operationTime" => #BSON.Timestamp<1567853164:4>}
310+
311+
:ok
299312
end
300313

301314

lib/mongo/write_concern.ex

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,20 @@ defmodule Mongo.WriteConcern do
99
w: Keyword.get(opts, :w),
1010
j: Keyword.get(opts, :j),
1111
wtimeout: Keyword.get(opts, :wtimeout)
12-
} |> filter_nils()
12+
} |> filter_nils() |> filter_empty()
1313

1414
end
1515

16+
def filter_empty(map) when is_map(map) and map == %{} do
17+
nil
18+
end
19+
def filter_empty(map) when is_map(map) do
20+
map
21+
end
22+
23+
def acknowledged?(nil) do
24+
true
25+
end
1626
def acknowledged?(write_concern) when is_map(write_concern) do
1727
case Map.get(write_concern, :w) do
1828
0 -> false

test/mongo/grid_fs/upload_test.exs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ defmodule Mongo.GridFs.UploadTest do
33

44
alias Mongo.GridFs.Bucket
55
alias Mongo.GridFs.Upload
6+
alias Mongo.Session
67

78
setup_all do
89
assert {:ok, pid} = Mongo.TestConnection.connect
@@ -90,4 +91,47 @@ defmodule Mongo.GridFs.UploadTest do
9091
assert x == chksum
9192
end
9293

94+
@tag :mongo_4_2
95+
test "upload a text file, check download, length, meta-data and checksum transaction", c do
96+
97+
src_filename = "./test/data/test.txt"
98+
chksum = calc_checksum(src_filename)
99+
bucket = Bucket.new(c.pid)
100+
101+
{:ok, upload_stream } = Session.with_transaction(c.pid, fn opt ->
102+
bucket = Bucket.add_session(bucket, opt)
103+
upload_stream = Upload.open_upload_stream(bucket, "my-example-file.txt", %{tag: "checked", chk_sum: chksum})
104+
File.stream!(src_filename, [], 512) |> Stream.into(upload_stream) |> Stream.run()
105+
{:ok, upload_stream}
106+
end, w: 1)
107+
108+
file_id = upload_stream.id
109+
110+
assert file_id != nil
111+
112+
%{"metadata" => %{"tag" => "checked", "chk_sum" => x}} = Mongo.find_one(c.pid, Bucket.files_collection_name(bucket), %{_id: file_id})
113+
assert x == chksum
114+
end
115+
116+
@tag :mongo_4_2
117+
test "upload a text file, check download, length, meta-data and checksum abort transaction", c do
118+
119+
src_filename = "./test/data/test.txt"
120+
chksum = calc_checksum(src_filename)
121+
bucket = Bucket.new(c.pid)
122+
123+
{:error, upload_stream} = Session.with_transaction(c.pid, fn opt ->
124+
bucket = Bucket.add_session(bucket, opt)
125+
upload_stream = Upload.open_upload_stream(bucket, "my-example-file.txt", %{tag: "checked", chk_sum: chksum})
126+
File.stream!(src_filename, [], 512) |> Stream.into(upload_stream) |> Stream.run()
127+
{:error, upload_stream}
128+
end, w: 1)
129+
130+
file_id = upload_stream.id
131+
132+
assert file_id != nil
133+
134+
assert nil == Mongo.find_one(c.pid, Bucket.files_collection_name(Bucket.new(c.pid)), %{_id: file_id})
135+
end
136+
93137
end

0 commit comments

Comments
 (0)