Skip to content

Commit b92e796

Browse files
committed
Using GAPIC datastore object (and an HTTP equivalent) for commit.
This is a bit more involved since - The signature requires the **caller** to determine the mode - The signature receives a list of mutations, so we no longer collect mutations within a CommitRequest object - The exception re-mapping must be done, so we can't use the base datastore_client.DatastoreClient GAPIC object
1 parent 7ba4d7a commit b92e796

File tree

4 files changed

+82
-101
lines changed

4 files changed

+82
-101
lines changed

datastore/google/cloud/datastore/_gax.py

Lines changed: 30 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
from google.cloud.gapic.datastore.v1 import datastore_client
2121
from google.cloud.proto.datastore.v1 import datastore_pb2_grpc
22+
from google.gax.errors import GaxError
23+
from google.gax.grpc import exc_to_code
2224
from google.gax.utils import metrics
2325
from grpc import StatusCode
2426
import six
@@ -71,6 +73,13 @@ def _grpc_catch_rendezvous():
7173
"""
7274
try:
7375
yield
76+
except GaxError as exc:
77+
error_code = exc_to_code(exc.cause)
78+
error_class = _GRPC_ERROR_MAPPING.get(error_code)
79+
if error_class is None:
80+
raise
81+
else:
82+
raise error_class(exc.cause.details())
7483
except exceptions.GrpcRendezvous as exc:
7584
error_code = exc.code()
7685
error_class = _GRPC_ERROR_MAPPING.get(error_code)
@@ -158,56 +167,38 @@ def begin_transaction(self, project, request_pb):
158167
with _grpc_catch_rendezvous():
159168
return self._stub.BeginTransaction(request_pb)
160169

161-
def commit(self, project, request_pb):
162-
"""Perform a ``commit`` request.
163-
164-
:type project: str
165-
:param project: The project to connect to. This is
166-
usually your project name in the cloud console.
167-
168-
:type request_pb: :class:`.datastore_pb2.CommitRequest`
169-
:param request_pb: The request protobuf object.
170170

171-
:rtype: :class:`.datastore_pb2.CommitResponse`
172-
:returns: The returned protobuf response object.
173-
"""
174-
request_pb.project_id = project
175-
with _grpc_catch_rendezvous():
176-
return self._stub.Commit(request_pb)
171+
class GAPICDatastoreAPI(datastore_client.DatastoreClient):
172+
"""An API object that sends proto-over-gRPC requests.
177173
178-
def rollback(self, project, request_pb):
179-
"""Perform a ``rollback`` request.
174+
A light wrapper around the parent class, with exception re-mapping
175+
provided (from GaxError to our native errors).
180176
181-
:type project: str
182-
:param project: The project to connect to. This is
183-
usually your project name in the cloud console.
177+
:type args: tuple
178+
:param args: Positional arguments to pass to constructor.
184179
185-
:type request_pb: :class:`.datastore_pb2.RollbackRequest`
186-
:param request_pb: The request protobuf object.
180+
:type kwargs: dict
181+
:param kwargs: Keyword arguments to pass to constructor.
182+
"""
187183

188-
:rtype: :class:`.datastore_pb2.RollbackResponse`
189-
:returns: The returned protobuf response object.
190-
"""
191-
request_pb.project_id = project
192-
with _grpc_catch_rendezvous():
193-
return self._stub.Rollback(request_pb)
184+
def commit(self, *args, **kwargs):
185+
"""Perform a ``commit`` request.
194186
195-
def allocate_ids(self, project, request_pb):
196-
"""Perform an ``allocateIds`` request.
187+
A light wrapper around the the base method from the parent class.
188+
Intended to provide exception re-mapping (from GaxError to our
189+
native errors).
197190
198-
:type project: str
199-
:param project: The project to connect to. This is
200-
usually your project name in the cloud console.
191+
:type args: tuple
192+
:param args: Positional arguments to pass to base method.
201193
202-
:type request_pb: :class:`.datastore_pb2.AllocateIdsRequest`
203-
:param request_pb: The request protobuf object.
194+
:type kwargs: dict
195+
:param kwargs: Keyword arguments to pass to base method.
204196
205-
:rtype: :class:`.datastore_pb2.AllocateIdsResponse`
197+
:rtype: :class:`.datastore_pb2.CommitResponse`
206198
:returns: The returned protobuf response object.
207199
"""
208-
request_pb.project_id = project
209200
with _grpc_catch_rendezvous():
210-
return self._stub.AllocateIds(request_pb)
201+
return super(GAPICDatastoreAPI, self).commit(*args, **kwargs)
211202

212203

213204
def make_datastore_api(client):
@@ -222,5 +213,5 @@ def make_datastore_api(client):
222213
channel = make_secure_channel(
223214
client._credentials, DEFAULT_USER_AGENT,
224215
datastore_client.DatastoreClient.SERVICE_ADDRESS)
225-
return datastore_client.DatastoreClient(
216+
return GAPICDatastoreAPI(
226217
channel=channel, lib_name='gccl', lib_version=__version__)

datastore/google/cloud/datastore/_http.py

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -216,23 +216,6 @@ def begin_transaction(self, project, request_pb):
216216
self.connection.api_base_url,
217217
request_pb, _datastore_pb2.BeginTransactionResponse)
218218

219-
def commit(self, project, request_pb):
220-
"""Perform a ``commit`` request.
221-
222-
:type project: str
223-
:param project: The project to connect to. This is
224-
usually your project name in the cloud console.
225-
226-
:type request_pb: :class:`.datastore_pb2.CommitRequest`
227-
:param request_pb: The request protobuf object.
228-
229-
:rtype: :class:`.datastore_pb2.CommitResponse`
230-
:returns: The returned protobuf response object.
231-
"""
232-
return _rpc(self.connection.http, project, 'commit',
233-
self.connection.api_base_url,
234-
request_pb, _datastore_pb2.CommitResponse)
235-
236219

237220
class Connection(connection_module.Connection):
238221
"""A connection to the Google Cloud Datastore via the Protobuf API.
@@ -366,37 +349,6 @@ def begin_transaction(self, project):
366349
request = _datastore_pb2.BeginTransactionRequest()
367350
return self._datastore_api.begin_transaction(project, request)
368351

369-
def commit(self, project, request, transaction_id):
370-
"""Commit mutations in context of current transaction (if any).
371-
372-
Maps the ``DatastoreService.Commit`` protobuf RPC.
373-
374-
:type project: str
375-
:param project: The project to which the transaction applies.
376-
377-
:type request: :class:`.datastore_pb2.CommitRequest`
378-
:param request: The protobuf with the mutations being committed.
379-
380-
:type transaction_id: str
381-
:param transaction_id: (Optional) The transaction ID returned from
382-
:meth:`begin_transaction`. Non-transactional
383-
batches must pass ``None``.
384-
385-
.. note::
386-
387-
This method will mutate ``request`` before using it.
388-
389-
:rtype: :class:`.datastore_pb2.CommitResponse`
390-
:returns: The protobuf response from a commit request.
391-
"""
392-
if transaction_id:
393-
request.mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
394-
request.transaction = transaction_id
395-
else:
396-
request.mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL
397-
398-
return self._datastore_api.commit(project, request)
399-
400352

401353
class HTTPDatastoreAPI(object):
402354
"""An API object that sends proto-over-HTTP requests.
@@ -410,6 +362,39 @@ class HTTPDatastoreAPI(object):
410362
def __init__(self, client):
411363
self.client = client
412364

365+
def commit(self, project, mode, mutations, transaction=None):
366+
"""Perform a ``commit`` request.
367+
368+
:type project: str
369+
:param project: The project to connect to. This is
370+
usually your project name in the cloud console.
371+
372+
:type mode: :class:`.gapic.datastore.v1.enums.CommitRequest.Mode`
373+
:param mode: The type of commit to perform. Expected to be one of
374+
``TRANSACTIONAL`` or ``NON_TRANSACTIONAL``.
375+
376+
:type mutations: list
377+
:param mutations: List of :class:`.datastore_pb2.Mutation`, the
378+
mutations to perform.
379+
380+
:type transaction: bytes
381+
:param transaction: (Optional) The transaction ID returned from
382+
:meth:`begin_transaction`. Non-transactional
383+
commits must pass :data:`None`.
384+
385+
:rtype: :class:`.datastore_pb2.CommitResponse`
386+
:returns: The returned protobuf response object.
387+
"""
388+
request_pb = _datastore_pb2.CommitRequest(
389+
project_id=project,
390+
mode=mode,
391+
transaction=transaction,
392+
mutations=mutations,
393+
)
394+
return _rpc(self.client._http, project, 'commit',
395+
self.client._base_url,
396+
request_pb, _datastore_pb2.CommitResponse)
397+
413398
def rollback(self, project, transaction_id):
414399
"""Perform a ``rollback`` request.
415400

datastore/google/cloud/datastore/batch.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class Batch(object):
7777

7878
def __init__(self, client):
7979
self._client = client
80-
self._commit_request = _datastore_pb2.CommitRequest()
80+
self._mutations = []
8181
self._partial_key_entities = []
8282
self._status = self._INITIAL
8383

@@ -110,7 +110,8 @@ def _add_partial_key_entity_pb(self):
110110
:returns: The newly created entity protobuf that will be
111111
updated and sent with a commit.
112112
"""
113-
new_mutation = self.mutations.add()
113+
new_mutation = _datastore_pb2.Mutation()
114+
self._mutations.append(new_mutation)
114115
return new_mutation.insert
115116

116117
def _add_complete_key_entity_pb(self):
@@ -123,7 +124,8 @@ def _add_complete_key_entity_pb(self):
123124
# We use ``upsert`` for entities with completed keys, rather than
124125
# ``insert`` or ``update``, in order not to create race conditions
125126
# based on prior existence / removal of the entity.
126-
new_mutation = self.mutations.add()
127+
new_mutation = _datastore_pb2.Mutation()
128+
self._mutations.append(new_mutation)
127129
return new_mutation.upsert
128130

129131
def _add_delete_key_pb(self):
@@ -133,7 +135,8 @@ def _add_delete_key_pb(self):
133135
:returns: The newly created key protobuf that will be
134136
deleted when sent with a commit.
135137
"""
136-
new_mutation = self.mutations.add()
138+
new_mutation = _datastore_pb2.Mutation()
139+
self._mutations.append(new_mutation)
137140
return new_mutation.delete
138141

139142
@property
@@ -150,7 +153,7 @@ def mutations(self):
150153
:returns: The list of :class:`.datastore_pb2.Mutation`
151154
protobufs to be sent in the commit request.
152155
"""
153-
return self._commit_request.mutations
156+
return self._mutations
154157

155158
def put(self, entity):
156159
"""Remember an entity's state to be saved during :meth:`commit`.
@@ -237,9 +240,13 @@ def _commit(self):
237240
238241
This is called by :meth:`commit`.
239242
"""
240-
# NOTE: ``self._commit_request`` will be modified.
241-
commit_response_pb = self._client._connection.commit(
242-
self.project, self._commit_request, self._id)
243+
if self._id is None:
244+
mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL
245+
else:
246+
mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
247+
248+
commit_response_pb = self._client._datastore_api.commit(
249+
self.project, mode, self._mutations, transaction=self._id)
243250
_, updated_keys = _parse_commit_response(commit_response_pb)
244251
# If the back-end returns without error, we are guaranteed that
245252
# :meth:`Connection.commit` will return keys that match (length and

system_tests/datastore.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,10 @@ class Config(object):
6161

6262

6363
def clone_client(client):
64-
# Fool the Client constructor to avoid creating a new connection.
65-
cloned_client = datastore.Client(project=client.project,
66-
namespace=client.namespace,
67-
http=object())
68-
cloned_client._connection = client._connection
69-
return cloned_client
64+
return datastore.Client(project=client.project,
65+
namespace=client.namespace,
66+
credentials=client._credentials,
67+
http=client._http)
7068

7169

7270
def setUpModule():

0 commit comments

Comments
 (0)