Skip to content

Commit 8888e97

Browse files
committed
PYTHON-1884 Support auto encryption in network.command
Add encryption I/O callbacks. Add mongocryptd process management. Add simple test for auto encryption round trip.
1 parent 611c3f8 commit 8888e97

File tree

8 files changed

+444
-4
lines changed

8 files changed

+444
-4
lines changed

bson/raw_bson.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,8 @@ def __inflated(self):
9595
# We already validated the object's size when this document was
9696
# created, so no need to do that again.
9797
# Use SON to preserve ordering of elements.
98-
self.__inflated_doc = _raw_to_dict(
99-
self.__raw, 4, len(self.__raw)-1, self.__codec_options, SON())
98+
self.__inflated_doc = _inflate_bson(
99+
self.__raw, self.__codec_options)
100100
return self.__inflated_doc
101101

102102
def __getitem__(self, item):
@@ -118,6 +118,20 @@ def __repr__(self):
118118
% (self.raw, self.__codec_options))
119119

120120

121+
def _inflate_bson(bson_bytes, codec_options):
    """Decode only the top level fields of a BSON document.

    :Parameters:
      - `bson_bytes`: the BSON bytes that compose this document
      - `codec_options`: An instance of
        :class:`~bson.codec_options.CodecOptions` whose ``document_class``
        must be :class:`RawBSONDocument`.

    :Returns:
      The value produced by ``_raw_to_dict`` when it is handed a fresh
      :class:`SON` to fill in.
    """
    # Decode the span that starts after the first 4 bytes and stops before
    # the final byte of the document.
    first = 4
    last = len(bson_bytes) - 1
    # A SON target keeps the elements in their original order.
    ordered = SON()
    return _raw_to_dict(bson_bytes, first, last, codec_options, ordered)
133+
134+
121135
DEFAULT_RAW_BSON_OPTIONS = DEFAULT.with_options(document_class=RawBSONDocument)
122136
"""The default :class:`~bson.codec_options.CodecOptions` for
123137
:class:`RawBSONDocument`.

pymongo/encryption.py

Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Client side encryption implementation."""
16+
17+
import subprocess
18+
import weakref
19+
20+
from pymongocrypt.auto_encrypter import AutoEncrypter
21+
from pymongocrypt.errors import MongoCryptError
22+
from pymongocrypt.mongocrypt import MongoCryptOptions
23+
from pymongocrypt.state_machine import MongoCryptCallback
24+
25+
from bson import _bson_to_dict, _dict_to_bson
26+
from bson.binary import STANDARD
27+
from bson.codec_options import CodecOptions
28+
from bson.raw_bson import (DEFAULT_RAW_BSON_OPTIONS,
29+
RawBSONDocument,
30+
_inflate_bson)
31+
from bson.son import SON
32+
33+
from pymongo.errors import ServerSelectionTimeoutError
34+
from pymongo.mongo_client import MongoClient
35+
from pymongo.pool import _configured_socket, PoolOptions
36+
from pymongo.ssl_support import get_ssl_context
37+
38+
39+
# Port used when connecting to a KMS endpoint.
_HTTPS_PORT = 443
# Used for both the connect and the socket timeout of KMS connections.
# TODO: CDRIVER-3262 will define this value.
_KMS_CONNECT_TIMEOUT = 10
# Passed as serverSelectionTimeoutMS to the mongocryptd MongoClient.
_MONGOCRYPTD_TIMEOUT_MS = 1000

_DATA_KEY_OPTS = CodecOptions(
    uuid_representation=STANDARD, document_class=SON)
# Use RawBSONDocument codec options to avoid needlessly decoding
# documents from the key vault.
_KEY_VAULT_OPTS = CodecOptions(
    uuid_representation=STANDARD, document_class=RawBSONDocument)
48+
49+
50+
class _EncryptionIO(MongoCryptCallback):
    def __init__(self, client, key_vault_coll, mongocryptd_client, opts):
        """Internal class to perform I/O on behalf of pymongocrypt.

        :Parameters:
          - `client`: The encrypted MongoClient. Only a weak reference to it
            is stored.
          - `key_vault_coll`: The key vault collection. It is re-bound with
            codec options whose document class is :class:`RawBSONDocument`.
          - `mongocryptd_client`: The MongoClient used to send commands to
            mongocryptd.
          - `opts`: The encrypted client's auto encryption options.
        """
        # Use a weak ref to break reference cycle.
        self.client_ref = weakref.ref(client)
        self.key_vault_coll = key_vault_coll.with_options(
            codec_options=_KEY_VAULT_OPTS)
        self.mongocryptd_client = mongocryptd_client
        self.opts = opts
        # Set to True by spawn() so mark_command() does not spawn eagerly
        # more than once.
        self._spawned = False

    def kms_request(self, kms_context):
        """Complete a KMS request.

        Opens a TLS connection to the KMS endpoint, sends the request and
        feeds the reply to `kms_context` until it needs no more bytes. The
        connection is always closed before this method returns or raises.

        :Parameters:
          - `kms_context`: A :class:`MongoCryptKmsContext`.

        :Returns:
          None

        :Raises:
          :class:`MongoCryptError` if connecting or sending fails, or if the
          KMS closes the connection before the full reply was received.
        """
        endpoint = kms_context.endpoint
        message = kms_context.message
        ctx = get_ssl_context(None, None, None, None, None, None, True)
        opts = PoolOptions(connect_timeout=_KMS_CONNECT_TIMEOUT,
                           socket_timeout=_KMS_CONNECT_TIMEOUT,
                           ssl_context=ctx)
        try:
            conn = _configured_socket((endpoint, _HTTPS_PORT), opts)
        except Exception as exc:
            raise MongoCryptError(str(exc))

        try:
            try:
                conn.sendall(message)
            except Exception as exc:
                raise MongoCryptError(str(exc))

            while kms_context.bytes_needed > 0:
                data = conn.recv(kms_context.bytes_needed)
                if not data:
                    # recv() returns an empty result once the peer has
                    # closed the connection; without this check the loop
                    # could never make progress.
                    raise MongoCryptError('KMS connection closed')
                kms_context.feed(data)
        finally:
            # Release the socket on success and on every error path.
            conn.close()

    def collection_info(self, database, filter):
        """Get the collection info for a namespace.

        The returned collection info is passed to libmongocrypt which reads
        the JSON schema.

        :Parameters:
          - `database`: The database on which to run listCollections.
          - `filter`: The filter to pass to listCollections.

        :Returns:
          The first document from the listCollections command response as
          BSON, or None when the cursor yields no documents.
        """
        with self.client_ref()[database].list_collections(
                filter=RawBSONDocument(filter)) as cursor:
            # Only the first matching collection is of interest.
            for doc in cursor:
                return _dict_to_bson(doc, False, _DATA_KEY_OPTS)

    def spawn(self):
        """Spawn mongocryptd.

        Launches ``mongocryptd`` (or the configured spawn path) with the
        configured spawn arguments and does not wait for it.

        NOTE(review): this method takes no lock, so concurrent callers may
        each launch a process. The original note said "at most one
        mongocryptd will start successfully", which is presumably enforced
        by mongocryptd itself rather than by this code -- confirm.
        """
        # Record the attempt first so mark_command() does not spawn eagerly
        # again even if Popen raises.
        self._spawned = True
        args = [self.opts._mongocryptd_spawn_path or 'mongocryptd']
        args.extend(self.opts._mongocryptd_spawn_args)
        subprocess.Popen(args)

    def mark_command(self, database, cmd):
        """Mark a command for encryption.

        :Parameters:
          - `database`: The database on which to run this command.
          - `cmd`: The BSON command to run.

        :Returns:
          The marked command response from mongocryptd.

        :Raises:
          :class:`~pymongo.errors.ServerSelectionTimeoutError` if mongocryptd
          cannot be selected and spawning is bypassed, or if it still cannot
          be selected after a re-spawn.
        """
        if not self._spawned and not self.opts._mongocryptd_bypass_spawn:
            self.spawn()
        # Database.command only supports mutable mappings so we need to decode
        # the raw BSON command first.
        inflated_cmd = _inflate_bson(cmd, DEFAULT_RAW_BSON_OPTIONS)
        try:
            res = self.mongocryptd_client[database].command(
                inflated_cmd,
                codec_options=DEFAULT_RAW_BSON_OPTIONS)
        except ServerSelectionTimeoutError:
            if self.opts._mongocryptd_bypass_spawn:
                raise
            # Spawn mongocryptd again and retry the command once.
            self.spawn()
            res = self.mongocryptd_client[database].command(
                inflated_cmd,
                codec_options=DEFAULT_RAW_BSON_OPTIONS)
        return res.raw

    def fetch_keys(self, filter):
        """Yields one or more keys from the key vault.

        :Parameters:
          - `filter`: The filter to pass to find.

        :Returns:
          A generator which yields the requested keys from the key vault.
        """
        with self.key_vault_coll.find(RawBSONDocument(filter)) as cursor:
            for key in cursor:
                # The key vault collection returns RawBSONDocuments; hand
                # back their underlying raw value.
                yield key.raw

    def insert_data_key(self, data_key):
        """Insert a data key into the key vault.

        :Parameters:
          - `data_key`: The data key document to insert.

        :Returns:
          The _id of the inserted data key document.
        """
        # insert does not return the inserted _id when given a RawBSONDocument.
        doc = _bson_to_dict(data_key, _DATA_KEY_OPTS)
        res = self.key_vault_coll.insert_one(doc)
        return res.inserted_id

    def close(self):
        """Release resources.

        Drops the references to the client and the key vault collection and
        closes the mongocryptd client.

        Note it is not safe to call this method from __del__ or any GC hooks.
        """
        self.client_ref = None
        self.key_vault_coll = None
        self.mongocryptd_client.close()
        self.mongocryptd_client = None
179+
180+
181+
class _Encrypter(object):
    def __init__(self, io_callbacks, opts):
        """Encrypts and decrypts MongoDB commands.

        Instances back the automatic encryption and decryption of MongoDB
        commands for an encrypted client.

        :Parameters:
          - `io_callbacks`: A :class:`MongoCryptCallback`.
          - `opts`: The encrypted client's :class:`AutoEncryptionOpts`.
        """
        # The schema map, when configured, is handed over BSON encoded.
        if opts._schema_map is not None:
            encoded_schema_map = _dict_to_bson(
                opts._schema_map, False, _DATA_KEY_OPTS)
        else:
            encoded_schema_map = None
        crypt_opts = MongoCryptOptions(
            opts._kms_providers, encoded_schema_map)
        self._auto_encrypter = AutoEncrypter(io_callbacks, crypt_opts)
        self._bypass_auto_encryption = opts._bypass_auto_encryption

    def encrypt(self, database, cmd):
        """Encrypt a MongoDB command.

        :Parameters:
          - `database`: The database for this command.
          - `cmd`: A command as BSON.

        :Returns:
          The encrypted command to execute, with its top level fields
          inflated.
        """
        # TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
        return _inflate_bson(
            self._auto_encrypter.encrypt(database, cmd),
            DEFAULT_RAW_BSON_OPTIONS)

    def decrypt(self, response):
        """Decrypt a MongoDB command response.

        :Parameters:
          - `response`: A MongoDB command response as BSON.

        :Returns:
          The decrypted command response, exactly as returned by the
          underlying auto encrypter.
        """
        decrypted = self._auto_encrypter.decrypt(response)
        return decrypted

    def close(self):
        """Cleanup resources by closing the underlying auto encrypter."""
        self._auto_encrypter.close()

    @staticmethod
    def create(client, opts):
        """Create an :class:`_Encrypter` for a client.

        :Parameters:
          - `client`: The encrypted MongoClient.
          - `opts`: The encrypted client's :class:`AutoEncryptionOpts`.

        :Returns:
          An :class:`_Encrypter` for this client.
        """
        # Fall back to the encrypted client itself when no dedicated key
        # vault client was configured.
        vault_client = opts._key_vault_client or client
        db_name, coll_name = opts._key_vault_namespace.split('.', 1)
        vault_coll = vault_client[db_name][coll_name]

        # connect=False: the mongocryptd client is created without
        # connecting right away.
        cryptd_client = MongoClient(
            opts._mongocryptd_uri,
            connect=False,
            serverSelectionTimeoutMS=_MONGOCRYPTD_TIMEOUT_MS)

        callbacks = _EncryptionIO(client, vault_coll, cryptd_client, opts)
        return _Encrypter(callbacks, opts)

pymongo/message.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,11 @@ def command_response(self):
14391439
assert self.number_returned == 1
14401440
return docs[0]
14411441

1442+
def raw_command_response(self):
    """Return the bytes of the command response.

    Not supported here: this implementation always raises
    :exc:`NotImplementedError`.
    """
    # This should never be called on _OpReply.
    raise NotImplementedError()
1446+
14421447
@classmethod
14431448
def unpack(cls, msg):
14441449
"""Construct an _OpReply from raw bytes."""
@@ -1485,6 +1490,10 @@ def command_response(self):
14851490
"""Unpack a command response."""
14861491
return self.unpack_response()[0]
14871492

1493+
def raw_command_response(self):
    """Return the bytes of the command response.

    The value is this reply's ``payload_document``, handed back as is.
    """
    raw_reply = self.payload_document
    return raw_reply
1496+
14881497
@classmethod
14891498
def unpack(cls, msg):
14901499
"""Construct an _OpMsg from raw bytes."""

pymongo/mongo_client.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,12 @@ def target():
713713
self._kill_cursors_executor = executor
714714
executor.open()
715715

716+
self._encrypter = None
717+
if self.__options.auto_encryption_opts:
718+
from pymongo.encryption import _Encrypter
719+
self._encrypter = _Encrypter.create(
720+
self, self.__options.auto_encryption_opts)
721+
716722
def _cache_credentials(self, source, credentials, connect=False):
717723
"""Save a set of authentication credentials.
718724
@@ -1152,6 +1158,9 @@ def close(self):
11521158
self._kill_cursors_executor.close()
11531159
self._process_periodic_tasks()
11541160
self._topology.close()
1161+
if self._encrypter:
1162+
# TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
1163+
self._encrypter.close()
11551164

11561165
def set_cursor_manager(self, manager_class):
11571166
"""DEPRECATED - Set this client's cursor manager.

pymongo/network.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
except ImportError:
3535
_SELECT_ERROR = OSError
3636

37+
from bson import _dict_to_bson, _decode_all_selective
3738
from bson.py3compat import PY3
3839

3940
from pymongo import helpers, message
@@ -114,6 +115,18 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
114115
if compression_ctx and name.lower() in _NO_COMPRESSION:
115116
compression_ctx = None
116117

118+
if (client and client._encrypter and
119+
not client._encrypter._bypass_auto_encryption):
120+
# Workaround for $clusterTime which is incompatible with check_keys.
121+
if check_keys:
122+
cluster_time = spec.pop('$clusterTime', None)
123+
spec = orig = client._encrypter.encrypt(
124+
dbname, _dict_to_bson(spec, check_keys, codec_options))
125+
if check_keys and cluster_time:
126+
spec['$clusterTime'] = cluster_time
127+
# We already checked the keys, no need to do it again.
128+
check_keys = False
129+
117130
if use_op_msg:
118131
flags = 2 if unacknowledged else 0
119132
request_id, msg, size, max_doc_size = message._op_msg(
@@ -143,6 +156,7 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
143156
sock.sendall(msg)
144157
if use_op_msg and unacknowledged:
145158
# Unacknowledged, fake a successful command response.
159+
reply = None
146160
response_doc = {"ok": 1}
147161
else:
148162
reply = receive_message(sock, request_id)
@@ -170,6 +184,12 @@ def command(sock, dbname, spec, slave_ok, is_mongos,
170184
duration = (datetime.datetime.now() - start) + encoding_duration
171185
listeners.publish_command_success(
172186
duration, response_doc, name, request_id, address)
187+
188+
if client and client._encrypter and reply:
189+
decrypted = client._encrypter.decrypt(reply.raw_command_response())
190+
response_doc = _decode_all_selective(decrypted, codec_options,
191+
user_fields)[0]
192+
173193
return response_doc
174194

175195
_UNPACK_COMPRESSION_HEADER = struct.Struct("<iiB").unpack
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"_id": {
3+
"$binary": {
4+
"base64": "YWFhYWFhYWFhYWFhYWFhYQ==",
5+
"subType": "04"
6+
}
7+
},
8+
"keyMaterial": {
9+
"$binary": {
10+
"base64": "db27rshiqK4Jqhb2xnwK4RfdFb9JuKeUe6xt5aYQF4o62tS75b7B4wxVN499gND9UVLUbpVKoyUoaZAeA895OENP335b8n8OwchcTFqS44t+P3zmhteYUQLIWQXaIgon7gEgLeJbaDHmSXS6/7NbfDDFlB37N7BP/2hx1yCOTN6NG/8M1ppw3LYT3CfP6EfXVEttDYtPbJpbb7nBVlxD7w==",
11+
"subType": "00"
12+
}
13+
},
14+
"creationDate": { "$date": { "$numberLong": "1564354142963" } },
15+
"updateDate": { "$date": { "$numberLong": "1564354142963" } },
16+
"status": { "$numberInt": "0" },
17+
"masterKey": { "provider": "local" }
18+
}

0 commit comments

Comments
 (0)