Commit d1dbe14

Remove deprecated ResultsReader and related code
1 parent 76e3722 commit d1dbe14

File tree

3 files changed: +3, -454 lines


splunklib/results.py

Lines changed: 3 additions & 209 deletions
@@ -32,17 +32,10 @@
     print(f"Results are a preview: {reader.is_preview}")
 """
 
-from io import BufferedReader, BytesIO
-
-
-import xml.etree.ElementTree as et
-
-from collections import OrderedDict
+from io import BufferedReader
 from json import loads as json_loads
 
-__all__ = ["ResultsReader", "Message", "JSONResultsReader"]
-
-import deprecation
+__all__ = ["JSONResultsReader", "Message"]
 
 
 class Message:
@@ -70,205 +63,6 @@ def __hash__(self):
         return hash((self.type, self.message))
 
 
-class _ConcatenatedStream:
-    """Lazily concatenate zero or more streams into a stream.
-
-    As you read from the concatenated stream, you get characters from
-    each stream passed to ``_ConcatenatedStream``, in order.
-
-    **Example**::
-
-        from StringIO import StringIO
-        s = _ConcatenatedStream(StringIO("abc"), StringIO("def"))
-        assert s.read() == "abcdef"
-    """
-
-    def __init__(self, *streams):
-        self.streams = list(streams)
-
-    def read(self, n=None):
-        """Read at most *n* characters from this stream.
-
-        If *n* is ``None``, return all available characters.
-        """
-        response = b""
-        while len(self.streams) > 0 and (n is None or n > 0):
-            txt = self.streams[0].read(n)
-            response += txt
-            if n is not None:
-                n -= len(txt)
-            if n is None or n > 0:
-                del self.streams[0]
-        return response
-
-
-class _XMLDTDFilter:
-    """Lazily remove all XML DTDs from a stream.
-
-    All substrings matching the regular expression <?[^>]*> are
-    removed in their entirety from the stream. No regular expressions
-    are used, however, so everything still streams properly.
-
-    **Example**::
-
-        from StringIO import StringIO
-        s = _XMLDTDFilter("<?xml abcd><element><?xml ...></element>")
-        assert s.read() == "<element></element>"
-    """
-
-    def __init__(self, stream):
-        self.stream = stream
-
-    def read(self, n=None):
-        """Read at most *n* characters from this stream.
-
-        If *n* is ``None``, return all available characters.
-        """
-        response = b""
-        while n is None or n > 0:
-            c = self.stream.read(1)
-            if c == b"":
-                break
-            if c == b"<":
-                c += self.stream.read(1)
-                if c == b"<?":
-                    while True:
-                        q = self.stream.read(1)
-                        if q == b">":
-                            break
-                else:
-                    response += c
-                    if n is not None:
-                        n -= len(c)
-            else:
-                response += c
-                if n is not None:
-                    n -= 1
-        return response
-
-
-@deprecation.deprecated(
-    details="Use the JSONResultsReader function instead in conjunction with the 'output_mode' query param set to 'json'"
-)
-class ResultsReader:
-    """This class returns dictionaries and Splunk messages from an XML results
-    stream.
-
-    ``ResultsReader`` is iterable, and returns a ``dict`` for results, or a
-    :class:`Message` object for Splunk messages. This class has one field,
-    ``is_preview``, which is ``True`` when the results are a preview from a
-    running search, or ``False`` when the results are from a completed search.
-
-    This function has no network activity other than what is implicit in the
-    stream it operates on.
-
-    :param `stream`: The stream to read from (any object that supports
-        ``.read()``).
-
-    **Example**::
-
-        import results
-        response = ... # the body of an HTTP response
-        reader = results.ResultsReader(response)
-        for result in reader:
-            if isinstance(result, dict):
-                print(f"Result: {result}")
-            elif isinstance(result, results.Message):
-                print(f"Message: {result}")
-        print(f"is_preview = {reader.is_preview}")
-    """
-
-    # Be sure to update the docstrings of client.Jobs.oneshot,
-    # client.Job.results_preview and client.Job.results to match any
-    # changes made to ResultsReader.
-    #
-    # This wouldn't be a class, just the _parse_results function below,
-    # except that you cannot get the current generator inside the
-    # function creating that generator. Thus it's all wrapped up for
-    # the sake of one field.
-    def __init__(self, stream):
-        # The search/jobs/exports endpoint, when run with
-        # earliest_time=rt and latest_time=rt streams a sequence of
-        # XML documents, each containing a result, as opposed to one
-        # results element containing lots of results. Python's XML
-        # parsers are broken, and instead of reading one full document
-        # and returning the stream that follows untouched, they
-        # destroy the stream and throw an error. To get around this,
-        # we remove all the DTD definitions inline, then wrap the
-        # fragments in a fictional <doc> element to make the parser happy.
-        stream = _XMLDTDFilter(stream)
-        stream = _ConcatenatedStream(BytesIO(b"<doc>"), stream, BytesIO(b"</doc>"))
-        self.is_preview = None
-        self._gen = self._parse_results(stream)
-
-    def __iter__(self):
-        return self
-
-    def __next__(self):
-        return next(self._gen)
-
-    def _parse_results(self, stream):
-        """Parse results and messages out of *stream*."""
-        result = None
-        values = None
-        try:
-            for event, elem in et.iterparse(stream, events=("start", "end")):
-                if elem.tag == "results" and event == "start":
-                    # The wrapper element is a <results preview="0|1">. We
-                    # don't care about it except to tell us whether these
-                    # are preview results, or the final results from the
-                    # search.
-                    is_preview = elem.attrib["preview"] == "1"
-                    self.is_preview = is_preview
-                if elem.tag == "result":
-                    if event == "start":
-                        result = OrderedDict()
-                    elif event == "end":
-                        yield result
-                        result = None
-                        elem.clear()
-
-                elif elem.tag == "field" and result is not None:
-                    # We need the 'result is not None' check because
-                    # 'field' is also the element name in the <meta>
-                    # header that gives field order, which is not what we
-                    # want at all.
-                    if event == "start":
-                        values = []
-                    elif event == "end":
-                        field_name = elem.attrib["k"]
-                        if len(values) == 1:
-                            result[field_name] = values[0]
-                        else:
-                            result[field_name] = values
-                        # Calling .clear() is necessary to let the
-                        # element be garbage collected. Otherwise
-                        # arbitrarily large results sets will use
-                        # arbitrarily large memory instead of
-                        # streaming.
-                        elem.clear()
-
-                elif elem.tag in ("text", "v") and event == "end":
-                    text = "".join(elem.itertext())
-                    values.append(text)
-                    elem.clear()
-
-                elif elem.tag == "msg":
-                    if event == "start":
-                        msg_type = elem.attrib["type"]
-                    elif event == "end":
-                        text = elem.text if elem.text is not None else ""
-                        yield Message(msg_type, text)
-                        elem.clear()
-        except SyntaxError as pe:
-            # This is here to handle the same incorrect return from
-            # splunk that is described in __init__.
-            if "no element found" in pe.msg:
-                return
-            else:
-                raise
-
-
 class JSONResultsReader:
     """This class returns dictionaries and Splunk messages from a JSON results
     stream.
@@ -303,7 +97,7 @@ class JSONResultsReader:
     # except that you cannot get the current generator inside the
     # function creating that generator. Thus it's all wrapped up for
    # the sake of one field.
-    def __init__(self, stream):
+    def __init__(self, stream) -> None:
         # The search/jobs/exports endpoint, when run with
         # earliest_time=rt and latest_time=rt, output_mode=json, streams a sequence of
         # JSON documents, each containing a result, as opposed to one
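
For callers migrating off the removed XML reader, the deprecation note above already names the replacement: JSONResultsReader together with output_mode=json. A minimal sketch of that path (not part of this commit; host, port, and credentials are placeholders):

import splunklib.client as client
import splunklib.results as results

# Connect with placeholder credentials, then export results as JSON.
service = client.connect(
    host="localhost", port=8089, username="admin", password="changeme"
)
reader = results.JSONResultsReader(
    service.jobs.export("search index=_internal | head 5", output_mode="json")
)
for item in reader:
    if isinstance(item, results.Message):
        print(f"Message: {item}")  # diagnostic messages from Splunk
    elif isinstance(item, dict):
        print(f"Result: {item}")   # one result row as a dict
print(f"Results are a preview: {reader.is_preview}")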

tests/integration/test_job.py

Lines changed: 0 additions & 65 deletions
@@ -438,70 +438,5 @@ def test_v1_job_fallback(self):
         self.assertEqual(n_events, n_preview, n_results)
 
 
-class TestResultsReader(unittest.TestCase):
-    def test_results_reader(self):
-        # Run jobs.export("search index=_internal | stats count",
-        # earliest_time="rt", latest_time="rt") and you get a
-        # streaming sequence of XML fragments containing results.
-        test_dir = Path(__file__).parent
-        data_file = test_dir / "data" / "results.xml"
-        with io.open(str(data_file), mode="br") as input:
-            reader = results.ResultsReader(input)
-            self.assertFalse(reader.is_preview)
-            N_results = 0
-            N_messages = 0
-            for r in reader:
-                from collections import OrderedDict
-
-                self.assertTrue(
-                    isinstance(r, OrderedDict) or isinstance(r, results.Message)
-                )
-                if isinstance(r, OrderedDict):
-                    N_results += 1
-                elif isinstance(r, results.Message):
-                    N_messages += 1
-            self.assertEqual(N_results, 4999)
-            self.assertEqual(N_messages, 2)
-
-    def test_results_reader_with_streaming_results(self):
-        # Run jobs.export("search index=_internal | stats count",
-        # earliest_time="rt", latest_time="rt") and you get a
-        # streaming sequence of XML fragments containing results.
-        test_dir = Path(__file__).parent
-        data_file = test_dir / "data" / "streaming_results.xml"
-        with io.open(str(data_file), "br") as input:
-            reader = results.ResultsReader(input)
-            N_results = 0
-            N_messages = 0
-            for r in reader:
-                from collections import OrderedDict
-
-                self.assertTrue(
-                    isinstance(r, OrderedDict) or isinstance(r, results.Message)
-                )
-                if isinstance(r, OrderedDict):
-                    N_results += 1
-                elif isinstance(r, results.Message):
-                    N_messages += 1
-            self.assertEqual(N_results, 3)
-            self.assertEqual(N_messages, 3)
-
-    def test_xmldtd_filter(self):
-        s = results._XMLDTDFilter(
-            BytesIO(
-                b"""<?xml asdf awe awdf=""><boris>Other stuf</boris><?xml dafawe \n asdfaw > ab"""
-            )
-        )
-        self.assertEqual(s.read(), b"<boris>Other stuf</boris> ab")
-
-    def test_concatenated_stream(self):
-        s = results._ConcatenatedStream(
-            BytesIO(b"This is a test "), BytesIO(b"of the emergency broadcast system.")
-        )
-        self.assertEqual(s.read(3), b"Thi")
-        self.assertEqual(s.read(20), b"s is a test of the e")
-        self.assertEqual(s.read(), b"mergency broadcast system.")
-
-
 if __name__ == "__main__":
     unittest.main()
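
If equivalent coverage is wanted without the deleted XML fixtures, a test in the same style could exercise JSONResultsReader against a captured JSON export stream. A rough sketch, assuming a hypothetical data/results.json fixture recorded with output_mode=json:

class TestJSONResultsReader(unittest.TestCase):
    def test_json_results_reader(self):
        # "results.json" is an assumed fixture, not a file added by this commit.
        data_file = Path(__file__).parent / "data" / "results.json"
        with io.open(str(data_file), "rb") as stream:
            reader = results.JSONResultsReader(stream)
            n_results = 0
            n_messages = 0
            for r in reader:
                self.assertTrue(isinstance(r, (dict, results.Message)))
                if isinstance(r, dict):
                    n_results += 1
                else:
                    n_messages += 1
            # Exact counts depend on the captured fixture.
            self.assertGreater(n_results, 0)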
