Skip to content

Commit d1e5ed7

Browse files
author
Frederick Ross
committed
Finished getting everything working with Ace.
1 parent 8a3e10a commit d1e5ed7

File tree

4 files changed

+93
-30
lines changed

4 files changed

+93
-30
lines changed

splunklib/client.py

Lines changed: 57 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@
110110
class IncomparableException(Exception):
111111
pass
112112

113+
class JobNotReadyException(Exception):
114+
pass
115+
113116
def trailing(template, *targets):
114117
s = template
115118
for t in targets:
@@ -595,7 +598,8 @@ class Entity(Endpoint):
595598
def __init__(self, service, path, **kwargs):
596599
Endpoint.__init__(self, service, path)
597600
self._state = None
598-
self.refresh(kwargs.get('state', None)) # "Prefresh"
601+
if not kwargs.get('skip_refresh', False):
602+
self.refresh(kwargs.get('state', None)) # "Prefresh"
599603

600604
def __eq__(self, other):
601605
"""Raises IncomparableException.
@@ -1597,7 +1601,8 @@ def oneshot(self, **kwargs):
15971601
class Job(Entity):
15981602
"""This class represents a search job."""
15991603
def __init__(self, service, path, **kwargs):
1600-
Entity.__init__(self, service, path, **kwargs)
1604+
Entity.__init__(self, service, path, skip_refresh=True, **kwargs)
1605+
self._isReady = False
16011606

16021607
# The Job entry record is returned at the root of the response
16031608
def _load_atom_entry(self, response):
@@ -1629,6 +1634,28 @@ def finalize(self):
16291634
self.post("control", action="finalize")
16301635
return self
16311636

1637+
def isDone(self):
1638+
"""Has this job finished running on the server yet?
1639+
1640+
:returns: boolean
1641+
"""
1642+
if (not self.isReady()):
1643+
return False
1644+
return self['isDone'] == '1'
1645+
1646+
def isReady(self):
1647+
"""Is this job queryable on the server yet?
1648+
1649+
:returns: boolean
1650+
"""
1651+
try:
1652+
self.refresh()
1653+
self._isReady = True
1654+
return self._isReady
1655+
except JobNotReadyException:
1656+
self._isReady = False
1657+
return False
1658+
16321659
@property
16331660
def name(self):
16341661
"""Returns the name of the search job."""
@@ -1639,20 +1666,35 @@ def pause(self):
16391666
self.post("control", action="pause")
16401667
return self
16411668

1642-
def read(self):
1643-
"""Returns the job's current state record, corresponding to the
1644-
current state of the server-side resource."""
1645-
# If the search job is newly created, it is possible that we will
1646-
# get 204s (No Content) until the job is ready to respond.
1647-
count = 0
1648-
while count < 10:
1669+
def refresh(self, state=None):
1670+
"""Refresh the state of this entity.
1671+
1672+
If *state* is provided, load it as the new state for this
1673+
entity. Otherwise, make a roundtrip to the server (by calling
1674+
the :meth:`read` method of self) to fetch an updated state,
1675+
plus at most two additional round trips if autologin is
1676+
enabled.
1677+
1678+
**Example**::
1679+
1680+
import splunklib.client as client
1681+
s = client.connect(...)
1682+
search = s.jobs.create('search index=_internal | head 1')
1683+
search.refresh()
1684+
"""
1685+
if state is not None:
1686+
self._state = state
1687+
else:
16491688
response = self.get()
1650-
if response.status == 204:
1651-
sleep(1)
1652-
count += 1
1653-
continue
1654-
return self._load_state(response)
1655-
raise OperationError, "Operation timed out."
1689+
if response.status == 204:
1690+
self._isReady = False
1691+
raise JobNotReadyException()
1692+
else:
1693+
self._isReady = True
1694+
raw_state = self._load_state(response)
1695+
raw_state['links'] = dict([(k, urllib.unquote(v)) for k,v in raw_state['links'].iteritems()])
1696+
self._state = raw_state
1697+
return self
16561698

16571699
def results(self, timeout=None, wait_time=1, **query_params):
16581700
"""Fetch search results as an InputStream IO handle.

tests/test_fired_alert.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ def test_crud(self):
6262
'alert.severity': "5",
6363
'alert.suppress': "0",
6464
'alert.track': "1",
65-
'dispatch.earliest_time': "rt",
66-
'dispatch.latest_time': "rt",
65+
'dispatch.earliest_time': "-1m",
66+
'dispatch.latest_time': "now",
6767
'is_scheduled': "0",
6868
'realtime_schedule': "1",
6969
'cron_schedule': "* * * * *"
@@ -81,22 +81,23 @@ def test_crud(self):
8181
# Now schedule the saved search
8282
search.update(is_scheduled=1)
8383
search.refresh()
84+
8485
self.assertEqual(search.content.is_scheduled, "1")
8586
self.assertEqual(alert_count(search), 0)
8687

8788
# Wait for the saved search to run. When it runs we will see a new job
8889
# show up in the search's history.
8990
def f(search):
90-
return len(search.history()) == 1
91-
testlib.wait(search, f)
91+
search.refresh()
92+
n = len(search.history())
93+
return n==1
94+
testlib.wait(search, f, timeout=120)
9295
self.assertEqual(len(search.history()), 1)
9396

9497
# When it first runs the alert count should be zero.
9598
search.refresh()
96-
self.assertEqual(alert_count(search), 0)
97-
98-
# And the fired alerts category should not exist
99-
self.assertFalse(search_name in fired_alerts)
99+
print alert_count(search)
100+
self.assertTrue((alert_count(search) == 0) == (search_name not in fired_alerts))
100101

101102
# Submit events and verify that they each trigger the expected
102103
# alert

tests/test_job.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ def test_crud(self):
118118
job.finalize()
119119

120120
# Create a new job
121-
job = jobs.create("search * | head 1000 | stats count")
121+
job = jobs.create("search * | head 1 | stats count")
122122
self.assertTrue(jobs.contains(job.sid))
123123

124124
# Set various properties on it
@@ -163,10 +163,17 @@ def test_results(self):
163163
index = service.indexes['_internal']
164164
self.assertTrue(index['totalEventCount'] > 0)
165165

166-
job = jobs.create("search index=_internal | head 100 | stats count")
166+
job = jobs.create("search index=_internal | head 1 | stats count")
167167
job.refresh()
168168
self.assertEqual(job['isDone'], '0')
169-
self.assertRaises(ValueError, job.results)
169+
# When a job was first created in Splunk 4.x, results would
170+
# return 204 before results were available. Itay requested a
171+
# change for Ace, and now it just returns 200 with an empty
172+
# <results/> element. Thus this test is obsolete. I leave it
173+
# here as a caution to future generations:
174+
# self.assertRaises(ValueError, job.results)
175+
while not job.isDone():
176+
pass
170177
reader = results.ResultsReader(job.results(timeout=60))
171178
job.refresh()
172179
self.assertEqual(job['isDone'], '1')
@@ -178,8 +185,9 @@ def test_results(self):
178185
self.assertLessEqual(int(result["count"]), 1000)
179186

180187
# Repeat the same thing, but without the .is_preview reference.
181-
job = jobs.create("search index=_internal | head 100 | stats count")
182-
self.assertRaises(ValueError, job.results)
188+
job = jobs.create("search index=_internal | head 1 | stats count")
189+
while not job.isDone():
190+
pass
183191
reader = results.ResultsReader(job.results(timeout=60))
184192
job.refresh()
185193
self.assertEqual(job['isDone'], '1')

tests/test_saved_search.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
import datetime
1818
import testlib
1919

20+
from time import sleep
21+
2022
import splunklib.client as client
2123

2224
class TestCase(testlib.TestCase):
@@ -192,13 +194,19 @@ def test_dispatch(self):
192194
self.assertTrue('sdk-test1' in saved_searches)
193195

194196
job = saved_search.dispatch()
195-
job.preview().close()
197+
while not job.isReady():
198+
pass
199+
try:
200+
job.preview().close()
201+
except ValueError:
202+
pass
196203
job.cancel()
197204

198205
# Dispatch with some additional options
199206
kwargs = { 'dispatch.buckets': 100 }
200207
job = saved_search.dispatch(**kwargs)
201-
testlib.wait(job, lambda job: bool(int(job['isDone'])))
208+
while not job.isDone():
209+
pass
202210
job.timeline().close()
203211
job.cancel()
204212

@@ -230,23 +238,27 @@ def contains(history, sid):
230238
return sid in [job.sid for job in history]
231239

232240
job1 = saved_search.dispatch()
241+
sleep(1)
233242
history = saved_search.history()
234243
self.assertEqual(len(history), 1)
235244
self.assertTrue(contains(history, job1.sid))
236245

237246
job2 = saved_search.dispatch()
247+
sleep(1)
238248
history = saved_search.history()
239249
self.assertEqual(len(history), 2)
240250
self.assertTrue(contains(history, job1.sid))
241251
self.assertTrue(contains(history, job2.sid))
242252

243253
job1.cancel()
254+
sleep(1)
244255
history = saved_search.history()
245256
self.assertEqual(len(history), 1)
246257
self.assertFalse(contains(history, job1.sid))
247258
self.assertTrue(contains(history, job2.sid))
248259

249260
job2.cancel()
261+
sleep(1)
250262
history = saved_search.history()
251263
self.assertEqual(len(history), 0)
252264
self.assertFalse(contains(history, job1.sid))

0 commit comments

Comments
 (0)