1
1
defmodule Mongo.Messages do
2
- @ moduledoc false
2
+ @ moduledoc """
3
+ This module encodes and decodes the data from and to the mongodb server.
4
+ We only support MongoDB >= 3.2 and use op_query with the hack collection "$cmd"
5
+ Other op codes are deprecated. Therefore only op_reply and op_query are supported.
6
+ """
3
7
4
8
defmacro __using__ ( _opts ) do
5
9
quote do
@@ -14,21 +18,8 @@ defmodule Mongo.Messages do
14
18
import Record
15
19
import Mongo.BinaryUtils
16
20
17
- @ op_update 2001
18
- @ op_insert 2002
19
- @ op_query 2004
20
- @ op_get_more 2005
21
- @ op_delete 2006
22
- @ op_kill_cursors 2007
23
-
24
- @ update_flags [
25
- upsert: 0x1 ,
26
- multi: 0x2
27
- ]
28
-
29
- @ insert_flags [
30
- continue_on_error: 0x1
31
- ]
21
+ @ op_reply 1
22
+ @ op_query 2004
32
23
33
24
@ query_flags [
34
25
tailable_cursor: 0x2 ,
@@ -40,116 +31,71 @@ defmodule Mongo.Messages do
40
31
partial: 0x80
41
32
]
42
33
43
- @ delete_flags [
44
- single: 0x1
45
- ]
46
-
47
34
@ header_size 4 * 4
48
35
49
36
defrecordp :msg_header , [ :length , :request_id , :response_to , :op_code ]
50
- defrecord :op_update , [ :coll , :flags , :query , :update ]
51
- defrecord :op_insert , [ :flags , :coll , :docs ]
52
37
defrecord :op_query , [ :flags , :coll , :num_skip , :num_return , :query , :select ]
53
- defrecord :op_get_more , [ :coll , :num_return , :cursor_id ]
54
- defrecord :op_delete , [ :coll , :flags , :query ]
55
- defrecord :op_kill_cursors , [ :cursor_ids ]
56
38
defrecord :op_reply , [ :flags , :cursor_id , :from , :num , :docs ]
57
39
58
- def encode ( request_id , op ) do
59
- iodata = encode_op ( op )
60
- header = msg_header ( length: IO . iodata_length ( iodata ) + @ header_size ,
61
- request_id: request_id , response_to: 0 ,
62
- op_code: op_to_code ( op ) )
63
-
64
- [ encode_header ( header ) | iodata ]
40
+ @ doc """
41
+ Decodes the header from response of a request sent by the mongodb server
42
+ """
43
+ def decode_header ( iolist ) when is_list ( iolist ) do
44
+ case IO . iodata_length ( iolist ) >= @ header_size do
45
+ true -> iolist |> IO . iodata_to_binary ( ) |> decode_header ( )
46
+ false -> :error
47
+ end
65
48
end
49
+ def decode_header ( << length :: int32 , request_id :: int32 , response_to :: int32 , op_code :: int32 , rest :: binary >> ) do
50
+ header = msg_header ( length: length - @ header_size , request_id: request_id , response_to: response_to , op_code: op_code ) ## todo don't subtract header-size here
51
+ { :ok , header , rest }
52
+ end
53
+ def decode_header ( _binary ) , do: :error
66
54
55
+ @ doc """
56
+ Decodes the response body of a request sent by the mongodb server
57
+ """
67
58
def decode_response ( msg_header ( length: length ) = header , iolist ) when is_list ( iolist ) do
68
- if IO . iodata_length ( iolist ) >= length ,
69
- do: decode_response ( header , IO . iodata_to_binary ( iolist ) ) ,
70
- else: :error
59
+ case IO . iodata_length ( iolist ) >= length do
60
+ true -> decode_response ( header , IO . iodata_to_binary ( iolist ) )
61
+ false -> :error
62
+ end
71
63
end
72
- def decode_response ( msg_header ( length: length , response_to: response_to ) , binary ) when byte_size ( binary ) >= length do
64
+ def decode_response ( msg_header ( length: length , response_to: response_to , op_code: op_code ) , binary ) when byte_size ( binary ) >= length do
73
65
<< response :: binary ( length ) , rest :: binary >> = binary
74
- { :ok , response_to , decode_reply ( response ) , rest }
75
- end
76
- def decode_response ( _header , _binary ) do
77
- :error
66
+ case op_code do
67
+ @ op_reply -> { :ok , response_to , decode_reply ( response ) , rest }
68
+ _ -> :error
69
+ end
78
70
end
71
+ def decode_response ( _header , _binary ) , do: :error
79
72
80
- def decode_header ( iolist ) when is_list ( iolist ) do
81
- if IO . iodata_length ( iolist ) >= @ header_size ,
82
- do: IO . iodata_to_binary ( iolist ) |> decode_header ,
83
- else: :error
84
- end
85
- def decode_header ( << length :: int32 , request_id :: int32 , response_to :: int32 ,
86
- op_code :: int32 , rest :: binary >> ) do
87
- header = msg_header ( length: length - @ header_size , request_id: request_id , response_to: response_to , op_code: op_code )
88
- { :ok , header , rest }
89
- end
90
- def decode_header ( _binary ) do
91
- :error
73
+ @ doc """
74
+ Decodes a reply message from the response
75
+ """
76
+ def decode_reply ( << flags :: int32 , cursor_id :: int64 , from :: int32 , num :: int32 , rest :: binary >> ) do
77
+ op_reply ( flags: flags , cursor_id: cursor_id , from: from , num: num , docs: BSON.Decoder . documents ( rest ) )
92
78
end
93
79
94
- defp encode_op ( op_update ( coll: coll , flags: flags , query: query , update: update ) ) do
95
- [ << 0x00 :: int32 >> ,
96
- coll ,
97
- << 0x00 , blit_flags ( :update , flags ) :: int32 >> ,
98
- query ,
99
- update ]
80
+ def encode ( request_id , op_query ( ) = op ) do
81
+ iodata = encode_op ( op )
82
+ header = msg_header ( length: IO . iodata_length ( iodata ) + @ header_size , request_id: request_id , response_to: 0 , op_code: @ op_query )
83
+ [ encode_header ( header ) | iodata ]
100
84
end
101
85
102
- defp encode_op ( op_insert ( flags: flags , coll: coll , docs: docs ) ) do
103
- [ << blit_flags ( :insert , flags ) :: int32 >> ,
104
- coll ,
105
- 0x00 ,
106
- docs ]
86
+ defp encode_header ( msg_header ( length: length , request_id: request_id , response_to: response_to , op_code: op_code ) ) do
87
+ << length :: int32 , request_id :: int32 , response_to :: int32 , op_code :: int32 >>
107
88
end
108
89
109
90
defp encode_op ( op_query ( flags: flags , coll: coll , num_skip: num_skip ,
110
91
num_return: num_return , query: query , select: select ) ) do
111
92
[ << blit_flags ( :query , flags ) :: int32 >> ,
112
93
coll ,
113
94
<< 0x00 , num_skip :: int32 , num_return :: int32 >> ,
114
- query ,
95
+ BSON.Encoder . document ( query ) ,
115
96
select ]
116
97
end
117
98
118
- defp encode_op ( op_get_more ( coll: coll , num_return: num_return , cursor_id: cursor_id ) ) do
119
- [ << 0x00 :: int32 >> ,
120
- coll ,
121
- << 0x00 , num_return :: int32 , cursor_id :: int64 >> ]
122
- end
123
-
124
- defp encode_op ( op_delete ( coll: coll , flags: flags , query: query ) ) do
125
- [ << 0x00 :: int32 >> ,
126
- coll ,
127
- << 0x00 , blit_flags ( :delete , flags ) :: int32 >> |
128
- query ]
129
- end
130
-
131
- defp encode_op ( op_kill_cursors ( cursor_ids: ids ) ) do
132
- binary_ids = for id <- ids , into: "" , do: << id :: int64 >>
133
- num = div byte_size ( binary_ids ) , 8
134
- [ << 0x00 :: int32 , num :: int32 >> , binary_ids ]
135
- end
136
-
137
- defp op_to_code ( op_update ( ) ) , do: @ op_update
138
- defp op_to_code ( op_insert ( ) ) , do: @ op_insert
139
- defp op_to_code ( op_query ( ) ) , do: @ op_query
140
- defp op_to_code ( op_get_more ( ) ) , do: @ op_get_more
141
- defp op_to_code ( op_delete ( ) ) , do: @ op_delete
142
- defp op_to_code ( op_kill_cursors ( ) ) , do: @ op_kill_cursors
143
-
144
- defp decode_reply ( << flags :: int32 , cursor_id :: int64 , from :: int32 , num :: int32 , rest :: binary >> ) do
145
- op_reply ( flags: flags , cursor_id: cursor_id , from: from , num: num , docs: rest )
146
- end
147
-
148
- defp encode_header ( msg_header ( length: length , request_id: request_id ,
149
- response_to: response_to , op_code: op_code ) ) do
150
- << length :: int32 , request_id :: int32 , response_to :: int32 , op_code :: int32 >>
151
- end
152
-
153
99
defp blit_flags ( op , flags ) when is_list ( flags ) do
154
100
import Bitwise
155
101
Enum . reduce ( flags , 0x0 , & ( flag_to_bit ( op , & 1 ) ||| & 2 ) )
@@ -158,21 +104,9 @@ defmodule Mongo.Messages do
158
104
flags
159
105
end
160
106
161
- Enum . each ( @ update_flags , fn { flag , bit } ->
162
- defp flag_to_bit ( :update , unquote ( flag ) ) , do: unquote ( bit )
163
- end )
164
-
165
- Enum . each ( @ insert_flags , fn { flag , bit } ->
166
- defp flag_to_bit ( :insert , unquote ( flag ) ) , do: unquote ( bit )
167
- end )
168
-
169
107
Enum . each ( @ query_flags , fn { flag , bit } ->
170
108
defp flag_to_bit ( :query , unquote ( flag ) ) , do: unquote ( bit )
171
109
end )
172
110
173
- Enum . each ( @ delete_flags , fn { flag , bit } ->
174
- defp flag_to_bit ( :delete , unquote ( flag ) ) , do: unquote ( bit )
175
- end )
176
-
177
111
defp flag_to_bit ( _op , _flag ) , do: 0x0
178
112
end
0 commit comments