@@ -323,3 +323,101 @@ def xreader():
323
323
yield sample
324
324
325
325
return xreader
326
+
327
+
328
+ def _buf2lines (buf , line_break = "\n " ):
329
+ # FIXME: line_break should be automatically configured.
330
+ lines = buf .split (line_break )
331
+ return lines [:- 1 ], lines [- 1 ]
332
+
333
+
334
def pipe_reader(left_cmd,
                parser,
                bufsize=8192,
                file_type="plain",
                cut_lines=True,
                line_break="\n"):
    """
    pipe_reader reads data as a stream from a command: it takes the
    command's stdout into a pipe buffer and redirects it to the parser
    to parse, then yields data in your desired format.

    You can use a standard linux command or call another program
    to read data, from HDFS, Ceph, URL, AWS S3 etc:

        cmd = "hadoop fs -cat /path/to/some/file"
        cmd = "cat sample_file.tar.gz"
        cmd = "curl http://someurl"
        cmd = "python print_s3_bucket.py"

    A sample parser:

        def sample_parser(lines):
            # parse each line as one sample data,
            # return a list of samples as batches.
            ret = []
            for l in lines:
                ret.append(l.split(" ")[1:5])
            return ret

    :param left_cmd: command to execute to get stdout from.
                     NOTE: the command is split on single spaces, so
                     individual arguments must not contain spaces.
    :type left_cmd: string
    :param parser: parser function to parse lines of data.
                   if cut_lines is True, parser will receive a list
                   of lines (bytes objects on Python 3).
                   if cut_lines is False, parser will receive a
                   raw buffer each time.
                   parser should return a list of parsed values.
    :type parser: callable
    :param bufsize: the buffer size used for the stdout pipe.
    :type bufsize: int
    :param file_type: can be plain/gzip, stream buffer data type.
    :type file_type: string
    :param cut_lines: whether to pass lines instead of raw buffer
                      to the parser
    :type cut_lines: bool
    :param line_break: line break of the file, like \n or \r
    :type line_break: string

    :return: the reader generator.
    :rtype: callable

    :raises TypeError: if left_cmd is not a string, parser is not
                       callable, or file_type is not "plain"/"gzip".
    """
    if not isinstance(left_cmd, str):
        raise TypeError("left_cmd must be a string")
    if not callable(parser):
        raise TypeError("parser must be a callable object")
    # Fail fast on a bad file_type at call time, instead of raising only
    # once the first chunk of data arrives inside the reader loop.
    if file_type not in ("plain", "gzip"):
        raise TypeError("file_type %s is not allowed" % file_type)

    process = subprocess.Popen(
        left_cmd.split(" "), bufsize=bufsize, stdout=subprocess.PIPE)
    # TODO(typhoonzero): add a thread to read stderr

    # Always init a decompress object is better than
    # create in the loop.
    dec = zlib.decompressobj(
        32 + zlib.MAX_WBITS)  # offset 32 to skip the gzip header

    # Pipes yield bytes on Python 3 (str and bytes coincide on Python 2),
    # so split on an encoded separator; the old str-typed accumulator
    # raised TypeError on Python 3 when joined with the bytes chunk.
    sep = line_break.encode() if isinstance(line_break, str) else line_break

    def reader():
        remained = b""
        while True:
            buff = process.stdout.read(bufsize)
            if not buff:
                break
            decomp_buff = dec.decompress(buff) if file_type == "gzip" else buff
            if cut_lines:
                pieces = (remained + decomp_buff).split(sep)
                # The last piece may be an incomplete line; keep it and
                # prepend it to the next chunk.
                lines, remained = pieces[:-1], pieces[-1]
                for ret in parser(lines):
                    yield ret
            else:
                for ret in parser(decomp_buff):
                    yield ret
        # BUGFIX: a final line without a trailing line_break used to be
        # silently dropped; flush it to the parser.
        if cut_lines and remained:
            for ret in parser([remained]):
                yield ret
        # Reap the child to avoid zombie processes and leaked pipe fds.
        process.stdout.close()
        process.wait()

    return reader
0 commit comments