Skip to content
This repository was archived by the owner on Aug 30, 2024. It is now read-only.

Commit 9ed9833

Browse files
authored
Add support for _scheduler endpoints (#379)
* Add support for _scheduler endpoints * PR updates: - add get_doc for replication docs by docid - fix methods so they return the json, not the whole response * Fetch features/metadata, update replication_state * Copyrights, docstrings * Copyright * Fix TODO # Local Variables: * Attempt to fix broken test * Add new expected state for scheduler endpoint * Add new expected state for scheduler endpoint
1 parent f2962dc commit 9ed9833

File tree

5 files changed

+368
-16
lines changed

5 files changed

+368
-16
lines changed

src/cloudant/client.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ def __init__(self, user, auth_token, admin_party=False, **kwargs):
108108
if (not user and not auth_token) and (parsed_url.username and parsed_url.password):
109109
self._user = parsed_url.username
110110
self._auth_token = parsed_url.password
111+
self._features = None
111112

112113
connect_to_couch = kwargs.get('connect', False)
113114
if connect_to_couch and self._DATABASE_CLASS == CouchDatabase:
@@ -122,6 +123,18 @@ def is_iam_authenticated(self):
122123
"""
123124
return self._use_iam
124125

126+
def features(self):
127+
"""
128+
lazy fetch and cache features
129+
"""
130+
if self._features is None:
131+
metadata = self.metadata()
132+
if "features" in metadata:
133+
self._features = metadata["features"]
134+
else:
135+
self._features = []
136+
return self._features
137+
125138
def connect(self):
126139
"""
127140
Starts up an authentication session for the client using cookie
@@ -324,6 +337,16 @@ def db_updates(self, raw_data=False, **kwargs):
324337
"""
325338
return Feed(self, raw_data, **kwargs)
326339

340+
def metadata(self):
341+
"""
342+
Retrieves the remote server metadata dictionary.
343+
344+
:returns: Dictionary containing server metadata details
345+
"""
346+
resp = self.r_session.get(self.server_url)
347+
resp.raise_for_status()
348+
return resp.json()
349+
327350
def keys(self, remote=False):
328351
"""
329352
Returns the database names for this client. Default is

src/cloudant/replicator.py

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@
1818

1919
import uuid
2020

21+
from requests.exceptions import HTTPError
22+
2123
from .error import CloudantReplicatorException, CloudantClientException
2224
from .document import Document
25+
from .scheduler import Scheduler
2326

2427
class Replicator(object):
2528
"""
@@ -34,6 +37,7 @@ class Replicator(object):
3437

3538
def __init__(self, client):
3639
repl_db = '_replicator'
40+
self.client = client
3741
try:
3842
self.database = client[repl_db]
3943
except Exception:
@@ -133,12 +137,20 @@ def replication_state(self, repl_id):
133137
134138
:returns: Replication state as a ``str``
135139
"""
136-
try:
137-
repl_doc = self.database[repl_id]
138-
except KeyError:
139-
raise CloudantReplicatorException(404, repl_id)
140-
repl_doc.fetch()
141-
return repl_doc.get('_replication_state')
140+
if "scheduler" in self.client.features():
141+
try:
142+
repl_doc = Scheduler(self.client).get_doc(repl_id)
143+
except HTTPError as err:
144+
raise CloudantReplicatorException(err.response.status_code, repl_id)
145+
state = repl_doc['state']
146+
else:
147+
try:
148+
repl_doc = self.database[repl_id]
149+
except KeyError:
150+
raise CloudantReplicatorException(404, repl_id)
151+
repl_doc.fetch()
152+
state = repl_doc.get('_replication_state')
153+
return state
142154

143155
def follow_replication(self, repl_id):
144156
"""
@@ -161,12 +173,19 @@ def update_state():
161173
"""
162174
Retrieves the replication state.
163175
"""
164-
try:
165-
arepl_doc = self.database[repl_id]
166-
arepl_doc.fetch()
167-
return arepl_doc, arepl_doc.get('_replication_state')
168-
except KeyError:
169-
return None, None
176+
if "scheduler" in self.client.features():
177+
try:
178+
arepl_doc = Scheduler(self.client).get_doc(repl_id)
179+
return arepl_doc, arepl_doc['state']
180+
except HTTPError:
181+
return None, None
182+
else:
183+
try:
184+
arepl_doc = self.database[repl_id]
185+
arepl_doc.fetch()
186+
return arepl_doc, arepl_doc.get('_replication_state')
187+
except KeyError:
188+
return None, None
170189

171190
while True:
172191
# Make sure we fetch the state up front, just in case it moves

src/cloudant/scheduler.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
#!/usr/bin/env python
2+
# Copyright (C) 2018 IBM Corp. All rights reserved.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
"""
16+
API module for interacting with scheduler endpoints
17+
"""
18+
19+
class Scheduler(object):
20+
"""
21+
API for retrieving scheduler jobs and documents.
22+
23+
:param client: Client instance used by the database. Can either be a
24+
``CouchDB`` or ``Cloudant`` client instance.
25+
"""
26+
27+
def __init__(self, client):
28+
self._client = client
29+
self._r_session = client.r_session
30+
self._scheduler = '/'.join([self._client.server_url, '_scheduler'])
31+
32+
def list_docs(self, limit=None, skip=None):
33+
"""
34+
Lists replication documents. Includes information
35+
about all the documents, even in completed and failed
36+
states. For each document it returns the document ID, the
37+
database, the replication ID, source and target, and other
38+
information.
39+
40+
:param limit: How many results to return.
41+
:param skip: How many result to skip starting at the beginning, if ordered by document ID.
42+
"""
43+
params = dict()
44+
if limit != None:
45+
params["limit"] = limit
46+
if skip != None:
47+
params["skip"] = skip
48+
resp = self._r_session.get('/'.join([self._scheduler, 'docs']), params=params)
49+
resp.raise_for_status()
50+
return resp.json()
51+
52+
def get_doc(self, doc_id):
53+
"""
54+
Get replication document state for a given replication document ID.
55+
"""
56+
resp = self._r_session.get('/'.join([self._scheduler, 'docs', '_replicator', doc_id]))
57+
resp.raise_for_status()
58+
return resp.json()
59+
60+
61+
def list_jobs(self, limit=None, skip=None):
62+
"""
63+
Lists replication jobs. Includes replications created via
64+
/_replicate endpoint as well as those created from replication
65+
documents. Does not include replications which have completed
66+
or have failed to start because replication documents were
67+
malformed. Each job description will include source and target
68+
information, replication id, a history of recent event, and a
69+
few other things.
70+
71+
:param limit: How many results to return.
72+
:param skip: How many result to skip starting at the beginning, if ordered by document ID.
73+
"""
74+
params = dict()
75+
if limit != None:
76+
params["limit"] = limit
77+
if skip != None:
78+
params["skip"] = skip
79+
resp = self._r_session.get('/'.join([self._scheduler, 'jobs']), params=params)
80+
resp.raise_for_status()
81+
return resp.json()

tests/unit/replicator_tests.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ def test_retrieve_replication_state(self):
317317
)
318318
self.replication_ids.append(repl_id)
319319
repl_state = None
320-
valid_states = ['completed', 'error', 'triggered', None]
320+
valid_states = ['completed', 'error', 'triggered', 'running', None]
321321
finished = False
322322
for _ in range(300):
323323
repl_state = self.replicator.replication_state(repl_id)
@@ -402,11 +402,15 @@ def test_follow_replication(self):
402402
repl_id
403403
)
404404
self.replication_ids.append(repl_id)
405-
valid_states = ('completed', 'error', 'triggered', None)
405+
valid_states = ('completed', 'error', 'triggered', 'running', None)
406406
repl_states = []
407+
if 'scheduler' in self.client.features():
408+
state_key = 'state'
409+
else:
410+
state_key = '_replication_state'
407411
for doc in self.replicator.follow_replication(repl_id):
408-
self.assertIn(doc.get('_replication_state'), valid_states)
409-
repl_states.append(doc.get('_replication_state'))
412+
self.assertIn(doc.get(state_key), valid_states)
413+
repl_states.append(doc.get(state_key))
410414
self.assertTrue(len(repl_states) > 0)
411415
self.assertEqual(repl_states[-1], 'completed')
412416
self.assertNotIn('error', repl_states)

0 commit comments

Comments
 (0)