Skip to content

Commit 44b4d50

Browse files
authored
Merge pull request #3127 from dhermes/ds-move-commit-to-GAPIC
Using GAPIC datastore object (and an HTTP equivalent) for commit.
2 parents 7ba4d7a + 99ba184 commit 44b4d50

File tree

9 files changed

+446
-399
lines changed

9 files changed

+446
-399
lines changed

datastore/google/cloud/datastore/_gax.py

Lines changed: 34 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616

1717

1818
import contextlib
19+
import sys
1920

2021
from google.cloud.gapic.datastore.v1 import datastore_client
2122
from google.cloud.proto.datastore.v1 import datastore_pb2_grpc
23+
from google.gax.errors import GaxError
24+
from google.gax.grpc import exc_to_code
2225
from google.gax.utils import metrics
2326
from grpc import StatusCode
2427
import six
@@ -71,13 +74,22 @@ def _grpc_catch_rendezvous():
7174
"""
7275
try:
7376
yield
77+
except GaxError as exc:
78+
error_code = exc_to_code(exc.cause)
79+
error_class = _GRPC_ERROR_MAPPING.get(error_code)
80+
if error_class is None:
81+
raise
82+
else:
83+
new_exc = error_class(exc.cause.details())
84+
six.reraise(error_class, new_exc, sys.exc_info()[2])
7485
except exceptions.GrpcRendezvous as exc:
7586
error_code = exc.code()
7687
error_class = _GRPC_ERROR_MAPPING.get(error_code)
7788
if error_class is None:
7889
raise
7990
else:
80-
raise error_class(exc.details())
91+
new_exc = error_class(exc.details())
92+
six.reraise(error_class, new_exc, sys.exc_info()[2])
8193

8294

8395
class _DatastoreAPIOverGRPC(object):
@@ -158,56 +170,38 @@ def begin_transaction(self, project, request_pb):
158170
with _grpc_catch_rendezvous():
159171
return self._stub.BeginTransaction(request_pb)
160172

161-
def commit(self, project, request_pb):
162-
"""Perform a ``commit`` request.
163-
164-
:type project: str
165-
:param project: The project to connect to. This is
166-
usually your project name in the cloud console.
167-
168-
:type request_pb: :class:`.datastore_pb2.CommitRequest`
169-
:param request_pb: The request protobuf object.
170173

171-
:rtype: :class:`.datastore_pb2.CommitResponse`
172-
:returns: The returned protobuf response object.
173-
"""
174-
request_pb.project_id = project
175-
with _grpc_catch_rendezvous():
176-
return self._stub.Commit(request_pb)
174+
class GAPICDatastoreAPI(datastore_client.DatastoreClient):
175+
"""An API object that sends proto-over-gRPC requests.
177176
178-
def rollback(self, project, request_pb):
179-
"""Perform a ``rollback`` request.
177+
A light wrapper around the parent class, with exception re-mapping
178+
provided (from GaxError to our native errors).
180179
181-
:type project: str
182-
:param project: The project to connect to. This is
183-
usually your project name in the cloud console.
180+
:type args: tuple
181+
:param args: Positional arguments to pass to constructor.
184182
185-
:type request_pb: :class:`.datastore_pb2.RollbackRequest`
186-
:param request_pb: The request protobuf object.
183+
:type kwargs: dict
184+
:param kwargs: Keyword arguments to pass to constructor.
185+
"""
187186

188-
:rtype: :class:`.datastore_pb2.RollbackResponse`
189-
:returns: The returned protobuf response object.
190-
"""
191-
request_pb.project_id = project
192-
with _grpc_catch_rendezvous():
193-
return self._stub.Rollback(request_pb)
187+
def commit(self, *args, **kwargs):
188+
"""Perform a ``commit`` request.
194189
195-
def allocate_ids(self, project, request_pb):
196-
"""Perform an ``allocateIds`` request.
190+
A light wrapper around the the base method from the parent class.
191+
Intended to provide exception re-mapping (from GaxError to our
192+
native errors).
197193
198-
:type project: str
199-
:param project: The project to connect to. This is
200-
usually your project name in the cloud console.
194+
:type args: tuple
195+
:param args: Positional arguments to pass to base method.
201196
202-
:type request_pb: :class:`.datastore_pb2.AllocateIdsRequest`
203-
:param request_pb: The request protobuf object.
197+
:type kwargs: dict
198+
:param kwargs: Keyword arguments to pass to base method.
204199
205-
:rtype: :class:`.datastore_pb2.AllocateIdsResponse`
200+
:rtype: :class:`.datastore_pb2.CommitResponse`
206201
:returns: The returned protobuf response object.
207202
"""
208-
request_pb.project_id = project
209203
with _grpc_catch_rendezvous():
210-
return self._stub.AllocateIds(request_pb)
204+
return super(GAPICDatastoreAPI, self).commit(*args, **kwargs)
211205

212206

213207
def make_datastore_api(client):
@@ -222,5 +216,5 @@ def make_datastore_api(client):
222216
channel = make_secure_channel(
223217
client._credentials, DEFAULT_USER_AGENT,
224218
datastore_client.DatastoreClient.SERVICE_ADDRESS)
225-
return datastore_client.DatastoreClient(
219+
return GAPICDatastoreAPI(
226220
channel=channel, lib_name='gccl', lib_version=__version__)

datastore/google/cloud/datastore/_http.py

Lines changed: 33 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -216,23 +216,6 @@ def begin_transaction(self, project, request_pb):
216216
self.connection.api_base_url,
217217
request_pb, _datastore_pb2.BeginTransactionResponse)
218218

219-
def commit(self, project, request_pb):
220-
"""Perform a ``commit`` request.
221-
222-
:type project: str
223-
:param project: The project to connect to. This is
224-
usually your project name in the cloud console.
225-
226-
:type request_pb: :class:`.datastore_pb2.CommitRequest`
227-
:param request_pb: The request protobuf object.
228-
229-
:rtype: :class:`.datastore_pb2.CommitResponse`
230-
:returns: The returned protobuf response object.
231-
"""
232-
return _rpc(self.connection.http, project, 'commit',
233-
self.connection.api_base_url,
234-
request_pb, _datastore_pb2.CommitResponse)
235-
236219

237220
class Connection(connection_module.Connection):
238221
"""A connection to the Google Cloud Datastore via the Protobuf API.
@@ -366,37 +349,6 @@ def begin_transaction(self, project):
366349
request = _datastore_pb2.BeginTransactionRequest()
367350
return self._datastore_api.begin_transaction(project, request)
368351

369-
def commit(self, project, request, transaction_id):
370-
"""Commit mutations in context of current transaction (if any).
371-
372-
Maps the ``DatastoreService.Commit`` protobuf RPC.
373-
374-
:type project: str
375-
:param project: The project to which the transaction applies.
376-
377-
:type request: :class:`.datastore_pb2.CommitRequest`
378-
:param request: The protobuf with the mutations being committed.
379-
380-
:type transaction_id: str
381-
:param transaction_id: (Optional) The transaction ID returned from
382-
:meth:`begin_transaction`. Non-transactional
383-
batches must pass ``None``.
384-
385-
.. note::
386-
387-
This method will mutate ``request`` before using it.
388-
389-
:rtype: :class:`.datastore_pb2.CommitResponse`
390-
:returns: The protobuf response from a commit request.
391-
"""
392-
if transaction_id:
393-
request.mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
394-
request.transaction = transaction_id
395-
else:
396-
request.mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL
397-
398-
return self._datastore_api.commit(project, request)
399-
400352

401353
class HTTPDatastoreAPI(object):
402354
"""An API object that sends proto-over-HTTP requests.
@@ -410,6 +362,39 @@ class HTTPDatastoreAPI(object):
410362
def __init__(self, client):
411363
self.client = client
412364

365+
def commit(self, project, mode, mutations, transaction=None):
366+
"""Perform a ``commit`` request.
367+
368+
:type project: str
369+
:param project: The project to connect to. This is
370+
usually your project name in the cloud console.
371+
372+
:type mode: :class:`.gapic.datastore.v1.enums.CommitRequest.Mode`
373+
:param mode: The type of commit to perform. Expected to be one of
374+
``TRANSACTIONAL`` or ``NON_TRANSACTIONAL``.
375+
376+
:type mutations: list
377+
:param mutations: List of :class:`.datastore_pb2.Mutation`, the
378+
mutations to perform.
379+
380+
:type transaction: bytes
381+
:param transaction: (Optional) The transaction ID returned from
382+
:meth:`begin_transaction`. Non-transactional
383+
commits must pass :data:`None`.
384+
385+
:rtype: :class:`.datastore_pb2.CommitResponse`
386+
:returns: The returned protobuf response object.
387+
"""
388+
request_pb = _datastore_pb2.CommitRequest(
389+
project_id=project,
390+
mode=mode,
391+
transaction=transaction,
392+
mutations=mutations,
393+
)
394+
return _rpc(self.client._http, project, 'commit',
395+
self.client._base_url,
396+
request_pb, _datastore_pb2.CommitResponse)
397+
413398
def rollback(self, project, transaction_id):
414399
"""Perform a ``rollback`` request.
415400

datastore/google/cloud/datastore/batch.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ class Batch(object):
7777

7878
def __init__(self, client):
7979
self._client = client
80-
self._commit_request = _datastore_pb2.CommitRequest()
80+
self._mutations = []
8181
self._partial_key_entities = []
8282
self._status = self._INITIAL
8383

@@ -110,7 +110,8 @@ def _add_partial_key_entity_pb(self):
110110
:returns: The newly created entity protobuf that will be
111111
updated and sent with a commit.
112112
"""
113-
new_mutation = self.mutations.add()
113+
new_mutation = _datastore_pb2.Mutation()
114+
self._mutations.append(new_mutation)
114115
return new_mutation.insert
115116

116117
def _add_complete_key_entity_pb(self):
@@ -123,7 +124,8 @@ def _add_complete_key_entity_pb(self):
123124
# We use ``upsert`` for entities with completed keys, rather than
124125
# ``insert`` or ``update``, in order not to create race conditions
125126
# based on prior existence / removal of the entity.
126-
new_mutation = self.mutations.add()
127+
new_mutation = _datastore_pb2.Mutation()
128+
self._mutations.append(new_mutation)
127129
return new_mutation.upsert
128130

129131
def _add_delete_key_pb(self):
@@ -133,7 +135,8 @@ def _add_delete_key_pb(self):
133135
:returns: The newly created key protobuf that will be
134136
deleted when sent with a commit.
135137
"""
136-
new_mutation = self.mutations.add()
138+
new_mutation = _datastore_pb2.Mutation()
139+
self._mutations.append(new_mutation)
137140
return new_mutation.delete
138141

139142
@property
@@ -150,7 +153,7 @@ def mutations(self):
150153
:returns: The list of :class:`.datastore_pb2.Mutation`
151154
protobufs to be sent in the commit request.
152155
"""
153-
return self._commit_request.mutations
156+
return self._mutations
154157

155158
def put(self, entity):
156159
"""Remember an entity's state to be saved during :meth:`commit`.
@@ -237,9 +240,13 @@ def _commit(self):
237240
238241
This is called by :meth:`commit`.
239242
"""
240-
# NOTE: ``self._commit_request`` will be modified.
241-
commit_response_pb = self._client._connection.commit(
242-
self.project, self._commit_request, self._id)
243+
if self._id is None:
244+
mode = _datastore_pb2.CommitRequest.NON_TRANSACTIONAL
245+
else:
246+
mode = _datastore_pb2.CommitRequest.TRANSACTIONAL
247+
248+
commit_response_pb = self._client._datastore_api.commit(
249+
self.project, mode, self._mutations, transaction=self._id)
243250
_, updated_keys = _parse_commit_response(commit_response_pb)
244251
# If the back-end returns without error, we are guaranteed that
245252
# :meth:`Connection.commit` will return keys that match (length and

0 commit comments

Comments
 (0)