1
+ defmodule Mongo.BulkOps do
2
+ @ moduledoc """
3
+
4
+ This module defines bulk operation for insert, update and delete. A bulk operation is a tupel of two elements
5
+
6
+ 1. an atom, which specify the type `:insert`, `:update` and `:delete`
7
+ 2. a document or another tupel which contains all parameters of the operation.
8
+
9
+ You use these function in streams:
10
+
11
+ ## Example
12
+
13
+ ```
14
+ Filestream!("large.csv")
15
+ |> Stream.map(&String.trim(&1))
16
+ |> Stream.map(&String.split(&1,","))
17
+ |> Stream.map(fn [firstname | [lastname | _]] -> %{firstname: firstname, lastname: lastname} end)
18
+ |> Stream.map(fn doc -> BulkOps.get_insert_one(doc) end)
19
+ |> UnorderedBulk.write(:mongo, "bulk", 1_000)
20
+ |> Stream.run()
21
+ ```
22
+
23
+ """
24
+
25
+ @ type bulk_op :: { atom , BSON . document } |
26
+ { atom , { BSON . document , Keyword . t } } |
27
+ { atom , { BSON . document , BSON . document , Keyword . t } }
28
+
29
+ import Mongo.Utils
30
+
31
+ @ doc """
32
+ Returns an `insert_one` operation tupel for appending to a bulk. Used to perform stream bulk writes.
33
+
34
+ ## Example
35
+ ```
36
+ BulkOps.get_insert_one(%{name: "Waldo"})
37
+
38
+ {:insert, %{name: "Waldo"}}
39
+ ```
40
+ """
41
+ @ spec get_insert_one ( BSON . document ) :: bulk_op
42
+ def get_insert_one ( doc ) , do: { :insert , doc }
43
+
44
+ @ doc """
45
+ Returns an `delete_one` operation tupel for appending to a bulk. Used to perform stream bulk writes.
46
+
47
+ ## Example
48
+
49
+ ```
50
+ BulkOps.get_delete_one(%{name: "Waldo"})
51
+
52
+ {:delete, {%{name: "Waldo"}, [limit: 1]}}
53
+ ```
54
+ """
55
+ @ spec get_delete_one ( BSON . document , Keyword . t ) :: bulk_op
56
+ def get_delete_one ( doc , opts \\ [ ] ) , do: { :delete , { doc , Keyword . put ( opts , :limit , 1 ) } }
57
+
58
+
59
+ @ doc """
60
+ Returns an `delete_many` operation for appending to a bulk. Used to perform stream bulk writes.
61
+ """
62
+ @ spec get_delete_many ( BSON . document , Keyword . t ) :: bulk_op
63
+ def get_delete_many ( doc , opts \\ [ ] ) , do: { :delete , { doc , Keyword . put ( opts , :limit , 0 ) } }
64
+
65
+ @ doc """
66
+ Returns an `update_one` operation for appending to a bulk. Used to perform stream bulk writes.
67
+ """
68
+ @ spec get_update_one ( BSON . document , BSON . document , Keyword . t ) :: bulk_op
69
+ def get_update_one ( filter , update , opts \\ [ ] ) do
70
+ _ = modifier_docs ( update , :update )
71
+ { :update , { filter , update , Keyword . put ( opts , :multi , false ) } }
72
+ end
73
+
74
+ @ doc """
75
+ Returns an `update_many` operation for appending to a bulk. Used to perform stream bulk writes.
76
+ """
77
+ @ spec get_update_many ( BSON . document , BSON . document , Keyword . t ) :: bulk_op
78
+ def get_update_many ( filter , update , opts \\ [ ] ) do
79
+ _ = modifier_docs ( update , :update )
80
+ { :update , { filter , update , Keyword . put ( opts , :multi , true ) } }
81
+ end
82
+
83
+ @ doc """
84
+ Returns an `replace_one` operation for appending to a bulk. Used to perform stream bulk writes.
85
+ """
86
+ @ spec get_replace_one ( BSON . document , BSON . document , Keyword . t ) :: bulk_op
87
+ def get_replace_one ( filter , replacement , opts \\ [ ] ) do
88
+ _ = modifier_docs ( replacement , :replace )
89
+ { :update , { filter , replacement , Keyword . put ( opts , :multi , false ) } }
90
+ end
91
+
92
+ end
0 commit comments