Skip to content

Commit d89061c

Browse files
authored
Merge pull request #5282 from typhoonzero/simple_pipe_reader
Simple pipe reader for hdfs or other service
2 parents 9b761ed + cd36531 commit d89061c

File tree

1 file changed

+104
-3
lines changed

1 file changed

+104
-3
lines changed

python/paddle/v2/reader/decorator.py

Lines changed: 104 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@
1414

1515
__all__ = [
1616
'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
17-
'ComposeNotAligned', 'firstn', 'xmap_readers'
17+
'ComposeNotAligned', 'firstn', 'xmap_readers', 'pipe_reader'
1818
]
1919

20+
from threading import Thread
21+
import subprocess
22+
23+
from Queue import Queue
2024
import itertools
2125
import random
22-
from Queue import Queue
23-
from threading import Thread
26+
import zlib
2427

2528

2629
def map_readers(func, *readers):
@@ -323,3 +326,101 @@ def xreader():
323326
yield sample
324327

325328
return xreader
329+
330+
331+
def _buf2lines(buf, line_break="\n"):
    """Split buf into (complete_lines, trailing_partial_line).

    Everything after the final line_break (possibly empty) is returned
    separately so the caller can prepend it to the next buffer.
    """
    # FIXME: line_break should be automatically configured.
    head, sep, tail = buf.rpartition(line_break)
    if not sep:
        # No line break at all: the whole buffer is an unfinished line.
        return [], buf
    return head.split(line_break), tail
335+
336+
337+
def pipe_reader(left_cmd,
                parser,
                bufsize=8192,
                file_type="plain",
                cut_lines=True,
                line_break="\n"):
    """
    pipe_reader reads data as a stream from a command: it takes the
    command's stdout into a pipe buffer and redirects it to the parser
    to parse, then yields data in your desired format.

    You can use a standard linux command or call another program
    to read data, from HDFS, Ceph, URL, AWS S3 etc:

        cmd = "hadoop fs -cat /path/to/some/file"
        cmd = "cat sample_file.tar.gz"
        cmd = "curl http://someurl"
        cmd = "python print_s3_bucket.py"

    A sample parser:

        def sample_parser(lines):
            # parse each line as one sample data,
            # return a list of samples as batches.
            ret = []
            for l in lines:
                ret.append(l.split(" ")[1:5])
            return ret

    :param left_cmd: command to execute to get stdout from. NOTE: the
                     command is split on single spaces, so quoted
                     arguments containing spaces are not supported.
    :type left_cmd: string
    :param parser: parser function to parse lines of data.
                   If cut_lines is True, parser will receive a list
                   of lines.
                   If cut_lines is False, parser will receive a
                   raw buffer each time.
                   parser should return a list of parsed values.
    :type parser: callable
    :param bufsize: the buffer size used for the stdout pipe.
    :type bufsize: int
    :param file_type: can be plain/gzip, stream buffer data type.
    :type file_type: string
    :param cut_lines: whether to pass lines instead of raw buffer
                      to the parser
    :type cut_lines: bool
    :param line_break: line break of the file, like '\n' or '\r'
    :type line_break: string

    :return: the reader generator.
    :rtype: callable

    :raises TypeError: if left_cmd is not a string, parser is not
                       callable, or file_type is not plain/gzip.
    """
    if not isinstance(left_cmd, str):
        raise TypeError("left_cmd must be a string")
    if not callable(parser):
        raise TypeError("parser must be a callable object")
    # Fail fast on a bad file_type instead of raising lazily from
    # inside the generator after the subprocess was already spawned.
    if file_type not in ("plain", "gzip"):
        raise TypeError("file_type %s is not allowed" % file_type)

    process = subprocess.Popen(
        left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
    # TODO(typhoonzero): add a thread to read stderr

    # Always init a decompress object once rather than creating one in
    # the loop; the 32 + MAX_WBITS offset makes zlib skip the gzip header.
    dec = zlib.decompressobj(32 + zlib.MAX_WBITS)

    def reader():
        remained = ""
        while True:
            buff = process.stdout.read(bufsize)
            if not buff:
                break
            if file_type == "gzip":
                decomp_buff = dec.decompress(buff)
            else:  # "plain" — validated above
                decomp_buff = buff
            if cut_lines:
                # Prepend the partial line left over from the previous
                # chunk, then split off the new partial tail. Only
                # concatenate when non-empty so str/bytes types never mix.
                if remained:
                    decomp_buff = remained + decomp_buff
                lines = decomp_buff.split(line_break)
                remained = lines.pop()
                for ret in parser(lines):
                    yield ret
            else:
                for ret in parser(decomp_buff):
                    yield ret
        if file_type == "gzip":
            # BUGFIX: flush bytes the decompressor is still buffering;
            # the original never flushed and could lose the stream tail.
            tail = dec.flush()
            if tail:
                if cut_lines:
                    remained = (remained + tail) if remained else tail
                else:
                    for ret in parser(tail):
                        yield ret
        if cut_lines and remained:
            # BUGFIX: the original silently dropped a trailing line that
            # had no final line_break; parse it as the last line(s).
            for ret in parser(remained.split(line_break)):
                yield ret
        # Reap the child process so it does not linger as a zombie.
        process.wait()

    return reader

0 commit comments

Comments
 (0)